1# vim:set et ts=4 sw=4:
3"""Utility functions
5@contact: Debian FTP Master <ftpmaster@debian.org>
6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
7@license: GNU General Public License version 2 or later
8"""
10# This program is free software; you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation; either version 2 of the License, or
13# (at your option) any later version.
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
20# You should have received a copy of the GNU General Public License
21# along with this program; if not, write to the Free Software
22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24import datetime
25import email.policy
26import errno
27import functools
28import grp
29import os
30import pwd
31import re
32import shutil
33import subprocess
34import sys
35import tempfile
36from collections import defaultdict
37from collections.abc import Iterable, Mapping, Sequence
38from typing import TYPE_CHECKING, Literal, NoReturn, Optional, Union
40import apt_inst
41import apt_pkg
42import sqlalchemy.sql as sql
44import daklib.config as config
45import daklib.mail
46from daklib.dbconn import (
47 Architecture,
48 Component,
49 DBConn,
50 Override,
51 OverrideType,
52 get_active_keyring_paths,
53 get_architecture,
54 get_component,
55 get_or_set_metadatakey,
56 get_suite,
57 get_suite_architectures,
58)
60from .dak_exceptions import (
61 InvalidDscError,
62 NoFilesFieldError,
63 NoFreeFilenameError,
64 ParseChangesError,
65 SendmailFailedError,
66 UnknownFormatError,
67)
68from .formats import parse_format, validate_changes_format
69from .gpg import SignedFile
70from .regexes import (
71 re_build_dep_arch,
72 re_issource,
73 re_multi_line_field,
74 re_parse_maintainer,
75 re_re_mark,
76 re_single_line_field,
77 re_srchasver,
78 re_whitespace_comment,
79)
80from .srcformats import get_format_from_string
81from .textutils import fix_maintainer
83if TYPE_CHECKING: 83 ↛ 84line 83 didn't jump to line 84, because the condition on line 83 was never true
84 import daklib.daklog
85 import daklib.fstransactions
86 import daklib.upload
88################################################################################
key_uid_email_cache: dict[str, list[str]] = (
    {}
)  #: Cache for email addresses from gpg key uids, keyed by key fingerprint
94################################################################################
def input_or_exit(prompt: Optional[str] = None) -> str:
    """Prompt the user for a line of input.

    Terminates the program with a message if the user closes stdin (^D).
    """
    try:
        response = input(prompt)
    except EOFError:
        sys.exit("\nUser interrupt (^D).")
    return response
104################################################################################
def extract_component_from_section(section: str) -> tuple[str, str]:
    """split "section" into "section", "component" parts

    If "component" is not given, "main" is used instead.

    :return: tuple (section, component)
    """
    if "/" in section:
        component = section.split("/", 1)[0]
    else:
        component = "main"
    return section, component
119################################################################################
def parse_deb822(
    armored_contents: bytes, signing_rules: Literal[-1, 0, 1] = 0, keyrings=None
) -> dict[str, str]:
    """Parse a deb822-style paragraph (.changes/.dsc contents) into a dict.

    Field names are lowercased.  The raw input is additionally stored in
    the "filecontents" entry, and a "source (version)" value is split
    into "source" and "source-version".

    :param armored_contents: raw (possibly PGP-armored) file contents
    :param signing_rules: see :func:`parse_changes`
    :param keyrings: keyrings used to check the signature; if :const:`None`,
                     no signature is required
    :raises ParseChangesError: on malformed or unparseable input
    :raises InvalidDscError: when strict checking (signing_rules == 1) fails
    """
    require_signature = True
    if keyrings is None:
        keyrings = []
        require_signature = False

    signed_file = SignedFile(
        armored_contents, keyrings=keyrings, require_signature=require_signature
    )
    contents = signed_file.contents.decode("utf-8")

    error = ""
    changes = {}

    # Split the lines in the input, keeping the linebreaks.
    lines = contents.splitlines(True)

    if len(lines) == 0:
        raise ParseChangesError("[Empty changes file]")

    # Reindex by line number so we can easily verify the format of
    # .dsc files...
    index = 0
    indexed_lines = {}
    for line in lines:
        index += 1
        indexed_lines[index] = line[:-1]

    num_of_lines = len(indexed_lines)
    index = 0
    first = -1
    while index < num_of_lines:
        index += 1
        line = indexed_lines[index]
        if line == "" and signing_rules == 1:
            # Strict (dpkg-source) mode: a blank line may only terminate
            # the paragraph.
            if index != num_of_lines:
                raise InvalidDscError(index)
            break
        if slf := re_single_line_field.match(line):
            # "Field: value" — field names are stored lowercased.
            field = slf.groups()[0].lower()
            changes[field] = slf.groups()[1]
            first = 1
            continue
        if line == " .":
            # Blank-line marker inside a multi-line field.
            # NOTE(review): assumes a field header was already seen; a
            # leading " ." line would raise NameError on `field` — confirm
            # inputs are always well-formed here.
            changes[field] += "\n"
            continue
        if mlf := re_multi_line_field.match(line):
            if first == -1:
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            if first == 1 and changes[field] != "":
                changes[field] += "\n"
            first = 0
            changes[field] += mlf.groups()[0] + "\n"
            continue
        # Anything unrecognised accumulates into the error report.
        error += line

    changes["filecontents"] = armored_contents.decode()

    if "source" in changes:
        # Strip the source version in brackets from the source field,
        # put it in the "source-version" field instead.
        if srcver := re_srchasver.search(changes["source"]):
            changes["source"] = srcver.group(1)
            changes["source-version"] = srcver.group(2)

    if error:
        raise ParseChangesError(error)

    return changes
197################################################################################
def parse_changes(
    filename: str,
    signing_rules: Literal[-1, 0, 1] = 0,
    dsc_file: bool = False,
    keyrings=None,
) -> dict[str, str]:
    """
    Parses a changes or source control (.dsc) file and returns a dictionary
    where each field is a key. The mandatory first argument is the
    filename of the .changes file.

    signing_rules is an optional argument:

    - If signing_rules == -1, no signature is required.
    - If signing_rules == 0 (the default), a signature is required.
    - If signing_rules == 1, it turns on the same strict format checking
      as dpkg-source.

    The rules for (signing_rules == 1)-mode are:

    - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
      followed by any PGP header data and must end with a blank line.

    - The data section must end with a blank line and must be followed by
      "-----BEGIN PGP SIGNATURE-----".

    :param dsc_file: `filename` is a Debian source control (.dsc) file
    """
    with open(filename, "rb") as fh:
        changes = parse_deb822(fh.read(), signing_rules, keyrings=keyrings)

    if dsc_file:
        return changes

    # Finally ensure that everything needed for .changes is there
    must_keywords = (
        "Format",
        "Date",
        "Source",
        "Architecture",
        "Version",
        "Distribution",
        "Maintainer",
        "Changes",
        "Files",
    )

    missingfields = [kw for kw in must_keywords if kw.lower() not in changes]
    if missingfields:
        raise ParseChangesError(
            "Missing mandatory field(s) in changes file (policy 5.5): %s"
            % (missingfields)
        )

    return changes
261################################################################################
def check_dsc_files(
    dsc_filename: str,
    dsc: Mapping[str, str],
    dsc_files: Mapping[str, Mapping[str, str]],
) -> list[str]:
    """
    Verify that the files listed in the Files field of the .dsc are
    those expected given the announced Format.

    :param dsc_filename: path of .dsc file
    :param dsc: the content of the .dsc parsed by :func:`parse_changes`
    :param dsc_files: the file list returned by :func:`build_file_list`
    :return: all errors detected
    """
    rejmsg = []

    # Ensure .dsc lists proper set of source files according to the format
    # announced
    has: defaultdict[str, int] = defaultdict(lambda: 0)

    # Ordered (regex, counter keys) pairs; the first matching regex wins,
    # so the more specific patterns (e.g. .asc signatures) come first.
    ftype_lookup = (
        (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)),
        (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")),
        (r"diff\.gz", ("debian_diff",)),
        (r"tar\.gz", ("native_tar_gz", "native_tar")),
        (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)),
        (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)),
        (r"tar\.(gz|bz2|xz)", ("native_tar",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)),
    )

    for f in dsc_files:
        m = re_issource.match(f)
        if not m:
            rejmsg.append(
                "%s: %s in Files field not recognised as source." % (dsc_filename, f)
            )
            continue

        # Populate 'has' dictionary by resolving keys in lookup table
        matched = False
        for regex, keys in ftype_lookup:
            if re.match(regex, m.group(3)):
                matched = True
                for key in keys:
                    has[key] += 1
                break

        # File does not match anything in lookup table; reject
        if not matched:
            rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f))
            # NOTE(review): this `break` stops checking the remaining files
            # after the first unexpected one; confirm `continue` was not
            # intended instead.
            break

    # Check for multiple files
    for file_type in (
        "orig_tar",
        "orig_tar_sig",
        "native_tar",
        "debian_tar",
        "debian_diff",
    ):
        if has[file_type] > 1:
            rejmsg.append("%s: lists multiple %s" % (dsc_filename, file_type))

    # Source format specific tests
    try:
        format = get_format_from_string(dsc["format"])
        rejmsg.extend(["%s: %s" % (dsc_filename, x) for x in format.reject_msgs(has)])
    except UnknownFormatError:
        # Not an error here for now
        pass

    return rejmsg
341################################################################################
343# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl
def build_file_list(
    changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum"
) -> dict[str, dict[str, str]]:
    """Parse the Files (or checksum) field into a per-filename dict.

    :param changes: parsed .changes/.dsc content
    :param is_a_dsc: whether `changes` comes from a .dsc (no section/priority)
    :param field: name of the field to parse
    :param hashname: key under which the first column (the hash) is stored
    :raises NoFilesFieldError: if `field` is missing
    :raises ParseChangesError: on a malformed entry
    """
    if field not in changes:
        raise NoFilesFieldError

    # Validate .changes Format: field
    if not is_a_dsc:
        validate_changes_format(parse_format(changes["format"]), field)

    # Only the plain "Files" field of a .changes carries section/priority.
    includes_section = (not is_a_dsc) and field == "files"

    files = {}
    for entry in changes[field].split("\n"):
        if not entry:
            break
        columns = entry.split()
        section = priority = ""
        try:
            if includes_section:
                (md5, size, section, priority, name) = columns
            else:
                (md5, size, name) = columns
        except ValueError:
            raise ParseChangesError(entry)

        section = section or "-"
        priority = priority or "-"

        (section, component) = extract_component_from_section(section)

        files[name] = dict(
            size=size, section=section, priority=priority, component=component
        )
        files[name][hashname] = md5

    return files
390################################################################################
def send_mail(message: str, whitelists: Optional[list[str]] = None) -> None:
    """sendmail wrapper, takes a message string

    The message is optionally filtered against recipient whitelists,
    optionally GPG-signed, archived to Dir::Mail if configured, and
    finally handed to Dinstall::SendmailCommand (unless mail sending is
    disabled via Dinstall::Options::No-Mail).

    :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
                       everything, otherwise an address is whitelisted if it is
                       included in any of the lists.
                       In addition a global whitelist can be specified in
                       Dinstall::MailWhiteList.
    :raises SendmailFailedError: if the sendmail command exits non-zero
    """

    msg = daklib.mail.parse_mail(message)

    # The incoming message might be UTF-8, but outgoing mail should
    # use a legacy-compatible encoding. Set the content to the
    # text to make sure this is the case.
    # Note that this does not work with multipart messages.
    msg.set_content(msg.get_payload(), cte="quoted-printable")

    # Check whether we're supposed to be sending mail
    call_sendmail = True
    if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
        call_sendmail = False

    # A None entry in `whitelists` disables whitelisting entirely.
    if whitelists is None or None in whitelists:
        whitelists = []
    if Cnf.get("Dinstall::MailWhiteList", ""):
        whitelists.append(Cnf["Dinstall::MailWhiteList"])
    if len(whitelists) != 0:
        # Compile the whitelist: lines starting with the "RE:" marker are
        # regexes, everything else is matched literally.
        whitelist = []
        for path in whitelists:
            with open(path, "r") as whitelist_in:
                for line in whitelist_in:
                    if not re_whitespace_comment.match(line):
                        if re_re_mark.match(line):
                            whitelist.append(
                                re.compile(re_re_mark.sub("", line.strip(), 1))
                            )
                        else:
                            whitelist.append(re.compile(re.escape(line.strip())))

        # Fields to check.
        fields = ["To", "Bcc", "Cc"]
        for field in fields:
            # Check each field
            value = msg.get(field, None)
            if value is not None:
                match = []
                for item in value.split(","):
                    (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer(
                        item.strip()
                    )
                    mail_whitelisted = 0
                    for wr in whitelist:
                        if wr.match(mail):
                            mail_whitelisted = 1
                            break
                    if not mail_whitelisted:
                        print("Skipping {0} since it's not whitelisted".format(item))
                        continue
                    match.append(item)

                # Doesn't have any mail in whitelist so remove the header
                if len(match) == 0:
                    del msg[field]
                else:
                    msg.replace_header(field, ", ".join(match))

        # Change message fields in order if we don't have a To header
        if "To" not in msg:
            fields.reverse()
            for field in fields:
                if field in msg:
                    # Promote this field's value to the primary recipient
                    # header (fields[-1] is "To" after the reverse).
                    msg[fields[-1]] = msg[field]
                    del msg[field]
                    break
            else:
                # return, as we removed all recipients.
                call_sendmail = False

    # sign mail
    if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""):
        kwargs = {
            "keyids": [mailkey],
            "pubring": Cnf.get("Dinstall::SigningPubKeyring") or None,
            "secring": Cnf.get("Dinstall::SigningKeyring") or None,
            "homedir": Cnf.get("Dinstall::SigningHomedir") or None,
            "passphrase_file": Cnf.get("Dinstall::SigningPassphraseFile") or None,
        }
        msg = daklib.mail.sign_mail(msg, **kwargs)

    msg_bytes = msg.as_bytes(policy=email.policy.default)

    # Archive a copy of every outgoing mail in Dir::Mail, if configured.
    maildir = Cnf.get("Dir::Mail")
    if maildir:
        path = os.path.join(maildir, datetime.datetime.now().isoformat())
        path = find_next_free(path)
        with open(path, "wb") as fh:
            fh.write(msg_bytes)

    # Invoke sendmail
    if not call_sendmail:
        return
    try:
        subprocess.run(
            Cnf["Dinstall::SendmailCommand"].split(),
            input=msg_bytes,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        raise SendmailFailedError(e.output.decode().rstrip())
507################################################################################
def poolify(source: str) -> str:
    """convert `source` name into directory path used in pool structure"""
    # "lib*" packages get a four-character prefix, everything else one.
    prefix = source[:4] if source.startswith("lib") else source[:1]
    return "%s/%s/" % (prefix, source)
518################################################################################
def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
    """Move `src` to `dest`, creating the destination directory if needed.

    Refuses to replace an existing destination unless `overwrite` is set
    (and the target is writable).  The moved file is chmodded to `perms`.
    """
    if os.path.exists(dest) and os.path.isdir(dest):
        target_dir = dest
    else:
        target_dir = os.path.dirname(dest)
    if not os.path.lexists(target_dir):
        # Create missing parents group-writable (0o2775), ignoring umask.
        saved_umask = os.umask(00000)
        os.makedirs(target_dir, 0o2775)
        os.umask(saved_umask)
    if os.path.exists(dest) and os.path.isdir(dest):
        dest = dest + "/" + os.path.basename(src)
    # Don't overwrite unless forced to
    if os.path.lexists(dest):
        if not overwrite:
            fubar("Can't move %s to %s - file already exists." % (src, dest))
        elif not os.access(dest, os.W_OK):
            fubar(
                "Can't move %s to %s - can't write to existing file." % (src, dest)
            )
    shutil.copy2(src, dest)
    os.chmod(dest, perms)
    os.unlink(src)
547################################################################################
def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
    """Perform a substition of template

    Reads `filename` and replaces every occurrence of each key in
    `subst_map` with its (stringified) value.
    """
    with open(filename) as fh:
        text = fh.read()
    for key, value in subst_map.items():
        text = text.replace(key, str(value))
    return text
559################################################################################
def fubar(msg: str, exit_code: int = 1) -> NoReturn:
    """print error message and exit program"""
    sys.stderr.write("E: %s\n" % msg)
    sys.exit(exit_code)
def warn(msg: str) -> None:
    """print warning message"""
    sys.stderr.write("W: %s\n" % msg)
573################################################################################
def whoami() -> str:
    """get user name

    Returns the user name with a laughable attempt at rfc822 conformancy
    (read: removing stray periods).
    """
    gecos = pwd.getpwuid(os.getuid()).pw_gecos
    return gecos.split(",")[0].replace(".", "")
def getusername() -> str:
    """get login name"""
    return pwd.getpwuid(os.getuid()).pw_name
590################################################################################
def size_type(c: Union[int, float]) -> str:
    """Render a byte count in a human-readable unit (B, KB or MB)."""
    unit = " B"
    if c > 10240:
        c /= 1024
        unit = " KB"
        if c > 10240:
            c /= 1024
            unit = " MB"
    return "%d%s" % (c, unit)
604################################################################################
def find_next_free(dest: str, too_many: int = 100) -> str:
    """Return `dest` or the first free "`dest`.N" variant.

    :raises NoFreeFilenameError: if no free name is found within
                                 `too_many` attempts
    """
    original = dest
    attempt = 0
    while os.path.lexists(dest) and attempt < too_many:
        dest = "%s.%d" % (original, attempt)
        attempt += 1
    if attempt >= too_many:
        raise NoFreeFilenameError
    return dest
618################################################################################
def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str:
    """Join `original` with `sep`, rendering None entries as empty strings."""
    parts = ["" if value is None else value for value in original]
    return sep.join(parts)
625################################################################################
def prefix_multi_line_string(
    lines: str, prefix: str, include_blank_lines: bool = False
) -> str:
    """prepend `prefix` to each line in `lines`

    Lines are stripped of surrounding whitespace; blank lines are dropped
    unless `include_blank_lines` is set.
    """
    out = []
    for line in lines.split("\n"):
        stripped = line.strip()
        if stripped or include_blank_lines:
            out.append(prefix + stripped)
    return "\n".join(out)
639################################################################################
def join_with_commas_and(list: Sequence[str]) -> str:
    """Render a sequence as English prose: "a, b and c"."""
    if not list:
        return "nothing"
    if len(list) == 1:
        return list[0]
    return "%s and %s" % (", ".join(list[:-1]), list[-1])
650################################################################################
def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
    """Pretty-print a dependency list as "pkg (op version)" alternatives."""
    rendered = []
    for pkg, constraint, version in deps:
        if constraint:
            rendered.append("%s (%s %s)" % (pkg, constraint, version))
        else:
            rendered.append(pkg)
    return " |".join(rendered)
661################################################################################
def get_conf():
    """Return the module-level configuration mapping (``Cnf``)."""
    return Cnf
668################################################################################
def parse_args(Options) -> tuple[str, str, str, bool]:
    """Handle -a, -c and -s arguments; returns them as SQL constraints

    :param Options: mapping providing "Suite", "Component" and
                    "Architecture" entries (comma/space separated lists)
    :return: tuple (con_suites, con_architectures, con_components,
             check_source); each constraint is an "AND ..." SQL fragment
             or the empty string, `check_source` tells whether "source"
             was among the requested architectures
    """
    # XXX: This should go away and everything which calls it be converted
    # to use SQLA properly. For now, we'll just fix it not to use
    # the old Pg interface though
    session = DBConn().session()
    # Process suite
    if Options["Suite"]:
        suite_ids_list = []
        for suitename in split_args(Options["Suite"]):
            suite = get_suite(suitename, session=session)
            if not suite or suite.suite_id is None:
                warn(
                    "suite '%s' not recognised."
                    % (suite and suite.suite_name or suitename)
                )
            else:
                suite_ids_list.append(suite.suite_id)
        if suite_ids_list:
            con_suites = "AND su.id IN (%s)" % ", ".join(
                [str(i) for i in suite_ids_list]
            )
        else:
            # Unknown suites only warn; no *valid* suite at all is fatal.
            fubar("No valid suite given.")
    else:
        con_suites = ""

    # Process component
    if Options["Component"]:
        component_ids_list = []
        for componentname in split_args(Options["Component"]):
            component = get_component(componentname, session=session)
            if component is None:
                warn("component '%s' not recognised." % (componentname))
            else:
                component_ids_list.append(component.component_id)
        if component_ids_list:
            con_components = "AND c.id IN (%s)" % ", ".join(
                [str(i) for i in component_ids_list]
            )
        else:
            fubar("No valid component given.")
    else:
        con_components = ""

    # Process architecture
    con_architectures = ""
    check_source = False
    if Options["Architecture"]:
        arch_ids_list = []
        for archname in split_args(Options["Architecture"]):
            if archname == "source":
                # "source" is not a real architecture; flag it separately.
                check_source = True
            else:
                arch = get_architecture(archname, session=session)
                if arch is None:
                    warn("architecture '%s' not recognised." % (archname))
                else:
                    arch_ids_list.append(arch.arch_id)
        if arch_ids_list:
            con_architectures = "AND a.id IN (%s)" % ", ".join(
                [str(i) for i in arch_ids_list]
            )
        else:
            if not check_source:
                fubar("No valid architecture given.")
            else:
                check_source = True

    return (con_suites, con_architectures, con_components, check_source)
743################################################################################
@functools.total_ordering
class ArchKey:
    """
    Key object for use in sorting lists of architectures.

    Sorts normally except that 'source' dominates all others.
    """

    __slots__ = ["arch", "issource"]

    def __init__(self, arch, *args):
        self.arch = arch
        self.issource = arch == "source"

    def __lt__(self, other: "ArchKey") -> bool:
        if self.issource != other.issource:
            # 'source' always sorts first.
            return self.issource
        if self.issource:
            # Both are 'source' — equal, not less.
            return False
        return self.arch < other.arch

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ArchKey):
            return NotImplemented
        return self.arch == other.arch
773################################################################################
def split_args(s: str, dwim: bool = True) -> list[str]:
    """
    Split command line arguments which can be separated by either commas
    or whitespace. If dwim is set, it will complain about string ending
    in comma since this usually means someone did 'dak ls -a i386, m68k
    foo' or something and the inevitable confusion resulting from 'm68k'
    being treated as an argument is undesirable.
    """
    if "," not in s:
        return s.split()
    if dwim and s.endswith(","):
        fubar("split_args: found trailing comma, spurious space maybe?")
    return s.split(",")
793################################################################################
def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
    """Build gpg ``--keyring`` options.

    Defaults to the active keyring paths from the database when
    `keyrings` is :const:`None`.
    """
    if keyrings is None:
        keyrings = get_active_keyring_paths()
    return ["--keyring=%s" % path for path in keyrings]
803################################################################################
def _gpg_get_addresses_from_listing(output: bytes) -> list[str]:
    """Extract mail addresses from ``gpg --with-colons`` listing output.

    Addresses ending in @debian.org are put at the front of the result.
    """
    addresses: list[str] = []

    for line in output.split(b"\n"):
        fields = line.split(b":")
        if fields[0] not in (b"uid", b"pub"):
            continue
        if fields[1] in (b"i", b"d", b"r"):
            # Skip uid that is invalid, disabled or revoked
            continue
        try:
            raw_uid = fields[9]
        except IndexError:
            continue
        try:
            uid = raw_uid.decode(encoding="utf-8")
        except UnicodeDecodeError:
            # If the uid is not valid UTF-8, we assume it is an old uid
            # still encoding in Latin-1.
            uid = raw_uid.decode(encoding="latin1")
        parsed = re_parse_maintainer.match(uid)
        if not parsed:
            continue
        address = parsed.group(2)
        if address.endswith("@debian.org"):
            # prefer @debian.org addresses
            # TODO: maybe not hardcode the domain
            addresses.insert(0, address)
        else:
            addresses.append(address)

    return addresses
def gpg_get_key_addresses(fingerprint: str) -> list[str]:
    """Retrieve email addresses from gpg key uids for a given fingerprint."""
    cached = key_uid_email_cache.get(fingerprint)
    if cached is not None:
        return cached

    cmd = ["gpg", "--no-default-keyring"]
    cmd.extend(gpg_keyring_args())
    cmd.extend(["--with-colons", "--list-keys", "--", fingerprint])
    try:
        listing = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        # Unknown key: cache the empty result as well.
        addresses: list[str] = []
    else:
        addresses = _gpg_get_addresses_from_listing(listing)

    key_uid_email_cache[fingerprint] = addresses
    return addresses
860################################################################################
def open_ldap_connection():
    """open connection to the configured LDAP server"""
    import ldap  # type: ignore

    server_uri = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
    ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile")

    conn = ldap.initialize(server_uri)

    if ca_cert_file:
        # Require a verified TLS connection against the configured CA.
        conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
        conn.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file)
        conn.set_option(ldap.OPT_X_TLS_NEWCTX, True)
        conn.start_tls_s()

    # Anonymous bind.
    conn.simple_bind_s("", "")

    return conn
883################################################################################
def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]:
    """retrieve login from LDAP linked to a given fingerprint"""
    import ldap

    conn = open_ldap_connection()
    base_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    results = conn.search_s(
        base_dn,
        ldap.SCOPE_ONELEVEL,
        "(keyfingerprint=%s)" % fingerprint,
        ["uid", "keyfingerprint"],
    )
    login: dict[str, str] = {}
    for _dn, attrs in results:
        fpr = attrs["keyFingerPrint"][0].decode()
        login[fpr] = attrs["uid"][0].decode()
    return login
906################################################################################
def get_users_from_ldap() -> dict[str, str]:
    """retrieve login and user names from LDAP

    :return: mapping of full name ("cn mn sn") to login (uid)
    """
    import ldap

    conn = open_ldap_connection()
    LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    Attrs = conn.search_s(
        LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"]
    )
    users: dict[str, str] = {}
    for elem in Attrs:
        elem = elem[1]
        name = []
        for k in ("cn", "mn", "sn"):
            try:
                value = elem[k][0].decode()
                # Skip empty values and placeholder "-" entries.
                if value and value[0] != "-":
                    name.append(value)
            except KeyError:
                pass
        # Decode the uid too: the declared return type is dict[str, str]
        # and get_logins_from_ldap() decodes it as well; without this the
        # values were bytes.
        users[" ".join(name)] = elem["uid"][0].decode()
    return users
933################################################################################
def clean_symlink(src: str, dest: str, root: str) -> str:
    """
    Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'.
    Returns fixed 'src'
    """
    rel_src = src.replace(root, "", 1)
    rel_dest_dir = os.path.dirname(dest.replace(root, "", 1))
    # One "../" for every directory level of the link's location.
    ups = "../" * len(rel_dest_dir.split("/"))
    return ups + rel_src
948################################################################################
def temp_dirname(
    parent: Optional[str] = None,
    prefix: str = "dak",
    suffix: str = "",
    mode: Optional[int] = None,
    group: Optional[str] = None,
) -> str:
    """
    Return a secure and unique directory by pre-creating it.

    :param parent: If non-null it will be the directory the directory is pre-created in.
    :param prefix: The filename will be prefixed with this string
    :param suffix: The filename will end with this string
    :param mode: If set the directory will get chmodded to those permissions
    :param group: If set the directory will get chgrped to the specified group.
    :return: the path of the created directory
    """
    dirname = tempfile.mkdtemp(suffix, prefix, parent)
    if mode is not None:
        os.chmod(dirname, mode)
    if group is not None:
        os.chown(dirname, -1, grp.getgrnam(group).gr_gid)
    return dirname
979################################################################################
def get_changes_files(from_dir: str) -> list[str]:
    """
    Takes a directory and lists all .changes files in it (as well as chdir'ing
    to the directory; this is due to broken behaviour on the part of p-u/p-a
    when you're not in the right place)

    Returns a list of filenames
    """
    try:
        # Much of the rest of p-u/p-a depends on being in the right place
        os.chdir(from_dir)
        changes_files = [
            entry for entry in os.listdir(from_dir) if entry.endswith(".changes")
        ]
    except OSError as e:
        fubar("Failed to read list from directory %s (%s)" % (from_dir, e))

    return changes_files
1000################################################################################
# Shared configuration mapping (apt_pkg-style), initialised once at import
# time and used throughout this module.
Cnf = config.Config().Cnf
1005################################################################################
def parse_wnpp_bug_file(
    file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm",
) -> dict[str, list[str]]:
    """
    Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
    Well, actually it parsed a local copy, but let's document the source
    somewhere ;)

    :param file: path of the local copy of the wnpp bug list
    :return: dict associating source package name with a list of open wnpp
             bug numbers (Yes, there might be more than one)
    """
    try:
        with open(file) as f:
            lines = f.readlines()
    except OSError:
        print(
            "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any."
            % file
        )
        lines = []

    wnpp: dict[str, list[str]] = {}
    # Lines look like "source: TYPE 123456|TYPE 654321|..."
    for line in lines:
        split_line = line.split(": ", 1)
        if len(split_line) > 1:
            wnpp[split_line[0]] = split_line[1].split("|")

    # Reduce each entry to just the bug numbers.
    for source, raw_bugs in wnpp.items():
        bugs = []
        for wnpp_bug in raw_bugs:
            # Guard against entries without any digits: the previous code
            # called .group() on a possibly-None match and crashed.
            match = re.search(r"\d+", wnpp_bug)
            if match:
                bugs.append(match.group())
        wnpp[source] = bugs
    return wnpp
1046################################################################################
def deb_extract_control(path: str) -> bytes:
    """Return the raw ``DEBIAN/control`` member of the .deb at *path*."""
    deb = apt_inst.DebFile(path)
    return deb.control.extractdata("control")
1054################################################################################
def mail_addresses_for_upload(
    maintainer: str,
    changed_by: str,
    fingerprint: str,
    authorized_by_fingerprint: Optional[str],
) -> list[str]:
    """mail addresses to contact for an upload

    :param maintainer: Maintainer field of the .changes file
    :param changed_by: Changed-By field of the .changes file
    :param fingerprint: fingerprint of the key used to sign the upload
    :param authorized_by_fingerprint: fingerprint of the key that authorized
           the upload, or None if there is none
    :return: list of RFC 2047-encoded mail addresses to contact regarding
        this upload
    """
    # Recipient *roles* come from the config; fall back to the default set
    # when Dinstall::UploadMailRecipients is unset or empty.
    recipients = Cnf.value_list("Dinstall::UploadMailRecipients")
    if not recipients:
        recipients = [
            "maintainer",
            "changed_by",
            "signer",
            "authorized_by",
        ]

    # Ensure signer and authorized_by are last if present
    # (the `else` runs only when remove() succeeded, i.e. the role was listed)
    for r in ("signer", "authorized_by"):
        try:
            recipients.remove(r)
        except ValueError:
            pass
        else:
            recipients.append(r)

    # Compute the set of addresses of the recipients
    addresses = set()  # Name + email
    emails = set()  # Email only, used to avoid duplicates
    for recipient in recipients:
        if recipient.startswith("mail:"):  # Email hardcoded in config
            address = recipient[5:]
        elif recipient == "maintainer":
            address = maintainer
        elif recipient == "changed_by":
            address = changed_by
        elif recipient == "signer" or recipient == "authorized_by":
            fpr = fingerprint if recipient == "signer" else authorized_by_fingerprint
            # authorized_by may legitimately be None; skip the role then.
            if not fpr:
                continue
            fpr_addresses = gpg_get_key_addresses(fpr)
            address = fpr_addresses[0] if fpr_addresses else None
            if any(x in emails for x in fpr_addresses):
                # The signer already gets a copy via another email
                address = None
        else:
            raise Exception(
                "Unsupported entry in {0}: {1}".format(
                    "Dinstall::UploadMailRecipients", recipient
                )
            )

        if address is not None:
            # Deduplicate on the bare email (fix_maintainer()[3]) so the
            # same person listed under two roles is only mailed once.
            mail = fix_maintainer(address)[3]
            if mail not in emails:
                addresses.add(address)
                emails.add(mail)

    # fix_maintainer()[1] yields the RFC 2047-encoded form returned to callers.
    encoded_addresses = [fix_maintainer(e)[1] for e in addresses]
    return encoded_addresses
1125################################################################################
def call_editor_for_file(path: str) -> None:
    """Open *path* in the user's editor and wait for it to exit.

    Prefers $VISUAL, then $EDITOR, then ``sensible-editor``.
    Raises CalledProcessError if the editor exits non-zero.
    """
    fallback = os.environ.get("EDITOR", "sensible-editor")
    editor = os.environ.get("VISUAL", fallback)
    subprocess.check_call([editor, path])
1133################################################################################
def call_editor(text: str = "", suffix: str = ".txt") -> str:
    """run editor and return the result as a string

    :param text: initial text
    :param suffix: extension for temporary file
    :return: string with the edited text
    """
    with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as tmp:
        tmp.write(text)
        tmp.flush()
        call_editor_for_file(tmp.name)
        # Re-read from the start to pick up whatever the editor wrote.
        tmp.seek(0)
        return tmp.read()
1151################################################################################
def check_reverse_depends(
    removals: Iterable[str],
    suite: str,
    arches: Optional[Iterable[Architecture]] = None,
    session=None,
    cruft: bool = False,
    quiet: bool = False,
    include_arch_all: bool = True,
) -> bool:
    """Check whether removing packages would break reverse (build-)dependencies.

    Scans the binary Depends and the source Build-Depends(-Indep) fields of
    `suite` for references to the packages in `removals` and, unless `quiet`,
    prints a report of the breakage found.

    :param removals: names of the packages to be removed
    :param arches: restrict the check to these architectures; by default all
           architectures of the suite except "source" and "all" are checked
    :param session: database session to use
    :param cruft: use the indented "cruft report" output format
    :param quiet: suppress the printed report
    :param include_arch_all: also consider arch:all binaries and
           Build-Depends-Indep
    :return: True if at least one (build-)dependency problem was found
    """
    dbsuite = get_suite(suite, session)
    # Overrides may live in a different suite (e.g. for policy queues).
    overridesuite = dbsuite
    if dbsuite.overridesuite is not None:
        overridesuite = get_suite(dbsuite.overridesuite, session)
    dep_problem = False
    p2c = {}  # binary package name -> component name
    # source -> binary -> set of architectures broken on
    all_broken = defaultdict(lambda: defaultdict(set))
    if arches:
        all_arches = set(arches)
    else:
        all_arches = set(x.arch_string for x in get_suite_architectures(suite))
    all_arches -= set(["source", "all"])
    # NOTE(review): removal_set is extended below with virtual packages that
    # are only provided by to-be-removed packages, but the dependency checks
    # further down test membership in `removals`, not `removal_set` — so
    # those virtual packages never actually trigger breakage.  Confirm
    # whether `removal_set` was meant there.
    removal_set = set(removals)
    metakey_d = get_or_set_metadatakey("Depends", session)
    metakey_p = get_or_set_metadatakey("Provides", session)
    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_d_id": metakey_d.key_id,
        "metakey_p_id": metakey_p.key_id,
    }
    if include_arch_all:
        rdep_architectures = all_arches | set(["all"])
    else:
        rdep_architectures = all_arches
    for architecture in rdep_architectures:
        deps = {}  # binary package -> raw Depends field
        sources = {}  # binary package -> source package
        virtual_packages = {}  # virtual package -> # of surviving providers
        try:
            params["arch_id"] = get_architecture(architecture, session).arch_id
        except AttributeError:
            # Unknown architecture in this suite: nothing to check.
            continue

        statement = sql.text(
            """
            SELECT b.package, s.source, c.name as component,
                (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
                (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
                FROM binaries b
                JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
                JOIN source s ON b.source = s.id
                JOIN files_archive_map af ON b.file = af.file_id
                JOIN component c ON af.component_id = c.id
                WHERE b.architecture = :arch_id"""
        )
        query = (
            session.query(
                sql.column("package"),
                sql.column("source"),
                sql.column("component"),
                sql.column("depends"),
                sql.column("provides"),
            )
            .from_statement(statement)
            .params(params)
        )
        for package, source, component, depends, provides in query:
            sources[package] = source
            p2c[package] = component
            if depends is not None:
                deps[package] = depends
            # Maintain a counter for each virtual package. If a
            # Provides: exists, set the counter to 0 and count all
            # provides by a package not in the list for removal.
            # If the counter stays 0 at the end, we know that only
            # the to-be-removed packages provided this virtual
            # package.
            if provides is not None:
                for virtual_pkg in provides.split(","):
                    virtual_pkg = virtual_pkg.strip()
                    # A package "providing" itself adds no information.
                    if virtual_pkg == package:
                        continue
                    if virtual_pkg not in virtual_packages:
                        virtual_packages[virtual_pkg] = 0
                    if package not in removals:
                        virtual_packages[virtual_pkg] += 1

        # If a virtual package is only provided by the to-be-removed
        # packages, treat the virtual package as to-be-removed too.
        removal_set.update(
            virtual_pkg
            for virtual_pkg in virtual_packages
            if not virtual_packages[virtual_pkg]
        )

        # Check binary dependencies (Depends)
        for package in deps:
            if package in removals:
                continue
            try:
                parsed_dep = apt_pkg.parse_depends(deps[package])
            except ValueError as e:
                print("Error for package %s: %s" % (package, e))
                parsed_dep = []
            for dep in parsed_dep:
                # Check for partial breakage. If a package has a ORed
                # dependency, there is only a dependency problem if all
                # packages in the ORed depends will be removed.
                unsat = 0
                for dep_package, _, _ in dep:
                    if dep_package in removals:
                        unsat += 1
                if unsat == len(dep):
                    component = p2c[package]
                    source = sources[package]
                    # Non-main breakage is reported as "source/component".
                    if component != "main":
                        source = "%s/%s" % (source, component)
                    all_broken[source][package].add(architecture)
                    dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Depends:")
        else:
            print("# Broken Depends:")
        for source, bindict in sorted(all_broken.items()):
            lines = []
            # NOTE(review): `arches` here shadows the function parameter of
            # the same name; harmless because all_arches was computed above,
            # but worth renaming.
            for binary, arches in sorted(bindict.items()):
                # Omit the arch list when the binary breaks everywhere.
                if arches == all_arches or "all" in arches:
                    lines.append(binary)
                else:
                    lines.append("%s [%s]" % (binary, " ".join(sorted(arches))))
            if cruft:
                print("    %s: %s" % (source, lines[0]))
            else:
                print("%s: %s" % (source, lines[0]))
            # Continuation lines are aligned under the first binary name.
            for line in lines[1:]:
                if cruft:
                    print("    " + " " * (len(source) + 2) + line)
                else:
                    print(" " * (len(source) + 2) + line)
        if not cruft:
            print()

    # Check source dependencies (Build-Depends and Build-Depends-Indep)
    all_broken = defaultdict(set)
    metakey_bd = get_or_set_metadatakey("Build-Depends", session)
    metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
    if include_arch_all:
        metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
    else:
        metakey_ids = (metakey_bd.key_id,)

    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_ids": metakey_ids,
    }
    statement = sql.text(
        """
        SELECT s.source, string_agg(sm.value, ', ') as build_dep
        FROM source s
        JOIN source_metadata sm ON s.id = sm.src_id
        WHERE s.id in
            (SELECT src FROM newest_src_association
                WHERE suite = :suite_id)
            AND sm.key_id in :metakey_ids
        GROUP BY s.id, s.source"""
    )
    query = (
        session.query(sql.column("source"), sql.column("build_dep"))
        .from_statement(statement)
        .params(params)
    )
    for source, build_dep in query:
        if source in removals:
            continue
        parsed_dep = []
        if build_dep is not None:
            # Remove [arch] information since we want to see breakage on all arches
            build_dep = re_build_dep_arch.sub("", build_dep)
            try:
                parsed_dep = apt_pkg.parse_src_depends(build_dep)
            except ValueError as e:
                print("Error for source %s: %s" % (source, e))
        for dep in parsed_dep:
            unsat = 0
            for dep_package, _, _ in dep:
                if dep_package in removals:
                    unsat += 1
            if unsat == len(dep):
                # Look up the source's component via its dsc override.
                # NOTE(review): .first() may return None, which would make
                # this unpacking raise TypeError — confirm the override is
                # guaranteed to exist for every source in the suite.
                (component,) = (
                    session.query(Component.component_name)
                    .join(Component.overrides)
                    .filter(Override.suite == overridesuite)
                    .filter(
                        Override.package
                        == re.sub("/(contrib|non-free-firmware|non-free)$", "", source)
                    )
                    .join(Override.overridetype)
                    .filter(OverrideType.overridetype == "dsc")
                    .first()
                )
                key = source
                if component != "main":
                    key = "%s/%s" % (source, component)
                all_broken[key].add(pp_deps(dep))
                dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Build-Depends:")
        else:
            print("# Broken Build-Depends:")
        for source, bdeps in sorted(all_broken.items()):
            bdeps = sorted(bdeps)
            if cruft:
                print("    %s: %s" % (source, bdeps[0]))
            else:
                print("%s: %s" % (source, bdeps[0]))
            for bdep in bdeps[1:]:
                if cruft:
                    print("    " + " " * (len(source) + 2) + bdep)
                else:
                    print(" " * (len(source) + 2) + bdep)
        if not cruft:
            print()

    return dep_problem
1383################################################################################
def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
    """source packages referenced via Built-Using

    :param control: control file to take Built-Using field from
    :return: list of (source_name, source_version) pairs
    """
    field = control.get("Built-Using", None)
    if field is None:
        # No Built-Using field: nothing referenced.
        return []

    references = []
    for alternatives in apt_pkg.parse_depends(field):
        assert len(alternatives) == 1, "Alternatives are not allowed in Built-Using field"
        name, version, relation = alternatives[0]
        assert relation == "=", "Built-Using must contain strict dependencies"
        references.append((name, version))

    return references
1406################################################################################
def is_in_debug_section(control: Mapping[str, str]) -> bool:
    """binary package is a debug package

    :param control: control file of binary package
    :return: True if the binary package is a debug package
    """
    # Section may be "component/section"; only the part after the first
    # slash matters here.
    section = control["Section"].split("/", 1)[-1]
    is_auto_built_symbols = control.get("Auto-Built-Package") == "debug-symbols"
    return section == "debug" and is_auto_built_symbols
1420################################################################################
def find_possibly_compressed_file(filename: str) -> str:
    """Locate *filename*, allowing for compressed variants.

    Tries the plain name first, then the ".xz", ".gz" and ".bz2"
    variants, in that order.

    :param filename: path to a control file (Sources, Packages, etc) to
                     look for
    :return: path to the first existing (possibly compressed) variant
    :raises FileNotFoundError: if neither the file nor any compressed
            variant exists.  (The old docstring claimed "null" was
            returned, but the code has always raised; FileNotFoundError
            is a subclass of the previously raised OSError, so existing
            callers keep working.)
    """
    _compressions = ("", ".xz", ".gz", ".bz2")

    for ext in _compressions:
        candidate = filename + ext
        if os.path.exists(candidate):
            return candidate

    raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
1441################################################################################
def parse_boolean_from_user(value: str) -> bool:
    """Interpret a user-supplied yes/no style string as a bool.

    Accepts "yes"/"true"/"enable"/"enabled" and
    "no"/"false"/"disable"/"disabled", case-insensitively.

    :raises ValueError: for any other input
    """
    lowered = value.lower()
    for verdict, words in (
        (True, ("yes", "true", "enable", "enabled")),
        (False, ("no", "false", "disable", "disabled")),
    ):
        if lowered in words:
            return verdict
    raise ValueError("Not sure whether %s should be a True or a False" % lowered)
def suite_suffix(suite_name: str) -> str:
    """Return the configured suite suffix for *suite_name* (may be "")."""
    suffix = Cnf.find("Dinstall::SuiteSuffix", "")
    if suffix == "":
        return ""
    # Without an explicit suite list the suffix applies to every suite.
    if "Dinstall::SuiteSuffixSuites" not in Cnf:
        # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
        return suffix
    applicable = Cnf.value_list("Dinstall::SuiteSuffixSuites")
    return suffix if suite_name in applicable else ""
1466################################################################################
def process_buildinfos(
    directory: str,
    buildinfo_files: "Iterable[daklib.upload.HashedFile]",
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
) -> None:
    """Copy buildinfo files into Dir::BuildinfoArchive

    :param directory: directory where .changes is stored
    :param buildinfo_files: names of buildinfo files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    # Archiving is opt-in; do nothing without the config entry.
    if "Dir::BuildinfoArchive" not in Cnf:
        return

    date_subdir = datetime.datetime.now().strftime("%Y/%m/%d")
    target_dir = os.path.join(Cnf["Dir::BuildinfoArchive"], date_subdir)

    for buildinfo in buildinfo_files:
        source = os.path.join(directory, buildinfo.filename)
        # Never overwrite an already-archived file of the same name.
        destination = find_next_free(os.path.join(target_dir, buildinfo.filename))

        logger.log(["Archiving", buildinfo.filename])
        fs_transaction.copy(source, destination, mode=0o644)
1499################################################################################
def move_to_morgue(
    morguesubdir: str,
    filenames: Iterable[str],
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
):
    """Move files to the correct dir in morgue

    :param morguesubdir: subdirectory of morgue where these files need to go
    :param filenames: names of files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    morguedir = Cnf.get("Dir::Morgue", os.path.join(Cnf.get("Dir::Base"), "morgue"))

    # Build directory as morguedir/morguesubdir/year/month/day
    now = datetime.datetime.now()
    dest = os.path.join(
        morguedir, morguesubdir, str(now.year), "%.2d" % now.month, "%.2d" % now.day
    )

    for filename in filenames:
        target = dest + "/" + os.path.basename(filename)
        # If the destination file exists; try to find another filename to use
        if os.path.lexists(target):
            target = find_next_free(target)
        logger.log(["move to morgue", filename, target])
        fs_transaction.move(filename, target)