Coverage for daklib/utils.py: 57%
715 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-08-26 22:11 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-08-26 22:11 +0000
1# vim:set et ts=4 sw=4:
3"""Utility functions
5@contact: Debian FTP Master <ftpmaster@debian.org>
6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
7@license: GNU General Public License version 2 or later
8"""
10# This program is free software; you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation; either version 2 of the License, or
13# (at your option) any later version.
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
20# You should have received a copy of the GNU General Public License
21# along with this program; if not, write to the Free Software
22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24import datetime
25import email.policy
26import errno
27import functools
28import grp
29import os
30import pwd
31import re
32import shutil
33import subprocess
34import sys
35import tempfile
36from collections import defaultdict
37from collections.abc import Iterable, Mapping, Sequence
38from typing import TYPE_CHECKING, Literal, NoReturn, Optional, Union
40import apt_inst
41import apt_pkg
42import sqlalchemy.sql as sql
44import daklib.config as config
45import daklib.mail
46from daklib.dbconn import (
47 Architecture,
48 Component,
49 DBConn,
50 Override,
51 OverrideType,
52 get_active_keyring_paths,
53 get_architecture,
54 get_component,
55 get_or_set_metadatakey,
56 get_suite,
57 get_suite_architectures,
58)
60from .dak_exceptions import (
61 InvalidDscError,
62 NoFilesFieldError,
63 NoFreeFilenameError,
64 ParseChangesError,
65 SendmailFailedError,
66 UnknownFormatError,
67)
68from .formats import parse_format, validate_changes_format
69from .gpg import SignedFile
70from .regexes import (
71 re_build_dep_arch,
72 re_issource,
73 re_multi_line_field,
74 re_parse_maintainer,
75 re_re_mark,
76 re_single_line_field,
77 re_srchasver,
78 re_whitespace_comment,
79)
80from .srcformats import get_format_from_string
81from .textutils import fix_maintainer
83if TYPE_CHECKING: 83 ↛ 84line 83 didn't jump to line 84, because the condition on line 83 was never true
84 import daklib.daklog
85 import daklib.fstransactions
86 import daklib.upload
88################################################################################
# Maps key fingerprint -> list of mail addresses; filled lazily by
# gpg_get_key_addresses() below (a failed gpg lookup is cached as []).
key_uid_email_cache: dict[str, list[str]] = (
    {}
)  #: Cache for email addresses from gpg key uids
94################################################################################
def input_or_exit(prompt: Optional[str] = None) -> str:
    """Read a line from stdin; exit the program cleanly on ^D (EOF)."""
    try:
        answer = input(prompt)
    except EOFError:
        sys.exit("\nUser interrupt (^D).")
    return answer
104################################################################################
def extract_component_from_section(section: str) -> tuple[str, str]:
    """split "section" into "section", "component" parts

    If "component" is not given, "main" is used instead.

    :return: tuple (section, component)
    """
    if "/" in section:
        component = section.split("/", 1)[0]
        return section, component
    return section, "main"
119################################################################################
def parse_deb822(
    armored_contents: bytes, signing_rules: Literal[-1, 0, 1] = 0, keyrings=None
) -> dict[str, str]:
    """Parse a deb822-style paragraph (.changes / .dsc contents) into a dict.

    :param armored_contents: raw, possibly PGP-armored, file contents
    :param signing_rules: with 1, enforce the strict dpkg-source layout
        (the data section must end exactly at the blank line before the
        signature); -1/0 parse leniently
    :param keyrings: keyrings used for signature verification; if None,
        no signature is required
    :return: dict mapping lower-cased field names to values, plus the
        special key "filecontents" (and "source-version" if the Source
        field carried a version in brackets)
    :raises ParseChangesError: on empty or malformed input
    :raises InvalidDscError: under signing_rules == 1 when a blank line
        occurs before the end of the data section
    """
    require_signature = True
    if keyrings is None:
        keyrings = []
        require_signature = False
    # Strips/verifies the PGP armor; raises if a required signature is
    # missing or invalid.
    signed_file = SignedFile(
        armored_contents, keyrings=keyrings, require_signature=require_signature
    )
    contents = signed_file.contents.decode("utf-8")

    error = ""
    changes = {}

    # Split the lines in the input, keeping the linebreaks.
    lines = contents.splitlines(True)

    if len(lines) == 0:
        raise ParseChangesError("[Empty changes file]")

    # Reindex by line number so we can easily verify the format of
    # .dsc files...
    index = 0
    indexed_lines = {}
    for line in lines:
        index += 1
        indexed_lines[index] = line[:-1]

    num_of_lines = len(indexed_lines)
    index = 0
    # first == -1: no field seen yet; 1: last line started a field;
    # 0: inside a multi-line continuation.
    first = -1
    while index < num_of_lines:
        index += 1
        line = indexed_lines[index]
        if line == "" and signing_rules == 1:
            # Strict mode: a blank line may only appear as the very last
            # line of the data section.
            if index != num_of_lines:
                raise InvalidDscError(index)
            break
        if slf := re_single_line_field.match(line):
            field = slf.groups()[0].lower()
            changes[field] = slf.groups()[1]
            first = 1
            continue
        if line == " .":
            # " ." encodes an empty line inside a multi-line field.
            changes[field] += "\n"
            continue
        if mlf := re_multi_line_field.match(line):
            if first == -1:
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            if first == 1 and changes[field] != "":
                changes[field] += "\n"
            first = 0
            changes[field] += mlf.groups()[0] + "\n"
            continue
        # Anything unrecognised is collected and reported at the end.
        error += line

    changes["filecontents"] = armored_contents.decode()

    if "source" in changes:
        # Strip the source version in brackets from the source field,
        # put it in the "source-version" field instead.
        if srcver := re_srchasver.search(changes["source"]):
            changes["source"] = srcver.group(1)
            changes["source-version"] = srcver.group(2)

    if error:
        raise ParseChangesError(error)

    return changes
197################################################################################
def parse_changes(
    filename: str,
    signing_rules: Literal[-1, 0, 1] = 0,
    dsc_file: bool = False,
    keyrings=None,
) -> dict[str, str]:
    """
    Parses a changes or source control (.dsc) file and returns a dictionary
    where each field is a key. The mandatory first argument is the
    filename of the .changes file.

    signing_rules is an optional argument:

      - If signing_rules == -1, no signature is required.
      - If signing_rules == 0 (the default), a signature is required.
      - If signing_rules == 1, it turns on the same strict format checking
        as dpkg-source.

    The rules for (signing_rules == 1)-mode are:

      - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
        followed by any PGP header data and must end with a blank line.

      - The data section must end with a blank line and must be followed by
        "-----BEGIN PGP SIGNATURE-----".

    :param dsc_file: `filename` is a Debian source control (.dsc) file
    :param keyrings: keyrings passed on to :func:`parse_deb822` for
        signature verification
    :raises ParseChangesError: if a mandatory .changes field is missing
        or the file cannot be parsed
    """

    with open(filename, "rb") as changes_in:
        content = changes_in.read()
    changes = parse_deb822(content, signing_rules, keyrings=keyrings)

    if not dsc_file:
        # Finally ensure that everything needed for .changes is there
        must_keywords = (
            "Format",
            "Date",
            "Source",
            "Architecture",
            "Version",
            "Distribution",
            "Maintainer",
            "Changes",
            "Files",
        )

        missingfields = []
        for keyword in must_keywords:
            # parse_deb822 lower-cases all field names.
            if keyword.lower() not in changes:
                missingfields.append(keyword)

        if len(missingfields):
            raise ParseChangesError(
                "Missing mandatory field(s) in changes file (policy 5.5): %s"
                % (missingfields)
            )

    return changes
261################################################################################
def check_dsc_files(
    dsc_filename: str,
    dsc: Mapping[str, str],
    dsc_files: Mapping[str, Mapping[str, str]],
) -> list[str]:
    """
    Verify that the files listed in the Files field of the .dsc are
    those expected given the announced Format.

    :param dsc_filename: path of .dsc file
    :param dsc: the content of the .dsc parsed by :func:`parse_changes`
    :param dsc_files: the file list returned by :func:`build_file_list`
    :return: all errors detected
    """
    rejmsg = []

    # Ensure .dsc lists proper set of source files according to the format
    # announced
    has: defaultdict[str, int] = defaultdict(lambda: 0)

    # Maps a filename-suffix regex to the counter keys it increments in
    # `has`. Order matters: the first matching pattern wins, so the .asc
    # signature patterns must precede the plain tarball ones.
    ftype_lookup = (
        (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)),
        (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")),
        (r"diff\.gz", ("debian_diff",)),
        (r"tar\.gz", ("native_tar_gz", "native_tar")),
        (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)),
        (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)),
        (r"tar\.(gz|bz2|xz)", ("native_tar",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)),
    )

    for f in dsc_files:
        m = re_issource.match(f)
        if not m:
            rejmsg.append(
                "%s: %s in Files field not recognised as source." % (dsc_filename, f)
            )
            continue

        # Populate 'has' dictionary by resolving keys in lookup table
        matched = False
        for regex, keys in ftype_lookup:
            if re.match(regex, m.group(3)):
                matched = True
                for key in keys:
                    has[key] += 1
                break

        # File does not match anything in lookup table; reject
        # NOTE(review): this `break` stops checking the remaining files
        # after the first unrecognised one -- `continue` would collect
        # every error instead; confirm intent before changing.
        if not matched:
            rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f))
            break

    # Check for multiple files
    for file_type in (
        "orig_tar",
        "orig_tar_sig",
        "native_tar",
        "debian_tar",
        "debian_diff",
    ):
        if has[file_type] > 1:
            rejmsg.append("%s: lists multiple %s" % (dsc_filename, file_type))

    # Source format specific tests
    try:
        format = get_format_from_string(dsc["format"])
        rejmsg.extend(["%s: %s" % (dsc_filename, x) for x in format.reject_msgs(has)])
    except UnknownFormatError:
        # Not an error here for now
        pass

    return rejmsg
341################################################################################
343# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl
def build_file_list(
    changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum"
) -> dict[str, dict[str, str]]:
    """Parse a Files/Checksums field into a per-filename dictionary.

    :param changes: parsed .changes/.dsc contents (see :func:`parse_changes`)
    :param is_a_dsc: input is a .dsc, i.e. entries carry no
        section/priority columns and no Format validation is performed
    :param field: lower-cased name of the field to parse
    :param hashname: key under which each file's checksum is stored
    :return: dict mapping filename to a dict with keys "size", "section",
        "priority", "component" and `hashname`
    :raises NoFilesFieldError: if `field` is missing from `changes`
    :raises ParseChangesError: if an entry has the wrong number of columns
    """
    files = {}

    # Make sure we have a Files: field to parse...
    if field not in changes:
        raise NoFilesFieldError

    # Validate .changes Format: field
    if not is_a_dsc:
        validate_changes_format(parse_format(changes["format"]), field)

    # Only the "Files" field of a .changes has section/priority columns;
    # checksum fields and .dsc entries are "<hash> <size> <name>".
    includes_section = (not is_a_dsc) and field == "files"

    # Parse each entry/line:
    for i in changes[field].split("\n"):
        if not i:
            break
        s = i.split()
        section = priority = ""
        try:
            if includes_section:
                (md5, size, section, priority, name) = s
            else:
                (md5, size, name) = s
        except ValueError:
            raise ParseChangesError(i)

        if section == "":
            section = "-"
        if priority == "":
            priority = "-"

        (section, component) = extract_component_from_section(section)

        files[name] = dict(
            size=size, section=section, priority=priority, component=component
        )
        files[name][hashname] = md5

    return files
390################################################################################
def send_mail(message: str, whitelists: Optional[list[str]] = None) -> None:
    """sendmail wrapper, takes a message string

    :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
                       everything, otherwise an address is whitelisted if it is
                       included in any of the lists.
                       In addition a global whitelist can be specified in
                       Dinstall::MailWhiteList.
    :raises SendmailFailedError: if the sendmail command exits non-zero
    """

    msg = daklib.mail.parse_mail(message)

    # The incoming message might be UTF-8, but outgoing mail should
    # use a legacy-compatible encoding. Set the content to the
    # text to make sure this is the case.
    # Note that this does not work with multipart messages.
    msg.set_content(msg.get_payload(), cte="quoted-printable")

    # Check whether we're supposed to be sending mail
    call_sendmail = True
    if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
        call_sendmail = False

    if whitelists is None or None in whitelists:
        whitelists = []
    if Cnf.get("Dinstall::MailWhiteList", ""):
        whitelists.append(Cnf["Dinstall::MailWhiteList"])
    if len(whitelists) != 0:
        # Compile the whitelist: lines matching re_re_mark are taken as
        # regular expressions (the marker is stripped), everything else
        # is matched literally; comment/blank lines are skipped.
        whitelist = []
        for path in whitelists:
            with open(path, "r") as whitelist_in:
                for line in whitelist_in:
                    if not re_whitespace_comment.match(line):
                        if re_re_mark.match(line):
                            whitelist.append(
                                re.compile(re_re_mark.sub("", line.strip(), 1))
                            )
                        else:
                            whitelist.append(re.compile(re.escape(line.strip())))

        # Fields to check.
        fields = ["To", "Bcc", "Cc"]
        for field in fields:
            # Check each field
            value = msg.get(field, None)
            if value is not None:
                match = []
                for item in value.split(","):
                    (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer(
                        item.strip()
                    )
                    mail_whitelisted = 0
                    for wr in whitelist:
                        if wr.match(mail):
                            mail_whitelisted = 1
                            break
                    if not mail_whitelisted:
                        print("Skipping {0} since it's not whitelisted".format(item))
                        continue
                    match.append(item)

                # Doesn't have any mail in whitelist so remove the header
                if len(match) == 0:
                    del msg[field]
                else:
                    msg.replace_header(field, ", ".join(match))

        # Change message fields in order if we don't have a To header
        # (promote Cc, then Bcc, into the To slot).
        if "To" not in msg:
            fields.reverse()
            for field in fields:
                if field in msg:
                    msg[fields[-1]] = msg[field]
                    del msg[field]
                    break
            else:
                # return, as we removed all recipients.
                call_sendmail = False

    # sign mail
    if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""):
        kwargs = {
            "keyids": [mailkey],
            "pubring": Cnf.get("Dinstall::SigningPubKeyring") or None,
            "homedir": Cnf.get("Dinstall::SigningHomedir") or None,
            "passphrase_file": Cnf.get("Dinstall::SigningPassphraseFile") or None,
        }
        msg = daklib.mail.sign_mail(msg, **kwargs)

    msg_bytes = msg.as_bytes(policy=email.policy.default)

    # Archive a copy of the outgoing mail under Dir::Mail (if configured),
    # using a timestamped, collision-free filename.
    maildir = Cnf.get("Dir::Mail")
    if maildir:
        path = os.path.join(maildir, datetime.datetime.now().isoformat())
        path = find_next_free(path)
        with open(path, "wb") as fh:
            fh.write(msg_bytes)

    # Invoke sendmail
    if not call_sendmail:
        return
    try:
        subprocess.run(
            Cnf["Dinstall::SendmailCommand"].split(),
            input=msg_bytes,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        raise SendmailFailedError(e.output.decode().rstrip())
506################################################################################
def poolify(source: str) -> str:
    """convert `source` name into directory path used in pool structure

    Packages whose name starts with "lib" are grouped by their first four
    characters (e.g. "libfoo" -> "libf/libfoo/"), everything else by the
    first character (e.g. "bash" -> "b/bash/").
    """
    # str.startswith is clearer than comparing a slice against "lib".
    prefix = source[:4] if source.startswith("lib") else source[:1]
    return "%s/%s/" % (prefix, source)
517################################################################################
def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
    """Move `src` to `dest`, creating missing target directories.

    :param src: path of the file to move
    :param dest: target path; if it is an existing directory, the file is
        moved into it keeping its basename
    :param overwrite: replace an existing destination file instead of
        aborting
    :param perms: permissions applied to the destination file

    Aborts the whole program via :func:`fubar` if the destination already
    exists (and `overwrite` is unset) or exists but is not writable.
    """
    if os.path.exists(dest) and os.path.isdir(dest):
        dest_dir = dest
    else:
        dest_dir = os.path.dirname(dest)
    if not os.path.lexists(dest_dir):
        # Create missing parents setgid group-writable (2775) regardless
        # of the caller's umask.
        umask = os.umask(00000)
        os.makedirs(dest_dir, 0o2775)
        os.umask(umask)
    # print "Moving %s to %s..." % (src, dest)
    if os.path.exists(dest) and os.path.isdir(dest):
        dest += "/" + os.path.basename(src)
    # Don't overwrite unless forced to
    if os.path.lexists(dest):
        if not overwrite:
            fubar("Can't move %s to %s - file already exists." % (src, dest))
        else:
            if not os.access(dest, os.W_OK):
                fubar(
                    "Can't move %s to %s - can't write to existing file." % (src, dest)
                )
    # copy-then-unlink (rather than os.rename) works across filesystems;
    # permissions are forced to `perms` afterwards.
    shutil.copy2(src, dest)
    os.chmod(dest, perms)
    os.unlink(src)
546################################################################################
def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
    """Read the template `filename` and replace each key of `subst_map`
    with its (stringified) value."""
    with open(filename) as fh:
        text = fh.read()
    for key, value in subst_map.items():
        text = text.replace(key, str(value))
    return text
558################################################################################
def fubar(msg: str, exit_code: int = 1) -> NoReturn:
    """Write an error message to stderr and terminate with `exit_code`."""
    sys.stderr.write("E: %s\n" % msg)
    sys.exit(exit_code)
def warn(msg: str) -> None:
    """Write a warning message to stderr."""
    sys.stderr.write("W: %s\n" % msg)
572################################################################################
def whoami() -> str:
    """get user name

    Returns the user name with a laughable attempt at rfc822 conformancy
    (read: removing stray periods).
    """
    gecos = pwd.getpwuid(os.getuid()).pw_gecos
    full_name = gecos.split(",")[0]
    return full_name.replace(".", "")
def getusername() -> str:
    """get login name"""
    return pwd.getpwuid(os.getuid()).pw_name
589################################################################################
def size_type(c: Union[int, float]) -> str:
    """Render a byte count as a short human-readable string (B/KB/MB)."""
    value = float(c)
    unit = " B"
    for bigger_unit in (" KB", " MB"):
        if value <= 10240:
            break
        value /= 1024
        unit = bigger_unit
    # %d truncates towards zero, matching integer division semantics.
    return "%d%s" % (value, unit)
603################################################################################
def find_next_free(dest: str, too_many: int = 100) -> str:
    """Return `dest` or, if it exists, the first free "`dest`.N" variant.

    :raises NoFreeFilenameError: if no free name is found within
        `too_many` attempts
    """
    base = dest
    attempt = 0
    while os.path.lexists(dest) and attempt < too_many:
        dest = "%s.%d" % (base, attempt)
        attempt += 1
    if attempt >= too_many:
        raise NoFreeFilenameError
    return dest
617################################################################################
def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str:
    """Join values with `sep`, rendering None entries as empty strings."""
    parts = []
    for value in original:
        parts.append("" if value is None else value)
    return sep.join(parts)
624################################################################################
def prefix_multi_line_string(
    lines: str, prefix: str, include_blank_lines: bool = False
) -> str:
    """prepend `prefix` to each line in `lines`

    Lines are stripped of surrounding whitespace; lines that become empty
    are dropped unless `include_blank_lines` is set.
    """
    out = []
    for raw_line in lines.split("\n"):
        stripped = raw_line.strip()
        if stripped or include_blank_lines:
            out.append(prefix + stripped)
    return "\n".join(out)
638################################################################################
def join_with_commas_and(list: Sequence[str]) -> str:
    """Render a sequence in English: "a, b and c" (or "nothing" if empty)."""
    if not list:
        return "nothing"
    if len(list) == 1:
        return list[0]
    head = ", ".join(list[:-1])
    return "%s and %s" % (head, list[-1])
649################################################################################
def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
    """Pretty-print a dependency alternative list: "a (>= 1) |b"."""
    rendered = []
    for package, constraint, version in deps:
        if constraint:
            rendered.append("%s (%s %s)" % (package, constraint, version))
        else:
            rendered.append(package)
    return " |".join(rendered)
660################################################################################
def get_conf():
    """Return the module-global configuration mapping ``Cnf``."""
    return Cnf
667################################################################################
def parse_args(Options) -> tuple[str, str, str, bool]:
    """Handle -a, -c and -s arguments; returns them as SQL constraints

    :param Options: mapping with "Suite", "Component" and "Architecture"
        entries, each a comma/space separated list of names (parsed with
        :func:`split_args`)
    :return: tuple (con_suites, con_architectures, con_components,
        check_source); the "con_*" values are SQL fragments of the form
        "AND <col> IN (...)" (or "" when the option was not given) and
        check_source is True if "source" was listed as an architecture
        (or no architecture option was given at all)

    Aborts the program via :func:`fubar` if an option was given but none
    of its values were valid.
    """
    # XXX: This should go away and everything which calls it be converted
    # to use SQLA properly. For now, we'll just fix it not to use
    # the old Pg interface though
    session = DBConn().session()
    # Process suite
    if Options["Suite"]:
        suite_ids_list = []
        for suitename in split_args(Options["Suite"]):
            suite = get_suite(suitename, session=session)
            if not suite or suite.suite_id is None:
                warn(
                    "suite '%s' not recognised."
                    % (suite and suite.suite_name or suitename)
                )
            else:
                suite_ids_list.append(suite.suite_id)
        if suite_ids_list:
            con_suites = "AND su.id IN (%s)" % ", ".join(
                [str(i) for i in suite_ids_list]
            )
        else:
            fubar("No valid suite given.")
    else:
        con_suites = ""

    # Process component
    if Options["Component"]:
        component_ids_list = []
        for componentname in split_args(Options["Component"]):
            component = get_component(componentname, session=session)
            if component is None:
                warn("component '%s' not recognised." % (componentname))
            else:
                component_ids_list.append(component.component_id)
        if component_ids_list:
            con_components = "AND c.id IN (%s)" % ", ".join(
                [str(i) for i in component_ids_list]
            )
        else:
            fubar("No valid component given.")
    else:
        con_components = ""

    # Process architecture
    con_architectures = ""
    check_source = False
    if Options["Architecture"]:
        arch_ids_list = []
        for archname in split_args(Options["Architecture"]):
            if archname == "source":
                # "source" is not a database architecture; it is reported
                # through the check_source flag instead.
                check_source = True
            else:
                arch = get_architecture(archname, session=session)
                if arch is None:
                    warn("architecture '%s' not recognised." % (archname))
                else:
                    arch_ids_list.append(arch.arch_id)
        if arch_ids_list:
            con_architectures = "AND a.id IN (%s)" % ", ".join(
                [str(i) for i in arch_ids_list]
            )
        else:
            if not check_source:
                fubar("No valid architecture given.")
    else:
        check_source = True

    return (con_suites, con_architectures, con_components, check_source)
742################################################################################
@functools.total_ordering
class ArchKey:
    """
    Key object for use in sorting lists of architectures.

    Sorts normally except that 'source' dominates all others.
    """

    __slots__ = ["arch", "issource"]

    def __init__(self, arch, *args):
        self.arch = arch
        self.issource = arch == "source"

    def __lt__(self, other: "ArchKey") -> bool:
        # 'source' sorts before any other architecture.
        if self.issource:
            return not other.issource
        if other.issource:
            return False
        return self.arch < other.arch

    def __eq__(self, other: object) -> bool:
        if isinstance(other, ArchKey):
            return self.arch == other.arch
        return NotImplemented
772################################################################################
def split_args(s: str, dwim: bool = True) -> list[str]:
    """
    Split command line arguments which can be separated by either commas
    or whitespace. If dwim is set, it will complain about string ending
    in comma since this usually means someone did 'dak ls -a i386, m68k
    foo' or something and the inevitable confusion resulting from 'm68k'
    being treated as an argument is undesirable.
    """
    if "," not in s:
        return s.split()
    if dwim and s.endswith(","):
        fubar("split_args: found trailing comma, spurious space maybe?")
    return s.split(",")
792################################################################################
def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
    """Build gpg --keyring options; defaults to the active keyrings."""
    if keyrings is None:
        keyrings = get_active_keyring_paths()
    return ["--keyring=%s" % path for path in keyrings]
802################################################################################
def _gpg_get_addresses_from_listing(output: bytes) -> list[str]:
    """Extract mail addresses from ``gpg --with-colons`` key-listing output.

    Addresses ending in @debian.org are put at the front of the result.
    """
    addresses: list[str] = []

    for line in output.split(b"\n"):
        fields = line.split(b":")
        if fields[0] not in (b"uid", b"pub"):
            continue
        # Field 1 is the validity; skip uids that are invalid, disabled
        # or revoked.
        if fields[1] in (b"i", b"d", b"r"):
            continue
        if len(fields) <= 9:
            continue
        raw_uid = fields[9]
        try:
            uid = raw_uid.decode("utf-8")
        except UnicodeDecodeError:
            # If the uid is not valid UTF-8, we assume it is an old uid
            # still encoded in Latin-1.
            uid = raw_uid.decode("latin1")
        match = re_parse_maintainer.match(uid)
        if match is None:
            continue
        address = match.group(2)
        if address.endswith("@debian.org"):
            # prefer @debian.org addresses
            # TODO: maybe not hardcode the domain
            addresses.insert(0, address)
        else:
            addresses.append(address)

    return addresses
def gpg_get_key_addresses(fingerprint: str) -> list[str]:
    """retreive email addresses from gpg key uids for a given fingerprint

    Results are memoised in :data:`key_uid_email_cache`; a failing gpg
    invocation is cached as an empty list.
    """
    addresses = key_uid_email_cache.get(fingerprint)
    if addresses is not None:
        return addresses

    try:
        # List the key from the active keyrings only (not the default one).
        cmd = ["gpg", "--no-default-keyring"]
        cmd.extend(gpg_keyring_args())
        cmd.extend(["--with-colons", "--list-keys", "--", fingerprint])
        output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        addresses = []
    else:
        addresses = _gpg_get_addresses_from_listing(output)

    key_uid_email_cache[fingerprint] = addresses
    return addresses
859################################################################################
def open_ldap_connection():
    """open connection to the configured LDAP server

    Uses Import-LDAP-Fingerprints::LDAPServer (and, when set,
    ::CACertFile to enable TLS) from the configuration and binds
    anonymously.
    """
    import ldap  # type: ignore

    LDAPServer = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
    ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile")

    conn = ldap.initialize(LDAPServer)

    if ca_cert_file:
        # Require a valid server certificate when a CA cert is configured.
        conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
        conn.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file)
        conn.set_option(ldap.OPT_X_TLS_NEWCTX, True)
        conn.start_tls_s()

    # Anonymous bind.
    conn.simple_bind_s("", "")

    return conn
882################################################################################
def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]:
    """retrieve login from LDAP linked to a given fingerprint

    :param fingerprint: key fingerprint to look up ("*" matches everyone)
    :return: dict mapping key fingerprint to LDAP uid (login)
    """
    import ldap

    conn = open_ldap_connection()
    LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    Attrs = conn.search_s(
        LDAPDn,
        ldap.SCOPE_ONELEVEL,
        "(keyfingerprint=%s)" % fingerprint,
        ["uid", "keyfingerprint"],
    )
    login: dict[str, str] = {}
    for elem in Attrs:
        # search_s yields (dn, attrs) pairs; attribute values are bytes.
        fpr = elem[1]["keyFingerPrint"][0].decode()
        uid = elem[1]["uid"][0].decode()
        login[fpr] = uid
    return login
905################################################################################
def get_users_from_ldap() -> dict[str, str]:
    """retrieve login and user names from LDAP

    :return: dict mapping full name ("cn mn sn") to login (uid)
    """
    import ldap

    conn = open_ldap_connection()
    LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    Attrs = conn.search_s(
        LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"]
    )
    users: dict[str, str] = {}
    for elem in Attrs:
        elem = elem[1]
        name = []
        for k in ("cn", "mn", "sn"):
            try:
                value = elem[k][0].decode()
                # Placeholder values starting with "-" are skipped.
                if value and value[0] != "-":
                    name.append(value)
            except KeyError:
                pass
        # Decode the uid as well: search_s returns attribute values as
        # bytes, and the declared return type is dict[str, str] (this
        # also matches get_logins_from_ldap above, which decodes).
        users[" ".join(name)] = elem["uid"][0].decode()
    return users
932################################################################################
def clean_symlink(src: str, dest: str, root: str) -> str:
    """
    Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'.
    Returns fixed 'src'
    """
    rel_src = src.replace(root, "", 1)
    rel_dest_dir = os.path.dirname(dest.replace(root, "", 1))
    # One "../" per path component of the destination's directory.
    ups = "../" * len(rel_dest_dir.split("/"))
    return ups + rel_src
947################################################################################
def temp_dirname(
    parent: Optional[str] = None,
    prefix: str = "dak",
    suffix: str = "",
    mode: Optional[int] = None,
    group: Optional[str] = None,
) -> str:
    """Create and return a secure, unique, pre-created directory.

    :param parent: if non-null, the directory the new directory is created in
    :param prefix: the directory name will start with this string
    :param suffix: the directory name will end with this string
    :param mode: if set, the directory is chmodded to these permissions
    :param group: if set, the directory is chgrped to this group
    :return: path of the created directory
    """
    path = tempfile.mkdtemp(suffix, prefix, parent)
    if mode is not None:
        os.chmod(path, mode)
    if group is not None:
        os.chown(path, -1, grp.getgrnam(group).gr_gid)
    return path
978################################################################################
def get_changes_files(from_dir: str) -> list[str]:
    """
    Takes a directory and lists all .changes files in it (as well as chdir'ing
    to the directory; this is due to broken behaviour on the part of p-u/p-a
    when you're not in the right place)

    Returns a list of filenames
    """
    try:
        # Much of the rest of p-u/p-a depends on being in the right place
        os.chdir(from_dir)
        changes_files = [
            entry for entry in os.listdir(from_dir) if entry.endswith(".changes")
        ]
    except OSError as e:
        fubar("Failed to read list from directory %s (%s)" % (from_dir, e))

    return changes_files
999################################################################################
# Module-global configuration mapping, shared by the helpers above;
# initialised once when this module is imported.
Cnf = config.Config().Cnf
1004################################################################################
def parse_wnpp_bug_file(
    file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm",
) -> dict[str, list[str]]:
    """
    Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
    Well, actually it parses a local copy, but let's document the source
    somewhere ;)

    :param file: path to the local copy of the wnpp bug list
    :return: dict associating source package name with a list of open wnpp
        bug numbers (Yes, there might be more than one)
    """
    try:
        with open(file) as f:
            lines = f.readlines()
    except OSError:
        print(
            "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any."
            % file
        )
        lines = []
    wnpp: dict[str, list[str]] = {}

    # Each useful line looks like "<source>: <entry>|<entry>|..."
    for line in lines:
        splited_line = line.split(": ", 1)
        if len(splited_line) > 1:
            wnpp[splited_line[0]] = splited_line[1].split("|")

    # Reduce each entry to its bug number.
    for source, entries in wnpp.items():
        bugs = []
        for wnpp_bug in entries:
            # Fix: the original called .group() before checking for a match,
            # raising AttributeError on an entry without any digits.
            match = re.search(r"\d+", wnpp_bug)
            if match:
                bugs.append(match.group())
        wnpp[source] = bugs
    return wnpp
1045################################################################################
def deb_extract_control(path: str) -> bytes:
    """extract DEBIAN/control from a binary package

    :param path: path to a .deb file
    :return: raw contents of the control file
    """
    deb = apt_inst.DebFile(path)
    return deb.control.extractdata("control")
1053################################################################################
def mail_addresses_for_upload(
    maintainer: str,
    changed_by: str,
    fingerprint: str,
    authorized_by_fingerprint: Optional[str],
) -> list[str]:
    """mail addresses to contact for an upload

    :param maintainer: Maintainer field of the .changes file
    :param changed_by: Changed-By field of the .changes file
    :param fingerprint: fingerprint of the key used to sign the upload
    :param authorized_by_fingerprint: fingerprint of the key that authorized
        the upload, used for the "authorized_by" recipient; may be None, in
        which case that recipient is skipped
    :return: list of RFC 2047-encoded mail addresses to contact regarding
        this upload
    """
    # Recipient roles come from config; fall back to the full default set.
    recipients = Cnf.value_list("Dinstall::UploadMailRecipients")
    if not recipients:
        recipients = [
            "maintainer",
            "changed_by",
            "signer",
            "authorized_by",
        ]

    # Ensure signer and authorized_by are last if present
    # (remove() + append() moves the entry to the end; the for/else runs
    # append only when remove() did not raise ValueError).
    for r in ("signer", "authorized_by"):
        try:
            recipients.remove(r)
        except ValueError:
            pass
        else:
            recipients.append(r)

    # Compute the set of addresses of the recipients
    addresses = set()  # Name + email
    emails = set()  # Email only, used to avoid duplicates
    for recipient in recipients:
        if recipient.startswith("mail:"):  # Email hardcoded in config
            address = recipient[5:]
        elif recipient == "maintainer":
            address = maintainer
        elif recipient == "changed_by":
            address = changed_by
        elif recipient == "signer" or recipient == "authorized_by":
            fpr = fingerprint if recipient == "signer" else authorized_by_fingerprint
            if not fpr:
                continue
            # Use the first UID of the key as the contact address.
            fpr_addresses = gpg_get_key_addresses(fpr)
            address = fpr_addresses[0] if fpr_addresses else None
            if any(x in emails for x in fpr_addresses):
                # The signer already gets a copy via another email
                address = None
        else:
            raise Exception(
                "Unsupported entry in {0}: {1}".format(
                    "Dinstall::UploadMailRecipients", recipient
                )
            )

        if address is not None:
            # fix_maintainer()[3] is the bare email, used for deduplication;
            # the full "Name <email>" form is what gets recorded.
            mail = fix_maintainer(address)[3]
            if mail not in emails:
                addresses.add(address)
                emails.add(mail)

    # fix_maintainer()[1] is the RFC 2047-encoded form of the address.
    encoded_addresses = [fix_maintainer(e)[1] for e in addresses]
    return encoded_addresses
1124################################################################################
def call_editor_for_file(path: str) -> None:
    """Open *path* in the user's preferred editor and wait for it to exit.

    VISUAL takes precedence over EDITOR; "sensible-editor" is the fallback.
    """
    fallback = os.environ.get("EDITOR", "sensible-editor")
    editor = os.environ.get("VISUAL", fallback)
    subprocess.check_call([editor, path])
1132################################################################################
def call_editor(text: str = "", suffix: str = ".txt") -> str:
    """run editor and return the result as a string

    :param text: initial text
    :param suffix: extension for temporary file
    :return: string with the edited text
    """
    with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as tmp:
        tmp.write(text)
        tmp.flush()
        call_editor_for_file(tmp.name)
        # Re-read from the start to pick up whatever the editor wrote.
        tmp.seek(0)
        return tmp.read()
1150################################################################################
def check_reverse_depends(
    removals: Iterable[str],
    suite: str,
    arches: Optional[Iterable[Architecture]] = None,
    session=None,
    cruft: bool = False,
    quiet: bool = False,
    include_arch_all: bool = True,
) -> bool:
    """Check whether removing packages would break reverse (build-)dependencies.

    Pass 1 walks binary Depends per architecture; pass 2 walks source
    Build-Depends(-Indep).  Unless *quiet*, a report of everything that
    would become unsatisfiable is printed.

    :param removals: names of the packages that are candidates for removal
    :param suite: name of the suite to check
    :param arches: limit the check to these architectures; defaults to all
        architectures of the suite except "source" and "all"
    :param session: database session to use
    :param cruft: use the indented cruft-report output format
    :param quiet: suppress the printed report
    :param include_arch_all: also consider arch:all binaries and
        Build-Depends-Indep
    :return: True if at least one (build-)dependency would break
    """
    # NOTE(review): despite the Iterable[Architecture] annotation, elements of
    # `arches` are treated as architecture name strings below (compared with
    # "all", passed to get_architecture) — confirm what callers actually pass.
    dbsuite = get_suite(suite, session)
    # Overrides may be maintained in a separate suite (Suite.overridesuite).
    overridesuite = dbsuite
    if dbsuite.overridesuite is not None:
        overridesuite = get_suite(dbsuite.overridesuite, session)
    dep_problem = False
    p2c = {}  # binary package name -> component name
    # source key (possibly "source/component") -> binary name -> broken arches
    all_broken = defaultdict(lambda: defaultdict(set))
    if arches:
        all_arches = set(arches)
    else:
        all_arches = set(x.arch_string for x in get_suite_architectures(suite))
    all_arches -= set(["source", "all"])
    removal_set = set(removals)
    metakey_d = get_or_set_metadatakey("Depends", session)
    metakey_p = get_or_set_metadatakey("Provides", session)
    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_d_id": metakey_d.key_id,
        "metakey_p_id": metakey_p.key_id,
    }
    if include_arch_all:
        rdep_architectures = all_arches | set(["all"])
    else:
        rdep_architectures = all_arches
    # ---- Pass 1: binary reverse dependencies, one architecture at a time.
    for architecture in rdep_architectures:
        deps = {}  # binary package -> raw Depends field
        sources = {}  # binary package -> source package name
        virtual_packages = {}  # virtual package -> count of surviving providers
        try:
            params["arch_id"] = get_architecture(architecture, session).arch_id
        except AttributeError:
            # get_architecture returned None: arch unknown to the DB; skip it.
            continue

        # One query fetches every binary of this arch in the suite together
        # with its source, component and Depends/Provides metadata.
        statement = sql.text(
            """
            SELECT b.package, s.source, c.name as component,
                (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
                (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
                FROM binaries b
                JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
                JOIN source s ON b.source = s.id
                JOIN files_archive_map af ON b.file = af.file_id
                JOIN component c ON af.component_id = c.id
                WHERE b.architecture = :arch_id"""
        )
        query = (
            session.query(
                sql.column("package"),
                sql.column("source"),
                sql.column("component"),
                sql.column("depends"),
                sql.column("provides"),
            )
            .from_statement(statement)
            .params(params)
        )
        for package, source, component, depends, provides in query:
            sources[package] = source
            p2c[package] = component
            if depends is not None:
                deps[package] = depends
            # Maintain a counter for each virtual package. If a
            # Provides: exists, set the counter to 0 and count all
            # provides by a package not in the list for removal.
            # If the counter stays 0 at the end, we know that only
            # the to-be-removed packages provided this virtual
            # package.
            if provides is not None:
                for virtual_pkg in provides.split(","):
                    virtual_pkg = virtual_pkg.strip()
                    if virtual_pkg == package:
                        continue
                    if virtual_pkg not in virtual_packages:
                        virtual_packages[virtual_pkg] = 0
                    if package not in removals:
                        virtual_packages[virtual_pkg] += 1

        # If a virtual package is only provided by the to-be-removed
        # packages, treat the virtual package as to-be-removed too.
        removal_set.update(
            virtual_pkg
            for virtual_pkg in virtual_packages
            if not virtual_packages[virtual_pkg]
        )
        # NOTE(review): removal_set is extended here but the membership tests
        # below still use `removals` — orphaned virtual packages are never
        # consulted.  Verify whether the checks should use removal_set.

        # Check binary dependencies (Depends)
        for package in deps:
            if package in removals:
                continue
            try:
                parsed_dep = apt_pkg.parse_depends(deps[package])
            except ValueError as e:
                print("Error for package %s: %s" % (package, e))
                parsed_dep = []
            for dep in parsed_dep:
                # Check for partial breakage. If a package has a ORed
                # dependency, there is only a dependency problem if all
                # packages in the ORed depends will be removed.
                unsat = 0
                for dep_package, _, _ in dep:
                    if dep_package in removals:
                        unsat += 1
                if unsat == len(dep):
                    component = p2c[package]
                    source = sources[package]
                    # Non-main packages are keyed as "source/component".
                    if component != "main":
                        source = "%s/%s" % (source, component)
                    all_broken[source][package].add(architecture)
                    dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Depends:")
        else:
            print("# Broken Depends:")
        for source, bindict in sorted(all_broken.items()):
            lines = []
            for binary, arches in sorted(bindict.items()):
                # Broken everywhere (or on arch:all): omit the arch list.
                if arches == all_arches or "all" in arches:
                    lines.append(binary)
                else:
                    lines.append("%s [%s]" % (binary, " ".join(sorted(arches))))
            if cruft:
                print("    %s: %s" % (source, lines[0]))
            else:
                print("%s: %s" % (source, lines[0]))
            # Continuation lines are aligned under the first entry.
            for line in lines[1:]:
                if cruft:
                    print("    " + " " * (len(source) + 2) + line)
                else:
                    print(" " * (len(source) + 2) + line)
        if not cruft:
            print()

    # ---- Pass 2: source build-dependencies (Build-Depends / -Indep).
    all_broken = defaultdict(set)
    metakey_bd = get_or_set_metadatakey("Build-Depends", session)
    metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
    if include_arch_all:
        metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
    else:
        metakey_ids = (metakey_bd.key_id,)

    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_ids": metakey_ids,
    }
    # Aggregate both build-dependency fields per newest source in the suite.
    statement = sql.text(
        """
        SELECT s.source, string_agg(sm.value, ', ') as build_dep
        FROM source s
        JOIN source_metadata sm ON s.id = sm.src_id
        WHERE s.id in
            (SELECT src FROM newest_src_association
                WHERE suite = :suite_id)
            AND sm.key_id in :metakey_ids
        GROUP BY s.id, s.source"""
    )
    query = (
        session.query(sql.column("source"), sql.column("build_dep"))
        .from_statement(statement)
        .params(params)
    )
    for source, build_dep in query:
        if source in removals:
            continue
        parsed_dep = []
        if build_dep is not None:
            # Remove [arch] information since we want to see breakage on all arches
            build_dep = re_build_dep_arch.sub("", build_dep)
            try:
                parsed_dep = apt_pkg.parse_src_depends(build_dep)
            except ValueError as e:
                print("Error for source %s: %s" % (source, e))
        for dep in parsed_dep:
            unsat = 0
            for dep_package, _, _ in dep:
                if dep_package in removals:
                    unsat += 1
            if unsat == len(dep):
                # Look up the source's component via its dsc override so that
                # non-main sources can be reported as "source/component".
                # NOTE(review): .first() may return None, which would make the
                # tuple unpacking raise TypeError — confirm a dsc override
                # always exists for sources in the suite.
                (component,) = (
                    session.query(Component.component_name)
                    .join(Component.overrides)
                    .filter(Override.suite == overridesuite)
                    .filter(
                        Override.package
                        == re.sub("/(contrib|non-free-firmware|non-free)$", "", source)
                    )
                    .join(Override.overridetype)
                    .filter(OverrideType.overridetype == "dsc")
                    .first()
                )
                key = source
                if component != "main":
                    key = "%s/%s" % (source, component)
                all_broken[key].add(pp_deps(dep))
                dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Build-Depends:")
        else:
            print("# Broken Build-Depends:")
        for source, bdeps in sorted(all_broken.items()):
            bdeps = sorted(bdeps)
            if cruft:
                print("    %s: %s" % (source, bdeps[0]))
            else:
                print("%s: %s" % (source, bdeps[0]))
            for bdep in bdeps[1:]:
                if cruft:
                    print("    " + " " * (len(source) + 2) + bdep)
                else:
                    print(" " * (len(source) + 2) + bdep)
        if not cruft:
            print()

    return dep_problem
1382################################################################################
def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
    """source packages referenced via Built-Using

    :param control: control file to take Built-Using field from
    :return: list of (source_name, source_version) pairs
    """
    field = control.get("Built-Using", None)
    if field is None:
        return []

    references = []
    for entry in apt_pkg.parse_depends(field):
        # Each entry must be a single "name (= version)" relation.
        assert len(entry) == 1, "Alternatives are not allowed in Built-Using field"
        name, version, relation = entry[0]
        assert relation == "=", "Built-Using must contain strict dependencies"
        references.append((name, version))

    return references
1405################################################################################
def is_in_debug_section(control: Mapping[str, str]) -> bool:
    """binary package is a debug package

    :param control: control file of binary package
    :return: True if the binary package is a debug package
    """
    # Section may be "component/section"; keep only the section part.
    in_debug_section = control["Section"].split("/", 1)[-1] == "debug"
    is_debug_symbols = control.get("Auto-Built-Package") == "debug-symbols"
    return in_debug_section and is_debug_symbols
1419################################################################################
def find_possibly_compressed_file(filename: str) -> str:
    """Find a control file, also trying known compressed variants.

    :param filename: path to a control file (Sources, Packages, etc) to
                     look for
    :return: path to the first existing variant, preferring the plain
             file, then ``.xz``, ``.gz`` and ``.bz2``
    :raises OSError: with ``errno.ENOENT`` if no variant exists
        (the original docstring claimed a null return, which was wrong)
    """
    for ext in ("", ".xz", ".gz", ".bz2"):
        candidate = filename + ext
        if os.path.exists(candidate):
            return candidate

    raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
1440################################################################################
def parse_boolean_from_user(value: str) -> bool:
    """Interpret a user-supplied string as a boolean (case-insensitive).

    :raises ValueError: if the string is not a recognised boolean word
    """
    lowered = value.lower()
    if lowered in {"yes", "true", "enable", "enabled"}:
        return True
    elif lowered in {"no", "false", "disable", "disabled"}:
        return False
    raise ValueError("Not sure whether %s should be a True or a False" % lowered)
def suite_suffix(suite_name: str) -> str:
    """Return suite_suffix for the given suite"""
    suffix = Cnf.find("Dinstall::SuiteSuffix", "")
    if not suffix:
        return ""
    if "Dinstall::SuiteSuffixSuites" not in Cnf:
        # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
        return suffix
    if suite_name in Cnf.value_list("Dinstall::SuiteSuffixSuites"):
        return suffix
    return ""
1465################################################################################
def process_buildinfos(
    directory: str,
    buildinfo_files: "Iterable[daklib.upload.HashedFile]",
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
) -> None:
    """Copy buildinfo files into Dir::BuildinfoArchive

    :param directory: directory where .changes is stored
    :param buildinfo_files: names of buildinfo files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    if "Dir::BuildinfoArchive" not in Cnf:
        return

    # Archive under a year/month/day subdirectory of the configured root.
    date_subdir = datetime.datetime.now().strftime("%Y/%m/%d")
    target_dir = os.path.join(Cnf["Dir::BuildinfoArchive"], date_subdir)

    for buildinfo in buildinfo_files:
        source_path = os.path.join(directory, buildinfo.filename)
        # Avoid clobbering an already-archived file of the same name.
        target_path = find_next_free(os.path.join(target_dir, buildinfo.filename))

        logger.log(["Archiving", buildinfo.filename])
        fs_transaction.copy(source_path, target_path, mode=0o644)
1498################################################################################
def move_to_morgue(
    morguesubdir: str,
    filenames: Iterable[str],
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
):
    """Move a file to the correct dir in morgue

    :param morguesubdir: subdirectory of morgue where this file needs to go
    :param filenames: names of files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    morguedir = Cnf.get("Dir::Morgue", os.path.join(Cnf.get("Dir::Base"), "morgue"))

    # Build directory as morguedir/morguesubdir/year/month/day
    today = datetime.datetime.now()
    dest = os.path.join(
        morguedir,
        morguesubdir,
        str(today.year),
        "%.2d" % today.month,
        "%.2d" % today.day,
    )

    for filename in filenames:
        target = dest + "/" + os.path.basename(filename)
        # If the destination file exists; try to find another filename to use
        if os.path.lexists(target):
            target = find_next_free(target)
        logger.log(["move to morgue", filename, target])
        fs_transaction.move(filename, target)