Coverage for daklib/utils.py: 57%
719 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1# vim:set et ts=4 sw=4:
3"""Utility functions
5@contact: Debian FTP Master <ftpmaster@debian.org>
6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
7@license: GNU General Public License version 2 or later
8"""
10# This program is free software; you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation; either version 2 of the License, or
13# (at your option) any later version.
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
20# You should have received a copy of the GNU General Public License
21# along with this program; if not, write to the Free Software
22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24import datetime
25import email.policy
26import errno
27import functools
28import grp
29import os
30import pwd
31import re
32import shutil
33import subprocess
34import sys
35import tempfile
36from collections import defaultdict
37from collections.abc import Collection, Iterable, Mapping, Sequence
38from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, Union, override
40import apt_inst
41import apt_pkg
42import sqlalchemy.sql as sql
44import daklib.config as config
45import daklib.mail
46from daklib.dbconn import (
47 Component,
48 DBConn,
49 MetadataProxy,
50 Override,
51 OverrideType,
52 get_active_keyring_paths,
53 get_architecture,
54 get_component,
55 get_or_set_metadatakey,
56 get_suite,
57 get_suite_architectures,
58)
60from .dak_exceptions import (
61 InvalidDscError,
62 NoFilesFieldError,
63 NoFreeFilenameError,
64 ParseChangesError,
65 SendmailFailedError,
66 UnknownFormatError,
67)
68from .formats import parse_format, validate_changes_format
69from .gpg import SignedFile
70from .regexes import (
71 re_build_dep_arch,
72 re_issource,
73 re_multi_line_field,
74 re_parse_maintainer,
75 re_re_mark,
76 re_single_line_field,
77 re_srchasver,
78 re_whitespace_comment,
79)
80from .srcformats import get_format_from_string
81from .textutils import fix_maintainer
83if TYPE_CHECKING:
84 from sqlalchemy.orm import Session
86 import daklib.daklog
87 import daklib.fstransactions
88 import daklib.upload
90################################################################################
# Maps a key fingerprint to the email addresses extracted from its uids;
# populated lazily by gpg_get_key_addresses() below.
key_uid_email_cache: dict[str, list[str]] = (
    {}
)  #: Cache for email addresses from gpg key uids
96################################################################################
def input_or_exit(prompt: Optional[str] = None) -> str:
    """Prompt for one line of input from stdin.

    Terminates the program with a message if the user closes stdin (^D).
    """
    try:
        reply = input(prompt)
    except EOFError:
        sys.exit("\nUser interrupt (^D).")
    return reply
106################################################################################
def extract_component_from_section(section: str) -> tuple[str, str]:
    """split "section" into "section", "component" parts

    The component is the part before the first "/"; if there is no "/",
    the component defaults to "main".

    :return: tuple (section, component)
    """
    if "/" in section:
        component = section.split("/", 1)[0]
        return section, component
    return section, "main"
121################################################################################
def parse_deb822(
    armored_contents: bytes,
    signing_rules: Literal[-1, 0, 1] = 0,
    keyrings: Collection[str] | None = None,
) -> dict[str, str]:
    """Parse a deb822-style (RFC822-like) control block into a dict.

    :param armored_contents: raw bytes of the (possibly PGP-signed) input
    :param signing_rules: 1 additionally enforces dpkg-source's strict
                          layout (data must end at the signature)
    :param keyrings: keyrings used to verify the signature; if ``None``,
                     no signature is required at all
    :return: mapping of lower-cased field names to their values, plus a
             ``"filecontents"`` entry holding the full decoded input
    :raises ParseChangesError: on empty or otherwise malformed input
    :raises InvalidDscError: if ``signing_rules == 1`` and the layout is wrong
    """
    require_signature = True
    if keyrings is None:
        keyrings = []
        require_signature = False

    # Strips/validates the PGP armor; .contents is the payload only.
    signed_file = SignedFile(
        armored_contents, keyrings=keyrings, require_signature=require_signature
    )
    contents = signed_file.contents.decode("utf-8")

    error = ""
    changes = {}

    # Split the lines in the input, keeping the linebreaks.
    lines = contents.splitlines(True)

    if len(lines) == 0:
        raise ParseChangesError("[Empty changes file]")

    # Reindex by line number so we can easily verify the format of
    # .dsc files...
    index = 0
    indexed_lines = {}
    for line in lines:
        index += 1
        indexed_lines[index] = line[:-1]

    num_of_lines = len(indexed_lines)
    index = 0
    # first == -1: no field seen yet; 1: last line started a new field;
    # 0: we are inside a multi-line continuation.
    first = -1
    while index < num_of_lines:
        index += 1
        line = indexed_lines[index]
        if line == "" and signing_rules == 1:
            # Strict mode: a blank line is only allowed as the terminator
            # right before the PGP signature.
            if index != num_of_lines:
                raise InvalidDscError(index)
            break
        if slf := re_single_line_field.match(line):
            # "Field: value" — starts a (possibly multi-line) field.
            field = slf.groups()[0].lower()
            changes[field] = slf.groups()[1]
            first = 1
            continue
        if line == " .":
            # deb822 encoding of an empty line inside a multi-line value.
            # NOTE(review): relies on a preceding field line having bound
            # `field`; a leading " ." would raise NameError — presumably
            # such input is rejected earlier. TODO confirm.
            changes[field] += "\n"
            continue
        if mlf := re_multi_line_field.match(line):
            if first == -1:
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            if first == 1 and changes[field] != "":
                changes[field] += "\n"
            first = 0
            changes[field] += mlf.groups()[0] + "\n"
            continue
        # Anything else is collected and reported at the end.
        error += line
    changes["filecontents"] = armored_contents.decode()

    if "source" in changes:
        # Strip the source version in brackets from the source field,
        # put it in the "source-version" field instead.
        if srcver := re_srchasver.search(changes["source"]):
            changes["source"] = srcver.group(1)
            changes["source-version"] = srcver.group(2)

    if error:
        raise ParseChangesError(error)

    return changes
201################################################################################
def parse_changes(
    filename: str,
    signing_rules: Literal[-1, 0, 1] = 0,
    dsc_file: bool = False,
    keyrings: Collection[str] | None = None,
) -> dict[str, str]:
    """
    Parse a .changes or Debian source control (.dsc) file.

    Reads `filename`, hands the raw bytes to :func:`parse_deb822` and, for
    .changes files, verifies that all fields mandated by policy 5.5 are
    present.

    signing_rules is an optional argument:

      - If signing_rules == -1, no signature is required.
      - If signing_rules == 0 (the default), a signature is required.
      - If signing_rules == 1, it turns on the same strict format checking
        as dpkg-source.

    The rules for (signing_rules == 1)-mode are:

      - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
        followed by any PGP header data and must end with a blank line.
      - The data section must end with a blank line and must be followed by
        "-----BEGIN PGP SIGNATURE-----".

    :param dsc_file: `filename` is a Debian source control (.dsc) file
    :return: dict of lower-cased field names to values
    :raises ParseChangesError: if a mandatory .changes field is missing
    """
    with open(filename, "rb") as changes_in:
        raw = changes_in.read()
        changes = parse_deb822(raw, signing_rules, keyrings=keyrings)

    if dsc_file:
        return changes

    # Finally ensure that everything needed for .changes is there
    must_keywords = (
        "Format",
        "Date",
        "Source",
        "Architecture",
        "Version",
        "Distribution",
        "Maintainer",
        "Changes",
        "Files",
    )

    missingfields = [kw for kw in must_keywords if kw.lower() not in changes]

    if len(missingfields):
        raise ParseChangesError(
            "Missing mandatory field(s) in changes file (policy 5.5): %s"
            % (missingfields)
        )

    return changes
265################################################################################
def check_dsc_files(
    dsc_filename: str,
    dsc: Mapping[str, str],
    dsc_files: Iterable[str],
) -> list[str]:
    """
    Verify that the files listed in the Files field of the .dsc are
    those expected given the announced Format.

    :param dsc_filename: path of .dsc file
    :param dsc: the content of the .dsc parsed by :func:`parse_changes`
    :param dsc_files: the file list returned by :func:`build_file_list`
    :return: all errors detected
    """
    rejmsg = []

    # Ensure .dsc lists proper set of source files according to the format
    # announced
    has: defaultdict[str, int] = defaultdict(lambda: 0)

    # Maps a filename-suffix regex to the counter keys it increments in
    # `has`. Order matters: the first matching pattern wins, so more
    # specific patterns (e.g. signed/compressed orig tarballs) come first.
    ftype_lookup = (
        (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)),
        (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")),
        (r"diff\.gz", ("debian_diff",)),
        (r"tar\.gz", ("native_tar_gz", "native_tar")),
        (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)),
        (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)),
        (r"tar\.(gz|bz2|xz)", ("native_tar",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)),
    )

    for f in dsc_files:
        m = re_issource.match(f)
        if not m:
            rejmsg.append(
                "%s: %s in Files field not recognised as source." % (dsc_filename, f)
            )
            continue

        # Populate 'has' dictionary by resolving keys in lookup table
        matched = False
        for regex, keys in ftype_lookup:
            if re.match(regex, m.group(3)):
                matched = True
                for key in keys:
                    has[key] += 1
                break

        # File does not match anything in lookup table; reject
        if not matched:
            rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f))
            # NOTE(review): this `break` stops checking any remaining files
            # after the first unexpected one — confirm this is intentional.
            break

    # Check for multiple files: at most one of each of these per .dsc.
    for file_type in (
        "orig_tar",
        "orig_tar_sig",
        "native_tar",
        "debian_tar",
        "debian_diff",
    ):
        if has[file_type] > 1:
            rejmsg.append("%s: lists multiple %s" % (dsc_filename, file_type))

    # Source format specific tests
    try:
        format = get_format_from_string(dsc["format"])
        rejmsg.extend(["%s: %s" % (dsc_filename, x) for x in format.reject_msgs(has)])
    except UnknownFormatError:
        # Not an error here for now
        pass

    return rejmsg
345################################################################################
347# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl
def build_file_list(
    changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum"
) -> dict[str, dict[str, str]]:
    """Parse a Files/Checksums-* field of a parsed .changes or .dsc.

    :param changes: mapping produced by :func:`parse_changes`
    :param is_a_dsc: `changes` comes from a .dsc; entries then carry no
                     section/priority and no Format validation is performed
    :param field: name of the field to parse (e.g. "files", "checksums-sha256")
    :param hashname: key under which each entry's hash value is stored
    :return: dict mapping filename to size/section/priority/component/hash
    :raises NoFilesFieldError: if `field` is missing from `changes`
    :raises ParseChangesError: if an entry does not have the expected shape
    """
    files = {}

    # Make sure we have a Files: field to parse...
    if field not in changes:
        raise NoFilesFieldError

    # Validate .changes Format: field
    if not is_a_dsc:
        validate_changes_format(parse_format(changes["format"]), field)

    # Only the "Files" field of a .changes carries section and priority
    # columns; Checksums-* fields and .dsc files do not.
    includes_section = (not is_a_dsc) and field == "files"

    # Parse each entry/line:
    for i in changes[field].split("\n"):
        if not i:
            break
        s = i.split()
        section = priority = ""
        try:
            if includes_section:
                (md5, size, section, priority, name) = s
            else:
                (md5, size, name) = s
        except ValueError:
            raise ParseChangesError(i)

        # "-" is the conventional placeholder for missing section/priority.
        if section == "":
            section = "-"
        if priority == "":
            priority = "-"

        (section, component) = extract_component_from_section(section)

        files[name] = dict(
            size=size, section=section, priority=priority, component=component
        )
        files[name][hashname] = md5

    return files
394################################################################################
def send_mail(message: str, whitelists: Optional[list[str | None]] = None) -> None:
    """sendmail wrapper, takes a message string

    The message is optionally filtered against recipient whitelists, signed
    with the configured key, archived to Dir::Mail and finally handed to
    Dinstall::SendmailCommand (unless mail sending is disabled).

    :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
                       everything, otherwise an address is whitelisted if it is
                       included in any of the lists.
                       In addition a global whitelist can be specified in
                       Dinstall::MailWhiteList.
    :raises SendmailFailedError: if the sendmail command exits non-zero
    """
    msg = daklib.mail.parse_mail(message)

    # The incoming message might be UTF-8, but outgoing mail should
    # use a legacy-compatible encoding. Set the content to the
    # text to make sure this is the case.
    # Note that this does not work with multipart messages.
    msg.set_content(msg.get_payload(), cte="quoted-printable")

    # Check whether we're supposed to be sending mail
    call_sendmail = True
    if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
        call_sendmail = False

    # A None entry means "whitelist everything", so drop all lists.
    if whitelists is None or None in whitelists:
        whitelists = []
    if Cnf.get("Dinstall::MailWhiteList", ""):
        whitelists.append(Cnf["Dinstall::MailWhiteList"])
    if len(whitelists) != 0:
        # Compile the whitelist: lines starting with the "RE:" marker are
        # regexes, everything else is matched literally.
        whitelist = []
        for path in whitelists:
            assert path is not None
            with open(path, "r") as whitelist_in:
                for line in whitelist_in:
                    if not re_whitespace_comment.match(line):
                        if re_re_mark.match(line):
                            whitelist.append(
                                re.compile(re_re_mark.sub("", line.strip(), 1))
                            )
                        else:
                            whitelist.append(re.compile(re.escape(line.strip())))

        # Fields to check.
        fields = ["To", "Bcc", "Cc"]
        for field in fields:
            # Check each field
            value = msg.get(field, None)
            if value is not None:
                match = []
                for item in value.split(","):
                    (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer(
                        item.strip()
                    )
                    mail_whitelisted = 0
                    for wr in whitelist:
                        if wr.match(mail):
                            mail_whitelisted = 1
                            break
                    if not mail_whitelisted:
                        print("Skipping {0} since it's not whitelisted".format(item))
                        continue
                    match.append(item)

                # Doesn't have any mail in whitelist so remove the header
                if len(match) == 0:
                    del msg[field]
                else:
                    msg.replace_header(field, ", ".join(match))

        # Change message fields in order if we don't have a To header
        if "To" not in msg:
            # Promote the first remaining recipient header (Cc before Bcc)
            # to To; if none are left, there is nobody to send to.
            fields.reverse()
            for field in fields:
                if field in msg:
                    msg[fields[-1]] = msg[field]
                    del msg[field]
                    break
            else:
                # return, as we removed all recipients.
                call_sendmail = False

    # sign mail
    if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""):
        msg = daklib.mail.sign_mail(
            msg,
            keyids=[mailkey],
            pubring=Cnf.get("Dinstall::SigningPubKeyring") or None,
            homedir=Cnf.get("Dinstall::SigningHomedir") or None,
            passphrase_file=Cnf.get("Dinstall::SigningPassphraseFile") or None,
        )

    msg_bytes = msg.as_bytes(policy=email.policy.default)

    # Archive a copy of every outgoing mail under a timestamped name.
    maildir = Cnf.get("Dir::Mail")
    if maildir:
        path = os.path.join(maildir, datetime.datetime.now().isoformat())
        path = find_next_free(path)
        with open(path, "wb") as fh:
            fh.write(msg_bytes)

    # Invoke sendmail
    if not call_sendmail:
        return
    try:
        subprocess.run(
            Cnf["Dinstall::SendmailCommand"].split(),
            input=msg_bytes,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        raise SendmailFailedError(e.output.decode().rstrip())
511################################################################################
def poolify(source: str) -> str:
    """Convert `source` name into the directory path used in the pool structure.

    Packages named "lib*" are grouped under four-letter directories
    (e.g. "libfoo" -> "libf/libfoo/"); everything else goes under the
    first letter (e.g. "bash" -> "b/bash/").
    """
    # startswith is clearer than slicing and equivalent for this check.
    prefix = source[:4] if source.startswith("lib") else source[:1]
    return prefix + "/" + source + "/"
522################################################################################
def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
    """Copy `src` to `dest` (creating parent directories) and remove `src`.

    :param dest: target file or directory; if an existing directory, the
                 basename of `src` is appended
    :param overwrite: replace an existing destination file (which must be
                      writable); otherwise the program exits via :func:`fubar`
    :param perms: permissions applied to the destination file
    """
    if os.path.exists(dest) and os.path.isdir(dest):
        dest_dir = dest
    else:
        dest_dir = os.path.dirname(dest)
    if not os.path.lexists(dest_dir):
        # Create missing parents setgid group-writable (2775), temporarily
        # clearing the umask so the mode is applied exactly.
        umask = os.umask(00000)
        os.makedirs(dest_dir, 0o2775)
        os.umask(umask)
    # print "Moving %s to %s..." % (src, dest)
    if os.path.exists(dest) and os.path.isdir(dest):
        dest += "/" + os.path.basename(src)
    # Don't overwrite unless forced to
    if os.path.lexists(dest):
        if not overwrite:
            fubar("Can't move %s to %s - file already exists." % (src, dest))
        else:
            if not os.access(dest, os.W_OK):
                fubar(
                    "Can't move %s to %s - can't write to existing file." % (src, dest)
                )
    # copy2 preserves timestamps; the explicit chmod then enforces `perms`.
    shutil.copy2(src, dest)
    os.chmod(dest, perms)
    os.unlink(src)
551################################################################################
def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
    """Read the template in `filename` and replace each key of `subst_map`
    with its (stringified) value."""
    with open(filename) as templatefile:
        result = templatefile.read()
    for key, value in subst_map.items():
        result = result.replace(key, str(value))
    return result
563################################################################################
def fubar(msg: str, exit_code: int = 1) -> NoReturn:
    """Write an error message to stderr and terminate with `exit_code`."""
    sys.stderr.write("E: %s\n" % msg)
    sys.exit(exit_code)
def warn(msg: str) -> None:
    """Write a warning message to stderr."""
    sys.stderr.write("W: %s\n" % msg)
577################################################################################
def whoami() -> str:
    """get user name

    Returns the user name with a laughable attempt at rfc822 conformancy
    (read: removing stray periods).
    """
    gecos = pwd.getpwuid(os.getuid()).pw_gecos
    full_name = gecos.split(",")[0]
    return full_name.replace(".", "")
def getusername() -> str:
    """get login name"""
    return pwd.getpwuid(os.getuid()).pw_name
594################################################################################
def size_type(c: Union[int, float]) -> str:
    """Render a byte count with a coarse unit suffix (B, KB or MB).

    Values stay in the current unit until they exceed 10240 of it.
    """
    unit = " B"
    if c > 10240:
        c = c / 1024
        unit = " KB"
        if c > 10240:
            c = c / 1024
            unit = " MB"
    return "%d%s" % (c, unit)
608################################################################################
def find_next_free(dest: str, too_many: int = 100) -> str:
    """Return `dest` if nothing exists there, otherwise the first free
    "`dest`.N" name (N counting up from 0).

    :param too_many: give up after this many suffixed candidates
    :raises NoFreeFilenameError: if every candidate name already exists

    Fixes an off-by-one in the original: it raised whenever `too_many`
    candidates had been generated, even if the last one was actually free.
    Now we only give up when the current candidate still exists.
    """
    orig_dest = dest
    extra = 0
    while os.path.lexists(dest):
        if extra >= too_many:
            raise NoFreeFilenameError
        dest = "%s.%d" % (orig_dest, extra)
        extra += 1
    return dest
622################################################################################
def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str:
    """Join `original` with `sep`, rendering None entries as empty strings."""
    parts = ("" if item is None else item for item in original)
    return sep.join(parts)
629################################################################################
def prefix_multi_line_string(
    lines: str, prefix: str, include_blank_lines: bool = False
) -> str:
    """prepend `prefix` to each (stripped) line in `lines`

    Blank lines are dropped unless `include_blank_lines` is set.
    """
    out = []
    for raw_line in lines.split("\n"):
        stripped = raw_line.strip()
        if stripped or include_blank_lines:
            out.append(prefix + stripped)
    return "\n".join(out)
643################################################################################
def join_with_commas_and(list: Sequence[str]) -> str:
    """Join items English-style: "a, b and c" ("nothing" for an empty list)."""
    if not list:
        return "nothing"
    if len(list) == 1:
        return list[0]
    *head, tail = list
    return ", ".join(head) + " and " + tail
654################################################################################
def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
    """Pretty-print a dependency alternative list, e.g. "a (>= 1) |b"."""
    rendered = [
        "%s (%s %s)" % (pkg, constraint, version) if constraint else pkg
        for pkg, constraint, version in deps
    ]
    return " |".join(rendered)
665################################################################################
def get_conf() -> apt_pkg.Configuration:
    """Return the module-global dak configuration object (:data:`Cnf`)."""
    return Cnf
672################################################################################
def parse_args(Options: apt_pkg.Configuration) -> tuple[str, str, str, bool]:
    """Handle -a, -c and -s arguments; returns them as SQL constraints

    :param Options: parsed command-line options with "Suite", "Component"
                    and "Architecture" entries
    :return: tuple (con_suites, con_architectures, con_components,
             check_source): SQL "AND ..." fragments (empty string when the
             option was not given) plus a flag indicating whether the
             "source" pseudo-architecture was requested
    """
    # XXX: This should go away and everything which calls it be converted
    #      to use SQLA properly.  For now, we'll just fix it not to use
    #      the old Pg interface though
    session = DBConn().session()
    # Process suite
    if Options["Suite"]:
        suite_ids_list = []
        for suitename in split_args(Options["Suite"]):
            suite = get_suite(suitename, session=session)
            if not suite or suite.suite_id is None:
                warn(
                    "suite '%s' not recognised."
                    % (suite and suite.suite_name or suitename)
                )
            else:
                suite_ids_list.append(suite.suite_id)
        if suite_ids_list:
            con_suites = "AND su.id IN (%s)" % ", ".join(
                [str(i) for i in suite_ids_list]
            )
        else:
            # None of the given suites exist; nothing sensible to query.
            fubar("No valid suite given.")
    else:
        con_suites = ""

    # Process component
    if Options["Component"]:
        component_ids_list = []
        for componentname in split_args(Options["Component"]):
            component = get_component(componentname, session=session)
            if component is None:
                warn("component '%s' not recognised." % (componentname))
            else:
                component_ids_list.append(component.component_id)
        if component_ids_list:
            con_components = "AND c.id IN (%s)" % ", ".join(
                [str(i) for i in component_ids_list]
            )
        else:
            fubar("No valid component given.")
    else:
        con_components = ""

    # Process architecture
    con_architectures = ""
    check_source = False
    if Options["Architecture"]:
        arch_ids_list = []
        for archname in split_args(Options["Architecture"]):
            # "source" is not a real architecture; it is reported via the
            # check_source flag instead of the SQL constraint.
            if archname == "source":
                check_source = True
            else:
                arch = get_architecture(archname, session=session)
                if arch is None:
                    warn("architecture '%s' not recognised." % (archname))
                else:
                    arch_ids_list.append(arch.arch_id)
        if arch_ids_list:
            con_architectures = "AND a.id IN (%s)" % ", ".join(
                [str(i) for i in arch_ids_list]
            )
        else:
            if not check_source:
                fubar("No valid architecture given.")
    else:
        # No -a given: match all architectures and include source.
        check_source = True

    return (con_suites, con_architectures, con_components, check_source)
747################################################################################
750@functools.total_ordering
751class ArchKey:
752 """
753 Key object for use in sorting lists of architectures.
755 Sorts normally except that 'source' dominates all others.
756 """
758 __slots__ = ["arch", "issource"]
760 def __init__(self, arch: str, *args):
761 self.arch = arch
762 self.issource = arch == "source"
764 def __lt__(self, other: "ArchKey") -> bool:
765 if self.issource:
766 return not other.issource
767 if other.issource:
768 return False
769 return self.arch < other.arch
771 @override
772 def __eq__(self, other: object) -> bool:
773 if not isinstance(other, ArchKey): 773 ↛ 774line 773 didn't jump to line 774 because the condition on line 773 was never true
774 return NotImplemented
775 return self.arch == other.arch
778################################################################################
def split_args(s: str, dwim: bool = True) -> list[str]:
    """
    Split command line arguments which can be separated by either commas
    or whitespace. If dwim is set, it will complain about string ending
    in comma since this usually means someone did 'dak ls -a i386, m68k
    foo' or something and the inevitable confusion resulting from 'm68k'
    being treated as an argument is undesirable.
    """
    if "," not in s:
        return s.split()
    if dwim and s[-1:] == ",":
        fubar("split_args: found trailing comma, spurious space maybe?")
    return s.split(",")
798################################################################################
801def split_args_or_none(s: str | None, dwim: bool = True) -> list[str] | None:
802 """
803 Split command line arguments like `split_args`, but return `None` for empty string
804 """
805 if not s:
806 return None
807 return split_args(s, dwim)
810################################################################################
def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
    """Build gpg --keyring options for `keyrings` (default: active keyrings)."""
    if keyrings is None:
        keyrings = get_active_keyring_paths()
    return [f"--keyring={path}" for path in keyrings]
820################################################################################
def _gpg_get_addresses_from_listing(output: bytes) -> list[str]:
    """Extract email addresses from gpg --with-colons key listing output.

    @debian.org addresses are placed at the front of the returned list.
    """
    found: list[str] = []

    for record in output.split(b"\n"):
        fields = record.split(b":")
        if fields[0] not in (b"uid", b"pub"):
            continue
        if fields[1] in (b"i", b"d", b"r"):
            # Skip uid that is invalid, disabled or revoked
            continue
        try:
            raw_uid = fields[9]
        except IndexError:
            continue
        try:
            uid = raw_uid.decode(encoding="utf-8")
        except UnicodeDecodeError:
            # If the uid is not valid UTF-8, we assume it is an old uid
            # still encoding in Latin-1.
            uid = raw_uid.decode(encoding="latin1")
        parsed = re_parse_maintainer.match(uid)
        if not parsed:
            continue
        address = parsed.group(2)
        if address.endswith("@debian.org"):
            # prefer @debian.org addresses
            # TODO: maybe not hardcode the domain
            found.insert(0, address)
        else:
            found.append(address)

    return found
def gpg_get_key_addresses(fingerprint: str) -> list[str]:
    """retrieve email addresses from gpg key uids for a given fingerprint

    Results are memoized in :data:`key_uid_email_cache`; a failing gpg
    invocation is cached as an empty list.
    """
    addresses = key_uid_email_cache.get(fingerprint)
    if addresses is not None:
        return addresses

    try:
        cmd = ["gpg", "--no-default-keyring"]
        cmd.extend(gpg_keyring_args())
        cmd.extend(["--with-colons", "--list-keys", "--", fingerprint])
        output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        # Unknown key (or gpg failure): remember that as "no addresses".
        addresses = []
    else:
        addresses = _gpg_get_addresses_from_listing(output)

    key_uid_email_cache[fingerprint] = addresses
    return addresses
877################################################################################
def open_ldap_connection() -> Any:
    """open connection to the configured LDAP server

    Uses Import-LDAP-Fingerprints::LDAPServer; if a CACertFile is
    configured, the connection is upgraded to TLS with mandatory
    certificate verification. Binds anonymously.
    """
    # Imported lazily so the module works without python-ldap installed.
    import ldap  # type: ignore

    LDAPServer = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
    ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile")

    conn = ldap.initialize(LDAPServer)

    if ca_cert_file:
        # Require a valid server certificate when a CA bundle is configured.
        conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
        conn.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file)
        # OPT_X_TLS_NEWCTX makes the options above take effect.
        conn.set_option(ldap.OPT_X_TLS_NEWCTX, True)
        conn.start_tls_s()

    conn.simple_bind_s("", "")

    return conn
900################################################################################
def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]:
    """retrieve login from LDAP linked to a given fingerprint

    :param fingerprint: key fingerprint to look up ("*" matches everyone)
    :return: dict mapping key fingerprint to login (uid)
    """
    import ldap

    conn = open_ldap_connection()
    LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    Attrs = conn.search_s(
        LDAPDn,
        ldap.SCOPE_ONELEVEL,
        "(keyfingerprint=%s)" % fingerprint,
        ["uid", "keyfingerprint"],
    )
    login: dict[str, str] = {}
    for elem in Attrs:
        # elem is (dn, attrs); attribute values arrive as lists of bytes.
        fpr = elem[1]["keyFingerPrint"][0].decode()
        uid = elem[1]["uid"][0].decode()
        login[fpr] = uid
    return login
923################################################################################
def get_users_from_ldap() -> dict[str, str]:
    """retrieve login and user names from LDAP

    :return: dict mapping full name ("cn mn sn") to login (uid)
    """
    import ldap

    conn = open_ldap_connection()
    LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    Attrs = conn.search_s(
        LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"]
    )
    users: dict[str, str] = {}
    for elem in Attrs:
        # elem is (dn, attrs); attribute values arrive as lists of bytes.
        elem = elem[1]
        name = []
        for k in ("cn", "mn", "sn"):
            try:
                value = elem[k][0].decode()
                # Skip empty values and "-" placeholder entries.
                if value and value[0] != "-":
                    name.append(value)
            except KeyError:
                pass
        # Decode the uid so the value matches the declared dict[str, str]
        # return type; the original stored raw bytes here while decoding
        # every other attribute.
        users[" ".join(name)] = elem["uid"][0].decode()
    return users
950################################################################################
def clean_symlink(src: str, dest: str, root: str) -> str:
    """
    Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'.
    Returns fixed 'src'
    """
    rel_src = src.replace(root, "", 1)
    rel_dest_dir = os.path.dirname(dest.replace(root, "", 1))
    # One "../" per path component of the destination's directory.
    ups = "../" * len(rel_dest_dir.split("/"))
    return ups + rel_src
965################################################################################
def temp_dirname(
    parent: Optional[str] = None,
    prefix: str = "dak",
    suffix: str = "",
    mode: Optional[int] = None,
    group: Optional[str] = None,
) -> str:
    """
    Create and return a secure, unique temporary directory.

    :param parent: If non-null it will be the directory the directory is pre-created in.
    :param prefix: The directory name will be prefixed with this string
    :param suffix: The directory name will end with this string
    :param mode: If set the directory will get chmodded to those permissions
    :param group: If set the directory will get chgrped to the specified group.
    :return: path of the newly created directory
    """
    path = tempfile.mkdtemp(suffix, prefix, parent)
    if mode is not None:
        os.chmod(path, mode)
    if group is not None:
        os.chown(path, -1, grp.getgrnam(group).gr_gid)
    return path
996################################################################################
def get_changes_files(from_dir: str) -> list[str]:
    """
    Takes a directory and lists all .changes files in it (as well as chdir'ing
    to the directory; this is due to broken behaviour on the part of p-u/p-a
    when you're not in the right place)

    Returns a list of filenames
    """
    try:
        # Much of the rest of p-u/p-a depends on being in the right place
        os.chdir(from_dir)
        return [entry for entry in os.listdir(from_dir) if entry.endswith(".changes")]
    except OSError as e:
        fubar("Failed to read list from directory %s (%s)" % (from_dir, e))
1017################################################################################
# Module-wide handle on dak's configuration, backed by an
# apt_pkg.Configuration loaded via daklib.config.Config().
Cnf: apt_pkg.Configuration = config.Config().Cnf
1022################################################################################
def parse_wnpp_bug_file(
    file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm",
) -> dict[str, list[str]]:
    """
    Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
    Well, actually it parsed a local copy, but let's document the source
    somewhere ;)

    returns a dict associating source package name with a list of open wnpp
    bugs (Yes, there might be more than one)
    """
    try:
        with open(file) as f:
            lines = f.readlines()
    except OSError:
        # Missing file is non-fatal: we just won't close any WNPP bugs.
        print(
            "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any."
            % file
        )
        lines = []

    # Each line looks like "source: <bug1 text>|<bug2 text>|..."
    wnpp: dict[str, list[str]] = {}
    for line in lines:
        splited_line = line.split(": ", 1)
        if len(splited_line) > 1:
            wnpp[splited_line[0]] = splited_line[1].split("|")

    # Reduce each bug entry to its bug number.  The old pattern "(\d)+"
    # only worked because m.group() returns the full match; "\d+" says
    # what is meant.  A match of "\d+" is never empty, so the former
    # "if bug_no:" guard was dead code.
    for source in wnpp:
        bugs = []
        for wnpp_bug in wnpp[source]:
            m = re.search(r"\d+", wnpp_bug)
            assert m is not None
            bugs.append(m.group())
        wnpp[source] = bugs
    return wnpp
1065################################################################################
def deb_extract_control(path: str) -> bytes:
    """extract DEBIAN/control from a binary package"""
    deb = apt_inst.DebFile(path)
    return deb.control.extractdata("control")
1073################################################################################
def mail_addresses_for_upload(
    maintainer: str,
    changed_by: str,
    fingerprint: str,
    authorized_by_fingerprint: Optional[str],
) -> list[str]:
    """mail addresses to contact for an upload

    :param maintainer: Maintainer field of the .changes file
    :param changed_by: Changed-By field of the .changes file
    :param fingerprint: fingerprint of the key used to sign the upload
    :param authorized_by_fingerprint: fingerprint of the key that authorized
        the upload, or None if there is none
    :return: list of RFC 2047-encoded mail addresses to contact regarding
        this upload
    """
    # Recipient roles are configurable; fall back to the full default set.
    recipients = Cnf.value_list("Dinstall::UploadMailRecipients")
    if not recipients:
        recipients = [
            "maintainer",
            "changed_by",
            "signer",
            "authorized_by",
        ]

    # Ensure signer and authorized_by are last if present
    for r in ("signer", "authorized_by"):
        try:
            recipients.remove(r)
        except ValueError:
            pass
        else:
            # Only re-append roles that were actually configured.
            recipients.append(r)

    # Compute the set of addresses of the recipients
    addresses = set()  # Name + email
    emails = set()  # Email only, used to avoid duplicates
    for recipient in recipients:
        address: str | None
        if recipient.startswith("mail:"):  # Email hardcoded in config
            address = recipient[5:]
        elif recipient == "maintainer":
            address = maintainer
        elif recipient == "changed_by":
            address = changed_by
        elif recipient == "signer" or recipient == "authorized_by":
            fpr = fingerprint if recipient == "signer" else authorized_by_fingerprint
            if not fpr:
                continue
            # Use the key's first UID address, unless one of the key's
            # addresses is already going to receive a copy.
            fpr_addresses = gpg_get_key_addresses(fpr)
            address = fpr_addresses[0] if fpr_addresses else None
            if any(x in emails for x in fpr_addresses):
                # The signer already gets a copy via another email
                address = None
        else:
            raise Exception(
                "Unsupported entry in {0}: {1}".format(
                    "Dinstall::UploadMailRecipients", recipient
                )
            )

        if address is not None:
            # Deduplicate on the bare email part returned by fix_maintainer.
            mail = fix_maintainer(address)[3]
            if mail not in emails:
                addresses.add(address)
                emails.add(mail)

    # Return the RFC 2047-encoded form of each collected address.
    encoded_addresses = [fix_maintainer(e)[1] for e in addresses]
    return encoded_addresses
1145################################################################################
def call_editor_for_file(path: str) -> None:
    """Open *path* in the user's editor ($VISUAL, then $EDITOR, then
    sensible-editor) and block until the editor exits."""
    fallback = os.environ.get("EDITOR", "sensible-editor")
    editor = os.environ.get("VISUAL", fallback)
    subprocess.check_call([editor, path])
1153################################################################################
def call_editor(text: str = "", suffix: str = ".txt") -> str:
    """run editor and return the result as a string

    :param text: initial text
    :param suffix: extension for temporary file
    :return: string with the edited text
    """
    # Seed a temporary file with the initial text, let the user edit it,
    # then read back whatever they saved.
    with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as tmp:
        tmp.write(text)
        tmp.flush()
        call_editor_for_file(tmp.name)
        tmp.seek(0)
        return tmp.read()
1171################################################################################
def check_reverse_depends(
    removals: Iterable[str],
    suite: str,
    arches: Optional[Iterable[str]] = None,
    session: "Session | None" = None,
    cruft: bool = False,
    quiet: bool = False,
    include_arch_all: bool = True,
) -> bool:
    """Check whether removing the given packages breaks reverse dependencies.

    Examines binary dependencies (Depends) and source build dependencies
    (Build-Depends, plus Build-Depends-Indep when ``include_arch_all`` is
    set) of ``suite`` and, unless ``quiet``, prints a report of everything
    that would break.

    :param removals: names of packages to be removed
    :param suite: name of the suite to check
    :param arches: restrict the check to these architectures; by default the
        suite's architectures are used
    :param session: database session (required despite the default value)
    :param cruft: use the indented cruft-report output format
    :param quiet: suppress the printed report
    :param include_arch_all: also consider arch:all binaries and
        Build-Depends-Indep
    :return: True if at least one dependency problem was found
    """
    assert session is not None  # TODO: remove default value...

    dbsuite = get_suite(suite, session)
    assert dbsuite is not None
    # Overrides may be stored in a different suite (overridesuite).
    overridesuite = (
        get_suite(dbsuite.overridesuite, session) if dbsuite.overridesuite else dbsuite
    )
    assert overridesuite is not None
    dep_problem = False
    p2c = {}  # binary package name -> component name
    # source -> binary -> set of architectures with broken Depends
    all_broken: defaultdict[str, defaultdict[str, set[str]]] = defaultdict(
        lambda: defaultdict(set)
    )
    if arches:
        all_arches = set(arches)
    else:
        all_arches = set(
            x.arch_string for x in get_suite_architectures(suite, session=session)
        )
        all_arches -= set(["source", "all"])
    removal_set = set(removals)
    metakey_d = get_or_set_metadatakey("Depends", session)
    metakey_p = get_or_set_metadatakey("Provides", session)
    if include_arch_all:
        rdep_architectures = all_arches | set(["all"])
    else:
        rdep_architectures = all_arches
    for architecture in rdep_architectures:
        deps = {}  # binary package name -> raw Depends value
        sources = {}  # binary package name -> source package name
        virtual_packages = {}  # virtual package -> count of surviving providers

        params: dict[str, object] = {
            "suite_id": dbsuite.suite_id,
            "metakey_d_id": metakey_d.key_id,
            "metakey_p_id": metakey_p.key_id,
        }
        arch = get_architecture(architecture, session)
        if arch is None:
            continue
        params["arch_id"] = arch.arch_id

        # All binaries of this suite/architecture, with their Depends and
        # Provides metadata and the component they live in.
        statement = sql.text(
            """
            SELECT b.package, s.source, c.name as component,
                (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
                (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
                FROM binaries b
                JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
                JOIN source s ON b.source = s.id
                JOIN files_archive_map af ON b.file = af.file_id
                JOIN component c ON af.component_id = c.id
                WHERE b.architecture = :arch_id"""
        )
        query = session.execute(statement, params)
        for package, source, component, depends, provides in query:
            sources[package] = source
            p2c[package] = component
            if depends is not None:
                deps[package] = depends
            # Maintain a counter for each virtual package.  If a
            # Provides: exists, set the counter to 0 and count all
            # provides by a package not in the list for removal.
            # If the counter stays 0 at the end, we know that only
            # the to-be-removed packages provided this virtual
            # package.
            if provides is not None:
                for virtual_pkg in provides.split(","):
                    virtual_pkg = virtual_pkg.strip()
                    if virtual_pkg == package:
                        continue
                    if virtual_pkg not in virtual_packages:
                        virtual_packages[virtual_pkg] = 0
                    if package not in removals:
                        virtual_packages[virtual_pkg] += 1

        # If a virtual package is only provided by the to-be-removed
        # packages, treat the virtual package as to-be-removed too.
        # NOTE(review): removal_set is extended here, but the membership
        # tests below still consult `removals` — confirm whether they
        # were meant to use removal_set instead.
        removal_set.update(
            virtual_pkg
            for virtual_pkg in virtual_packages
            if not virtual_packages[virtual_pkg]
        )

        # Check binary dependencies (Depends)
        for package in deps:
            if package in removals:
                continue
            try:
                parsed_dep = apt_pkg.parse_depends(deps[package])
            except ValueError as e:
                print("Error for package %s: %s" % (package, e))
                parsed_dep = []
            for dep in parsed_dep:
                # Check for partial breakage. If a package has a ORed
                # dependency, there is only a dependency problem if all
                # packages in the ORed depends will be removed.
                unsat = 0
                for dep_package, _, _ in dep:
                    if dep_package in removals:
                        unsat += 1
                if unsat == len(dep):
                    component = p2c[package]
                    source = sources[package]
                    # Non-main packages are reported as "source/component".
                    if component != "main":
                        source = "%s/%s" % (source, component)
                    all_broken[source][package].add(architecture)
                    dep_problem = True

    # Report broken binary dependencies, grouped by source package.
    if all_broken and not quiet:
        if cruft:
            print(" - broken Depends:")
        else:
            print("# Broken Depends:")
        for source, bindict in sorted(all_broken.items()):
            lines = []
            for binary, bin_arches in sorted(bindict.items()):
                # Omit the architecture list when the breakage is universal.
                if bin_arches == all_arches or "all" in bin_arches:
                    lines.append(binary)
                else:
                    lines.append("%s [%s]" % (binary, " ".join(sorted(bin_arches))))
            if cruft:
                print(" %s: %s" % (source, lines[0]))
            else:
                print("%s: %s" % (source, lines[0]))
            for line in lines[1:]:
                if cruft:
                    print(" " + " " * (len(source) + 2) + line)
                else:
                    print(" " * (len(source) + 2) + line)
        if not cruft:
            print()

    # Check source dependencies (Build-Depends and Build-Depends-Indep)
    all_broken_bd: dict[str, set[str]] = defaultdict(set)
    metakey_bd = get_or_set_metadatakey("Build-Depends", session)
    metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
    metakey_ids: tuple[int, ...]
    if include_arch_all:
        metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
    else:
        metakey_ids = (metakey_bd.key_id,)

    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_ids": metakey_ids,
    }
    # Aggregated build-dependencies of the newest source in the suite.
    statement = sql.text(
        """
        SELECT s.source, replace(string_agg(trim(sm.value), ', '), ',,', ',') as build_dep
        FROM source s
        JOIN source_metadata sm ON s.id = sm.src_id
        WHERE s.id in
            (SELECT src FROM newest_src_association
                WHERE suite = :suite_id)
            AND sm.key_id in :metakey_ids
        GROUP BY s.id, s.source"""
    )
    query = session.execute(statement, params)
    for source, build_dep in query:
        if source in removals:
            continue
        parsed_dep = []
        if build_dep is not None:
            # Remove [arch] information since we want to see breakage on all arches
            build_dep = re_build_dep_arch.sub("", build_dep)
            try:
                parsed_dep = apt_pkg.parse_src_depends(build_dep)
            except ValueError as e:
                print("Error for source %s: %s" % (source, e))
        for dep in parsed_dep:
            unsat = 0
            for dep_package, _, _ in dep:
                if dep_package in removals:
                    unsat += 1
            if unsat == len(dep):
                # Look up the source's component via its "dsc" override in
                # the override suite, stripping any component suffix from
                # the source name first.
                component = (
                    session.query(Component.component_name)
                    .join(Component.overrides)
                    .filter(Override.suite == overridesuite)
                    .filter(
                        Override.package
                        == re.sub("/(contrib|non-free-firmware|non-free)$", "", source)
                    )
                    .join(Override.overridetype)
                    .filter(OverrideType.overridetype == "dsc")
                    .scalar()
                )
                key = source
                if component != "main":
                    key = "%s/%s" % (source, component)
                all_broken_bd[key].add(pp_deps(dep))
                dep_problem = True

    # Report broken build-dependencies, grouped by source package.
    if all_broken_bd and not quiet:
        if cruft:
            print(" - broken Build-Depends:")
        else:
            print("# Broken Build-Depends:")
        for source, bdeps in sorted(all_broken_bd.items()):
            sorted_bdeps = sorted(bdeps)
            if cruft:
                print(" %s: %s" % (source, sorted_bdeps[0]))
            else:
                print("%s: %s" % (source, sorted_bdeps[0]))
            for bdep in sorted_bdeps[1:]:
                if cruft:
                    print(" " + " " * (len(source) + 2) + bdep)
                else:
                    print(" " * (len(source) + 2) + bdep)
        if not cruft:
            print()

    return dep_problem
1399################################################################################
def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
    """source packages referenced via Built-Using

    :param control: control file to take Built-Using field from
    :return: list of (source_name, source_version) pairs
    """
    built_using = control.get("Built-Using", None)
    if built_using is None:
        return []

    pairs = []
    for alternatives in apt_pkg.parse_depends(built_using):
        # Built-Using entries must be single, strictly versioned references.
        assert len(alternatives) == 1, "Alternatives are not allowed in Built-Using field"
        name, version, relation = alternatives[0]
        assert relation == "=", "Built-Using must contain strict dependencies"
        pairs.append((name, version))
    return pairs
1422################################################################################
def is_in_debug_section(control: Mapping[str, str] | MetadataProxy) -> bool:
    """binary package is a debug package

    :param control: control file of binary package
    :return: True if the binary package is a debug package
    """
    # The section may be prefixed by a component ("main/debug"); compare
    # only the part after the first slash.
    in_debug_section = control["Section"].split("/", 1)[-1] == "debug"
    return in_debug_section and control.get("Auto-Built-Package") == "debug-symbols"
1436################################################################################
def find_possibly_compressed_file(filename: str) -> str:
    """
    :param filename: path to a control file (Sources, Packages, etc) to
        look for
    :return: path to the (possibly compressed) control file
    :raises OSError: with ENOENT if neither the file itself nor any
        compressed variant (.xz, .gz, .bz2) exists
    """
    # The previous docstring claimed "null" was returned when the file is
    # missing; the function has always raised OSError instead.
    _compressions = ("", ".xz", ".gz", ".bz2")

    for ext in _compressions:
        _file = filename + ext
        if os.path.exists(_file):
            return _file

    raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
1457################################################################################
def parse_boolean_from_user(value: str) -> bool:
    """Interpret a user-supplied yes/no style string as a boolean."""
    normalized = value.lower()
    if normalized in {"yes", "true", "enable", "enabled"}:
        return True
    if normalized in {"no", "false", "disable", "disabled"}:
        return False
    # The message intentionally echoes the lower-cased form.
    raise ValueError("Not sure whether %s should be a True or a False" % normalized)
def suite_suffix(suite_name: str) -> str:
    """Return suite_suffix for the given suite"""
    suffix = Cnf.find("Dinstall::SuiteSuffix", "")
    if suffix == "":
        return ""
    # Legacy behaviour: without an explicit suite list the suffix applies
    # to every suite.
    if "Dinstall::SuiteSuffixSuites" not in Cnf:
        # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
        return suffix
    if suite_name in Cnf.value_list("Dinstall::SuiteSuffixSuites"):
        return suffix
    return ""
1482################################################################################
def process_buildinfos(
    directory: str,
    buildinfo_files: "Iterable[daklib.upload.HashedFile]",
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
) -> None:
    """Copy buildinfo files into Dir::BuildinfoArchive

    :param directory: directory where .changes is stored
    :param buildinfo_files: names of buildinfo files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    # Archiving is optional; skip entirely when no archive dir is configured.
    if "Dir::BuildinfoArchive" not in Cnf:
        return

    date_subdir = datetime.datetime.now().strftime("%Y/%m/%d")
    target_dir = os.path.join(Cnf["Dir::BuildinfoArchive"], date_subdir)

    for buildinfo in buildinfo_files:
        source_path = os.path.join(directory, buildinfo.filename)
        # Never overwrite: pick a fresh name if the target already exists.
        target_path = find_next_free(os.path.join(target_dir, buildinfo.filename))

        logger.log(["Archiving", buildinfo.filename])
        fs_transaction.copy(source_path, target_path, mode=0o644)
1515################################################################################
def move_to_morgue(
    morguesubdir: str,
    filenames: Iterable[str],
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
) -> None:
    """Move a file to the correct dir in morgue

    :param morguesubdir: subdirectory of morgue where this file needs to go
    :param filenames: names of files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    assert Cnf["Dir::Base"]
    morguedir = Cnf.get("Dir::Morgue", os.path.join(Cnf["Dir::Base"], "morgue"))

    # Build directory as morguedir/morguesubdir/year/month/day
    today = datetime.datetime.now()
    dest = os.path.join(
        morguedir,
        morguesubdir,
        str(today.year),
        "%.2d" % today.month,
        "%.2d" % today.day,
    )

    for filename in filenames:
        dest_filename = dest + "/" + os.path.basename(filename)
        # If the destination file exists; try to find another filename to use
        if os.path.lexists(dest_filename):
            dest_filename = find_next_free(dest_filename)
        logger.log(["move to morgue", filename, dest_filename])
        fs_transaction.move(filename, dest_filename)