Coverage for daklib/utils.py: 59%
758 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# vim:set et ts=4 sw=4:
3"""Utility functions
5@contact: Debian FTP Master <ftpmaster@debian.org>
6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
7@license: GNU General Public License version 2 or later
8"""
10# This program is free software; you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation; either version 2 of the License, or
13# (at your option) any later version.
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
20# You should have received a copy of the GNU General Public License
21# along with this program; if not, write to the Free Software
22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24import datetime
25import email.policy
26import errno
27import functools
28import grp
29import os
30import pwd
31import re
32import shutil
33import subprocess
34import sys
35import tempfile
36from collections import defaultdict
37from collections.abc import Collection, Iterable, Mapping, Sequence
38from pathlib import Path
39from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, Union, override
41import apt_inst
42import apt_pkg
43import sqlalchemy.sql as sql
45import daklib.config as config
46import daklib.mail
47from daklib.dbconn import (
48 Component,
49 DBConn,
50 MetadataProxy,
51 Override,
52 OverrideType,
53 get_active_keyring_paths,
54 get_architecture,
55 get_component,
56 get_or_set_metadatakey,
57 get_suite,
58 get_suite_architectures,
59)
61from .dak_exceptions import (
62 InvalidDscError,
63 NoFilesFieldError,
64 NoFreeFilenameError,
65 ParseChangesError,
66 SendmailFailedError,
67 UnknownFormatError,
68)
69from .formats import parse_format, validate_changes_format
70from .gpg import SignedFile
71from .regexes import (
72 re_build_dep_arch,
73 re_issource,
74 re_multi_line_field,
75 re_parse_maintainer,
76 re_re_mark,
77 re_single_line_field,
78 re_srchasver,
79 re_whitespace_comment,
80)
81from .srcformats import get_format_from_string
82from .textutils import fix_maintainer
84if TYPE_CHECKING:
85 from sqlalchemy.orm import Session
87 import daklib.daklog
88 import daklib.fstransactions
89 import daklib.upload
91type StrPath = str | os.PathLike[str]
94################################################################################
96key_uid_email_cache: dict[str, list[str]] = (
97 {}
98) #: Cache for email addresses from gpg key uids
100################################################################################
103def input_or_exit(prompt: Optional[str] = None) -> str:
104 try:
105 return input(prompt)
106 except EOFError:
107 sys.exit("\nUser interrupt (^D).")
110################################################################################
113def extract_component_from_section(section: str) -> tuple[str, str]:
114 """split "section" into "section", "component" parts
116 If "component" is not given, "main" is used instead.
118 :return: tuple (section, component)
119 """
120 if section.find("/") != -1:
121 return section, section.split("/", 1)[0]
122 return section, "main"
125################################################################################
128def parse_deb822(
129 armored_contents: bytes,
130 signing_rules: Literal[-1, 0, 1] = 0,
131 keyrings: Collection[str] | None = None,
132) -> dict[str, str]:
133 require_signature = True
134 if keyrings is None: 134 ↛ 138line 134 didn't jump to line 138 because the condition on line 134 was always true
135 keyrings = []
136 require_signature = False
138 signed_file = SignedFile(
139 armored_contents, keyrings=keyrings, require_signature=require_signature
140 )
141 contents = signed_file.contents.decode("utf-8")
143 error = ""
144 changes = {}
146 # Split the lines in the input, keeping the linebreaks.
147 lines = contents.splitlines(True)
149 if len(lines) == 0:
150 raise ParseChangesError("[Empty changes file]")
152 # Reindex by line number so we can easily verify the format of
153 # .dsc files...
154 index = 0
155 indexed_lines = {}
156 for line in lines:
157 index += 1
158 indexed_lines[index] = line[:-1]
160 num_of_lines = len(indexed_lines)
161 index = 0
162 first = -1
163 while index < num_of_lines:
164 index += 1
165 line = indexed_lines[index]
166 if line == "" and signing_rules == 1: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true
167 if index != num_of_lines:
168 raise InvalidDscError(index)
169 break
170 if slf := re_single_line_field.match(line):
171 field = slf.groups()[0].lower()
172 changes[field] = slf.groups()[1]
173 first = 1
174 continue
175 if line == " .":
176 changes[field] += "\n"
177 continue
178 if mlf := re_multi_line_field.match(line):
179 if first == -1: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 raise ParseChangesError(
181 "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
182 )
183 if first == 1 and changes[field] != "":
184 changes[field] += "\n"
185 first = 0
186 changes[field] += mlf.groups()[0] + "\n"
187 continue
188 error += line
190 changes["filecontents"] = armored_contents.decode()
192 if "source" in changes:
193 # Strip the source version in brackets from the source field,
194 # put it in the "source-version" field instead.
195 if srcver := re_srchasver.search(changes["source"]): 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 changes["source"] = srcver.group(1)
197 changes["source-version"] = srcver.group(2)
199 if error: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true
200 raise ParseChangesError(error)
202 return changes
205################################################################################
208def parse_changes(
209 filename: str,
210 signing_rules: Literal[-1, 0, 1] = 0,
211 dsc_file: bool = False,
212 keyrings: Collection[str] | None = None,
213) -> dict[str, str]:
214 """
215 Parses a changes or source control (.dsc) file and returns a dictionary
216 where each field is a key. The mandatory first argument is the
217 filename of the .changes file.
219 signing_rules is an optional argument:
221 - If signing_rules == -1, no signature is required.
222 - If signing_rules == 0 (the default), a signature is required.
223 - If signing_rules == 1, it turns on the same strict format checking
224 as dpkg-source.
226 The rules for (signing_rules == 1)-mode are:
228 - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
229 followed by any PGP header data and must end with a blank line.
231 - The data section must end with a blank line and must be followed by
232 "-----BEGIN PGP SIGNATURE-----".
234 :param dsc_file: `filename` is a Debian source control (.dsc) file
235 """
237 with open(filename, "rb") as changes_in:
238 content = changes_in.read()
239 changes = parse_deb822(content, signing_rules, keyrings=keyrings)
241 if not dsc_file:
242 # Finally ensure that everything needed for .changes is there
243 must_keywords = (
244 "Format",
245 "Date",
246 "Source",
247 "Architecture",
248 "Version",
249 "Distribution",
250 "Maintainer",
251 "Changes",
252 "Files",
253 )
255 missingfields = []
256 for keyword in must_keywords:
257 if keyword.lower() not in changes: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true
258 missingfields.append(keyword)
260 if len(missingfields):
261 raise ParseChangesError(
262 "Missing mandatory field(s) in changes file (policy 5.5): %s"
263 % (missingfields)
264 )
266 return changes
269################################################################################
272def check_dsc_files(
273 dsc_filename: str,
274 dsc: Mapping[str, str],
275 dsc_files: Iterable[str],
276) -> list[str]:
277 """
278 Verify that the files listed in the Files field of the .dsc are
279 those expected given the announced Format.
281 :param dsc_filename: path of .dsc file
282 :param dsc: the content of the .dsc parsed by :func:`parse_changes`
283 :param dsc_files: the file list returned by :func:`build_file_list`
284 :return: all errors detected
285 """
286 rejmsg = []
288 # Ensure .dsc lists proper set of source files according to the format
289 # announced
290 has: defaultdict[str, int] = defaultdict(lambda: 0)
292 ftype_lookup = (
293 (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)),
294 (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")),
295 (r"diff\.gz", ("debian_diff",)),
296 (r"tar\.gz", ("native_tar_gz", "native_tar")),
297 (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)),
298 (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)),
299 (r"tar\.(gz|bz2|xz)", ("native_tar",)),
300 (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)),
301 (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)),
302 )
304 for f in dsc_files:
305 m = re_issource.match(f)
306 if not m: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true
307 rejmsg.append(
308 "%s: %s in Files field not recognised as source." % (dsc_filename, f)
309 )
310 continue
312 # Populate 'has' dictionary by resolving keys in lookup table
313 matched = False
314 for regex, keys in ftype_lookup: 314 ↛ 322line 314 didn't jump to line 322 because the loop on line 314 didn't complete
315 if re.match(regex, m.group(3)):
316 matched = True
317 for key in keys:
318 has[key] += 1
319 break
321 # File does not match anything in lookup table; reject
322 if not matched: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true
323 rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f))
324 break
326 # Check for multiple files
327 for file_type in (
328 "orig_tar",
329 "orig_tar_sig",
330 "native_tar",
331 "debian_tar",
332 "debian_diff",
333 ):
334 if has[file_type] > 1: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true
335 rejmsg.append("%s: lists multiple %s" % (dsc_filename, file_type))
337 # Source format specific tests
338 try:
339 format = get_format_from_string(dsc["format"])
340 rejmsg.extend(["%s: %s" % (dsc_filename, x) for x in format.reject_msgs(has)])
342 except UnknownFormatError:
343 # Not an error here for now
344 pass
346 return rejmsg
349################################################################################
351# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl
354def build_file_list(
355 changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum"
356) -> dict[str, dict[str, str]]:
357 files = {}
359 # Make sure we have a Files: field to parse...
360 if field not in changes: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true
361 raise NoFilesFieldError
363 # Validate .changes Format: field
364 if not is_a_dsc: 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true
365 validate_changes_format(parse_format(changes["format"]), field)
367 includes_section = (not is_a_dsc) and field == "files"
369 # Parse each entry/line:
370 for i in changes[field].split("\n"): 370 ↛ 395line 370 didn't jump to line 395 because the loop on line 370 didn't complete
371 if not i:
372 break
373 s = i.split()
374 section = priority = ""
375 try:
376 if includes_section: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 (md5, size, section, priority, name) = s
378 else:
379 (md5, size, name) = s
380 except ValueError:
381 raise ParseChangesError(i)
383 if section == "": 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was always true
384 section = "-"
385 if priority == "": 385 ↛ 388line 385 didn't jump to line 388 because the condition on line 385 was always true
386 priority = "-"
388 (section, component) = extract_component_from_section(section)
390 files[name] = dict(
391 size=size, section=section, priority=priority, component=component
392 )
393 files[name][hashname] = md5
395 return files
398################################################################################
401def send_mail(message: str, whitelists: Optional[list[str | None]] = None) -> None:
402 """sendmail wrapper, takes a message string
404 :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
405 everything, otherwise an address is whitelisted if it is
406 included in any of the lists.
407 In addition a global whitelist can be specified in
408 Dinstall::MailWhiteList.
409 """
411 msg = daklib.mail.parse_mail(message)
413 # The incoming message might be UTF-8, but outgoing mail should
414 # use a legacy-compatible encoding. Set the content to the
415 # text to make sure this is the case.
416 # Note that this does not work with multipart messages.
417 msg.set_content(msg.get_payload(), cte="quoted-printable")
419 # Check whether we're supposed to be sending mail
420 call_sendmail = True
421 if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
422 call_sendmail = False
424 if whitelists is None or None in whitelists:
425 whitelists = []
426 if Cnf.get("Dinstall::MailWhiteList", ""): 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 whitelists.append(Cnf["Dinstall::MailWhiteList"])
428 if len(whitelists) != 0: 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true
429 whitelist = []
430 for path in whitelists:
431 assert path is not None
432 with open(path, "r") as whitelist_in:
433 for line in whitelist_in:
434 if not re_whitespace_comment.match(line):
435 if re_re_mark.match(line):
436 whitelist.append(
437 re.compile(re_re_mark.sub("", line.strip(), 1))
438 )
439 else:
440 whitelist.append(re.compile(re.escape(line.strip())))
442 # Fields to check.
443 fields = ["To", "Bcc", "Cc"]
444 for field in fields:
445 # Check each field
446 value = msg.get(field, None)
447 if value is not None:
448 match = []
449 for item in value.split(","):
450 (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer(
451 item.strip()
452 )
453 mail_whitelisted = 0
454 for wr in whitelist:
455 if wr.match(mail):
456 mail_whitelisted = 1
457 break
458 if not mail_whitelisted:
459 print("Skipping {0} since it's not whitelisted".format(item))
460 continue
461 match.append(item)
463 # Doesn't have any mail in whitelist so remove the header
464 if len(match) == 0:
465 del msg[field]
466 else:
467 msg.replace_header(field, ", ".join(match))
469 # Change message fields in order if we don't have a To header
470 if "To" not in msg:
471 fields.reverse()
472 for field in fields:
473 if field in msg:
474 msg[fields[-1]] = msg[field]
475 del msg[field]
476 break
477 else:
478 # return, as we removed all recipients.
479 call_sendmail = False
481 # sign mail
482 if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""): 482 ↛ 491line 482 didn't jump to line 491 because the condition on line 482 was always true
483 msg = daklib.mail.sign_mail(
484 msg,
485 keyids=[mailkey],
486 pubring=Cnf.get("Dinstall::SigningPubKeyring") or None,
487 homedir=Cnf.get("Dinstall::SigningHomedir") or None,
488 passphrase_file=Cnf.get("Dinstall::SigningPassphraseFile") or None,
489 )
491 msg_bytes = msg.as_bytes(policy=email.policy.default)
493 maildir = Cnf.get("Dir::Mail")
494 if maildir: 494 ↛ 501line 494 didn't jump to line 501 because the condition on line 494 was always true
495 path = os.path.join(maildir, datetime.datetime.now().isoformat())
496 path = find_next_free(path)
497 with open(path, "wb") as fh:
498 fh.write(msg_bytes)
500 # Invoke sendmail
501 if not call_sendmail:
502 return
503 try:
504 subprocess.run(
505 Cnf["Dinstall::SendmailCommand"].split(),
506 input=msg_bytes,
507 check=True,
508 stdout=subprocess.PIPE,
509 stderr=subprocess.STDOUT,
510 )
511 except subprocess.CalledProcessError as e:
512 raise SendmailFailedError(e.output.decode().rstrip())
515################################################################################
518def poolify(source: str) -> str:
519 """convert `source` name into directory path used in pool structure"""
520 if source[:3] == "lib": 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true
521 return source[:4] + "/" + source + "/"
522 else:
523 return source[:1] + "/" + source + "/"
526################################################################################
529def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
530 if os.path.exists(dest) and os.path.isdir(dest):
531 dest_dir = dest
532 else:
533 dest_dir = os.path.dirname(dest)
534 if not os.path.lexists(dest_dir):
535 umask = os.umask(00000)
536 os.makedirs(dest_dir, 0o2775)
537 os.umask(umask)
538 # print "Moving %s to %s..." % (src, dest)
539 if os.path.exists(dest) and os.path.isdir(dest):
540 dest += "/" + os.path.basename(src)
541 # Don't overwrite unless forced to
542 if os.path.lexists(dest):
543 if not overwrite:
544 fubar("Can't move %s to %s - file already exists." % (src, dest))
545 else:
546 if not os.access(dest, os.W_OK):
547 fubar(
548 "Can't move %s to %s - can't write to existing file." % (src, dest)
549 )
550 shutil.copy2(src, dest)
551 os.chmod(dest, perms)
552 os.unlink(src)
555################################################################################
558def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
559 """Perform a substition of template"""
560 with open(filename) as templatefile:
561 template = templatefile.read()
562 for k, v in subst_map.items():
563 template = template.replace(k, str(v))
564 return template
567################################################################################
570def fubar(msg: str, exit_code: int = 1) -> NoReturn:
571 """print error message and exit program"""
572 print("E:", msg, file=sys.stderr)
573 sys.exit(exit_code)
576def warn(msg: str) -> None:
577 """print warning message"""
578 print("W:", msg, file=sys.stderr)
581################################################################################
584def whoami() -> str:
585 """get user name
587 Returns the user name with a laughable attempt at rfc822 conformancy
588 (read: removing stray periods).
589 """
590 return pwd.getpwuid(os.getuid())[4].split(",")[0].replace(".", "")
593def getusername() -> str:
594 """get login name"""
595 return pwd.getpwuid(os.getuid())[0]
598################################################################################
601def size_type(c: Union[int, float]) -> str:
602 t = " B"
603 if c > 10240:
604 c = c / 1024
605 t = " KB"
606 if c > 10240: 606 ↛ 607line 606 didn't jump to line 607 because the condition on line 606 was never true
607 c = c / 1024
608 t = " MB"
609 return "%d%s" % (c, t)
612################################################################################
615def find_next_free(dest: str, too_many: int = 100) -> str:
616 extra = 0
617 orig_dest = dest
618 while os.path.lexists(dest) and extra < too_many:
619 dest = orig_dest + "." + repr(extra)
620 extra += 1
621 if extra >= too_many: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true
622 raise NoFreeFilenameError
623 return dest
626################################################################################
629def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str:
630 return sep.join(x if x is not None else "" for x in original)
633################################################################################
636def prefix_multi_line_string(
637 lines: str, prefix: str, include_blank_lines: bool = False
638) -> str:
639 """prepend `prefix` to each line in `lines`"""
640 return "\n".join(
641 prefix + cleaned_line
642 for line in lines.split("\n")
643 if (cleaned_line := line.strip()) or include_blank_lines
644 )
647################################################################################
650def join_with_commas_and(list: Sequence[str]) -> str:
651 if len(list) == 0: 651 ↛ 652line 651 didn't jump to line 652 because the condition on line 651 was never true
652 return "nothing"
653 if len(list) == 1: 653 ↛ 655line 653 didn't jump to line 655 because the condition on line 653 was always true
654 return list[0]
655 return ", ".join(list[:-1]) + " and " + list[-1]
658################################################################################
661def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
662 pp_deps = (
663 f"{pkg} ({constraint} {version})" if constraint else pkg
664 for pkg, constraint, version in deps
665 )
666 return " |".join(pp_deps)
669################################################################################
672def get_conf() -> apt_pkg.Configuration:
673 return Cnf
676################################################################################
679def parse_args(Options: apt_pkg.Configuration) -> tuple[str, str, str, bool]:
680 """Handle -a, -c and -s arguments; returns them as SQL constraints"""
681 # XXX: This should go away and everything which calls it be converted
682 # to use SQLA properly. For now, we'll just fix it not to use
683 # the old Pg interface though
684 session = DBConn().session()
685 # Process suite
686 if Options["Suite"]: 686 ↛ 704line 686 didn't jump to line 704 because the condition on line 686 was always true
687 suite_ids_list = []
688 for suitename in split_args(Options["Suite"]):
689 suite = get_suite(suitename, session=session)
690 if not suite or suite.suite_id is None: 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true
691 warn(
692 "suite '%s' not recognised."
693 % (suite and suite.suite_name or suitename)
694 )
695 else:
696 suite_ids_list.append(suite.suite_id)
697 if suite_ids_list: 697 ↛ 702line 697 didn't jump to line 702 because the condition on line 697 was always true
698 con_suites = "AND su.id IN (%s)" % ", ".join(
699 [str(i) for i in suite_ids_list]
700 )
701 else:
702 fubar("No valid suite given.")
703 else:
704 con_suites = ""
706 # Process component
707 if Options["Component"]: 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true
708 component_ids_list = []
709 for componentname in split_args(Options["Component"]):
710 component = get_component(componentname, session=session)
711 if component is None:
712 warn("component '%s' not recognised." % (componentname))
713 else:
714 component_ids_list.append(component.component_id)
715 if component_ids_list:
716 con_components = "AND c.id IN (%s)" % ", ".join(
717 [str(i) for i in component_ids_list]
718 )
719 else:
720 fubar("No valid component given.")
721 else:
722 con_components = ""
724 # Process architecture
725 con_architectures = ""
726 check_source = False
727 if Options["Architecture"]: 727 ↛ 728line 727 didn't jump to line 728 because the condition on line 727 was never true
728 arch_ids_list = []
729 for archname in split_args(Options["Architecture"]):
730 if archname == "source":
731 check_source = True
732 else:
733 arch = get_architecture(archname, session=session)
734 if arch is None:
735 warn("architecture '%s' not recognised." % (archname))
736 else:
737 arch_ids_list.append(arch.arch_id)
738 if arch_ids_list:
739 con_architectures = "AND a.id IN (%s)" % ", ".join(
740 [str(i) for i in arch_ids_list]
741 )
742 else:
743 if not check_source:
744 fubar("No valid architecture given.")
745 else:
746 check_source = True
748 return (con_suites, con_architectures, con_components, check_source)
751################################################################################
754@functools.total_ordering
755class ArchKey:
756 """
757 Key object for use in sorting lists of architectures.
759 Sorts normally except that 'source' dominates all others.
760 """
762 __slots__ = ["arch", "issource"]
764 def __init__(self, arch: str, *args):
765 self.arch = arch
766 self.issource = arch == "source"
768 def __lt__(self, other: "ArchKey") -> bool:
769 if self.issource:
770 return not other.issource
771 if other.issource:
772 return False
773 return self.arch < other.arch
775 @override
776 def __eq__(self, other: object) -> bool:
777 if not isinstance(other, ArchKey): 777 ↛ 778line 777 didn't jump to line 778 because the condition on line 777 was never true
778 return NotImplemented
779 return self.arch == other.arch
782################################################################################
785def split_args(s: str, dwim: bool = True) -> list[str]:
786 """
787 Split command line arguments which can be separated by either commas
788 or whitespace. If dwim is set, it will complain about string ending
789 in comma since this usually means someone did 'dak ls -a i386, m68k
790 foo' or something and the inevitable confusion resulting from 'm68k'
791 being treated as an argument is undesirable.
792 """
794 if s.find(",") == -1: 794 ↛ 797line 794 didn't jump to line 797 because the condition on line 794 was always true
795 return s.split()
796 else:
797 if s[-1:] == "," and dwim:
798 fubar("split_args: found trailing comma, spurious space maybe?")
799 return s.split(",")
802################################################################################
805def split_args_or_none(s: str | None, dwim: bool = True) -> list[str] | None:
806 """
807 Split command line arguments like `split_args`, but return `None` for empty string
808 """
809 if not s:
810 return None
811 return split_args(s, dwim)
814################################################################################
817def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
818 if keyrings is None: 818 ↛ 821line 818 didn't jump to line 821 because the condition on line 818 was always true
819 keyrings = get_active_keyring_paths()
821 return ["--keyring={}".format(path) for path in keyrings]
824################################################################################
827def _gpg_get_addresses_from_listing(output: bytes) -> list[str]:
828 addresses: list[str] = []
830 for line in output.split(b"\n"):
831 parts = line.split(b":")
832 if parts[0] not in (b"uid", b"pub"):
833 continue
834 if parts[1] in (b"i", b"d", b"r"): 834 ↛ 836line 834 didn't jump to line 836 because the condition on line 834 was never true
835 # Skip uid that is invalid, disabled or revoked
836 continue
837 try:
838 uid_bytes = parts[9]
839 except IndexError:
840 continue
841 try:
842 uid = uid_bytes.decode(encoding="utf-8")
843 except UnicodeDecodeError:
844 # If the uid is not valid UTF-8, we assume it is an old uid
845 # still encoding in Latin-1.
846 uid = uid_bytes.decode(encoding="latin1")
847 m = re_parse_maintainer.match(uid)
848 if not m:
849 continue
850 address = m.group(2)
851 if address.endswith("@debian.org"): 851 ↛ 854line 851 didn't jump to line 854 because the condition on line 851 was never true
852 # prefer @debian.org addresses
853 # TODO: maybe not hardcode the domain
854 addresses.insert(0, address)
855 else:
856 addresses.append(address)
858 return addresses
861def gpg_get_key_addresses(fingerprint: str) -> list[str]:
862 """retreive email addresses from gpg key uids for a given fingerprint"""
863 addresses = key_uid_email_cache.get(fingerprint)
864 if addresses is not None:
865 return addresses
867 try:
868 cmd = ["gpg", "--no-default-keyring"]
869 cmd.extend(gpg_keyring_args())
870 cmd.extend(["--with-colons", "--list-keys", "--", fingerprint])
871 output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
872 except subprocess.CalledProcessError:
873 addresses = []
874 else:
875 addresses = _gpg_get_addresses_from_listing(output)
877 key_uid_email_cache[fingerprint] = addresses
878 return addresses
881################################################################################
884def open_ldap_connection() -> Any:
885 """open connection to the configured LDAP server"""
886 import ldap # type: ignore
888 LDAPServer = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
889 ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile")
891 conn = ldap.initialize(LDAPServer)
893 if ca_cert_file:
894 conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
895 conn.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file)
896 conn.set_option(ldap.OPT_X_TLS_NEWCTX, True)
897 conn.start_tls_s()
899 conn.simple_bind_s("", "")
901 return conn
904################################################################################
907def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]:
908 """retrieve login from LDAP linked to a given fingerprint"""
909 import ldap
911 conn = open_ldap_connection()
912 LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
913 Attrs = conn.search_s(
914 LDAPDn,
915 ldap.SCOPE_ONELEVEL,
916 "(keyfingerprint=%s)" % fingerprint,
917 ["uid", "keyfingerprint"],
918 )
919 login: dict[str, str] = {}
920 for elem in Attrs:
921 fpr = elem[1]["keyFingerPrint"][0].decode()
922 uid = elem[1]["uid"][0].decode()
923 login[fpr] = uid
924 return login
927################################################################################
930def get_users_from_ldap() -> dict[str, str]:
931 """retrieve login and user names from LDAP"""
932 import ldap
934 conn = open_ldap_connection()
935 LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
936 Attrs = conn.search_s(
937 LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"]
938 )
939 users: dict[str, str] = {}
940 for elem in Attrs:
941 elem = elem[1]
942 name = []
943 for k in ("cn", "mn", "sn"):
944 try:
945 value = elem[k][0].decode()
946 if value and value[0] != "-":
947 name.append(value)
948 except KeyError:
949 pass
950 users[" ".join(name)] = elem["uid"][0]
951 return users
954################################################################################
957def clean_symlink(src: str, dest: str, root: str) -> str:
958 """
959 Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'.
960 Returns fixed 'src'
961 """
962 src = src.replace(root, "", 1)
963 dest = dest.replace(root, "", 1)
964 dest = os.path.dirname(dest)
965 new_src = "../" * len(dest.split("/"))
966 return new_src + src
969################################################################################
972def temp_dirname(
973 parent: Optional[str] = None,
974 prefix: str = "dak",
975 suffix: str = "",
976 mode: Optional[int] = None,
977 group: Optional[str] = None,
978) -> str:
979 """
980 Return a secure and unique directory by pre-creating it.
982 :param parent: If non-null it will be the directory the directory is pre-created in.
983 :param prefix: The filename will be prefixed with this string
984 :param suffix: The filename will end with this string
985 :param mode: If set the file will get chmodded to those permissions
986 :param group: If set the file will get chgrped to the specified group.
987 :return: Returns a pair (fd, name)
989 """
991 tfname = tempfile.mkdtemp(suffix, prefix, parent)
992 if mode is not None: 992 ↛ 994line 992 didn't jump to line 994 because the condition on line 992 was always true
993 os.chmod(tfname, mode)
994 if group is not None: 994 ↛ 995line 994 didn't jump to line 995 because the condition on line 994 was never true
995 gid = grp.getgrnam(group).gr_gid
996 os.chown(tfname, -1, gid)
997 return tfname
1000################################################################################
1003def get_changes_files(from_dir: str) -> list[str]:
1004 """
1005 Takes a directory and lists all .changes files in it (as well as chdir'ing
1006 to the directory; this is due to broken behaviour on the part of p-u/p-a
1007 when you're not in the right place)
1009 Returns a list of filenames
1010 """
1011 try:
1012 # Much of the rest of p-u/p-a depends on being in the right place
1013 os.chdir(from_dir)
1014 changes_files = [x for x in os.listdir(from_dir) if x.endswith(".changes")]
1015 except OSError as e:
1016 fubar("Failed to read list from directory %s (%s)" % (from_dir, e))
1018 return changes_files
1021################################################################################
1024Cnf: apt_pkg.Configuration = config.Config().Cnf
1026################################################################################
1029def parse_wnpp_bug_file(
1030 file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm",
1031) -> dict[str, list[str]]:
1032 """
1033 Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
1034 Well, actually it parsed a local copy, but let's document the source
1035 somewhere ;)
1037 returns a dict associating source package name with a list of open wnpp
1038 bugs (Yes, there might be more than one)
1039 """
1041 try:
1042 with open(file) as f:
1043 lines = f.readlines()
1044 except OSError:
1045 print(
1046 "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any."
1047 % file
1048 )
1049 lines = []
1050 wnpp = {}
1052 for line in lines:
1053 splited_line = line.split(": ", 1)
1054 if len(splited_line) > 1:
1055 wnpp[splited_line[0]] = splited_line[1].split("|")
1057 for source in wnpp:
1058 bugs = []
1059 for wnpp_bug in wnpp[source]:
1060 m = re.search(r"(\d)+", wnpp_bug)
1061 assert m is not None
1062 bug_no = m.group()
1063 if bug_no:
1064 bugs.append(bug_no)
1065 wnpp[source] = bugs
1066 return wnpp
1069################################################################################
1072def deb_extract_control(path: str) -> bytes:
1073 """extract DEBIAN/control from a binary package"""
1074 return apt_inst.DebFile(path).control.extractdata("control")
1077################################################################################
1080def mail_addresses_for_upload(
1081 maintainer: str,
1082 changed_by: str,
1083 fingerprint: str,
1084 authorized_by_fingerprint: Optional[str],
1085) -> list[str]:
1086 """mail addresses to contact for an upload
1088 :param maintainer: Maintainer field of the .changes file
1089 :param changed_by: Changed-By field of the .changes file
1090 :param fingerprint: fingerprint of the key used to sign the upload
1091 :return: list of RFC 2047-encoded mail addresses to contact regarding
1092 this upload
1093 """
1094 recipients = Cnf.value_list("Dinstall::UploadMailRecipients")
1095 if not recipients: 1095 ↛ 1104line 1095 didn't jump to line 1104 because the condition on line 1095 was always true
1096 recipients = [
1097 "maintainer",
1098 "changed_by",
1099 "signer",
1100 "authorized_by",
1101 ]
1103 # Ensure signer and authorized_by are last if present
1104 for r in ("signer", "authorized_by"):
1105 try:
1106 recipients.remove(r)
1107 except ValueError:
1108 pass
1109 else:
1110 recipients.append(r)
1112 # Compute the set of addresses of the recipients
1113 addresses = set() # Name + email
1114 emails = set() # Email only, used to avoid duplicates
1115 for recipient in recipients:
1116 address: str | None
1117 if recipient.startswith("mail:"): # Email hardcoded in config 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true
1118 address = recipient[5:]
1119 elif recipient == "maintainer":
1120 address = maintainer
1121 elif recipient == "changed_by":
1122 address = changed_by
1123 elif recipient == "signer" or recipient == "authorized_by": 1123 ↛ 1133line 1123 didn't jump to line 1133 because the condition on line 1123 was always true
1124 fpr = fingerprint if recipient == "signer" else authorized_by_fingerprint
1125 if not fpr: 1125 ↛ 1126line 1125 didn't jump to line 1126 because the condition on line 1125 was never true
1126 continue
1127 fpr_addresses = gpg_get_key_addresses(fpr)
1128 address = fpr_addresses[0] if fpr_addresses else None
1129 if any(x in emails for x in fpr_addresses):
1130 # The signer already gets a copy via another email
1131 address = None
1132 else:
1133 raise Exception(
1134 "Unsupported entry in {0}: {1}".format(
1135 "Dinstall::UploadMailRecipients", recipient
1136 )
1137 )
1139 if address is not None:
1140 mail = fix_maintainer(address)[3]
1141 if mail not in emails:
1142 addresses.add(address)
1143 emails.add(mail)
1145 encoded_addresses = [fix_maintainer(e)[1] for e in addresses]
1146 return encoded_addresses
1149################################################################################
1152def call_editor_for_file(path: str) -> None:
1153 editor = os.environ.get("VISUAL", os.environ.get("EDITOR", "sensible-editor"))
1154 subprocess.check_call([editor, path])
1157################################################################################
1160def call_editor(text: str = "", suffix: str = ".txt") -> str:
1161 """run editor and return the result as a string
1163 :param text: initial text
1164 :param suffix: extension for temporary file
1165 :return: string with the edited text
1166 """
1167 with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as fh:
1168 print(text, end="", file=fh)
1169 fh.flush()
1170 call_editor_for_file(fh.name)
1171 fh.seek(0)
1172 return fh.read()
1175################################################################################
1178def check_reverse_depends(
1179 removals: Iterable[str],
1180 suite: str,
1181 arches: Optional[Iterable[str]] = None,
1182 session: "Session | None" = None,
1183 cruft: bool = False,
1184 quiet: bool = False,
1185 include_arch_all: bool = True,
1186) -> bool:
1187 assert session is not None # TODO: remove default value...
1189 dbsuite = get_suite(suite, session)
1190 assert dbsuite is not None
1191 overridesuite = (
1192 get_suite(dbsuite.overridesuite, session) if dbsuite.overridesuite else dbsuite
1193 )
1194 assert overridesuite is not None
1195 dep_problem = False
1196 p2c = {}
1197 all_broken: defaultdict[str, defaultdict[str, set[str]]] = defaultdict( 1197 ↛ exitline 1197 didn't jump to the function exit
1198 lambda: defaultdict(set)
1199 )
1200 if arches: 1200 ↛ 1201line 1200 didn't jump to line 1201 because the condition on line 1200 was never true
1201 all_arches = set(arches)
1202 else:
1203 all_arches = set(
1204 x.arch_string for x in get_suite_architectures(suite, session=session)
1205 )
1206 all_arches -= set(["source", "all"])
1207 removal_set = set(removals)
1208 metakey_d = get_or_set_metadatakey("Depends", session)
1209 metakey_p = get_or_set_metadatakey("Provides", session)
1210 if include_arch_all: 1210 ↛ 1213line 1210 didn't jump to line 1213 because the condition on line 1210 was always true
1211 rdep_architectures = all_arches | set(["all"])
1212 else:
1213 rdep_architectures = all_arches
1214 for architecture in rdep_architectures:
1215 deps = {}
1216 sources = {}
1217 virtual_packages = {}
1219 params: dict[str, object] = {
1220 "suite_id": dbsuite.suite_id,
1221 "metakey_d_id": metakey_d.key_id,
1222 "metakey_p_id": metakey_p.key_id,
1223 }
1224 arch = get_architecture(architecture, session)
1225 if arch is None: 1225 ↛ 1226line 1225 didn't jump to line 1226 because the condition on line 1225 was never true
1226 continue
1227 params["arch_id"] = arch.arch_id
1229 statement = sql.text(
1230 """
1231 SELECT b.package, s.source, c.name as component,
1232 (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
1233 (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
1234 FROM binaries b
1235 JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
1236 JOIN source s ON b.source = s.id
1237 JOIN files_archive_map af ON b.file = af.file_id
1238 JOIN component c ON af.component_id = c.id
1239 WHERE b.architecture = :arch_id"""
1240 )
1241 query = session.execute(statement, params)
1242 for package, source, component, depends, provides in query:
1243 sources[package] = source
1244 p2c[package] = component
1245 if depends is not None: 1245 ↛ 1246line 1245 didn't jump to line 1246 because the condition on line 1245 was never true
1246 deps[package] = depends
1247 # Maintain a counter for each virtual package. If a
1248 # Provides: exists, set the counter to 0 and count all
1249 # provides by a package not in the list for removal.
1250 # If the counter stays 0 at the end, we know that only
1251 # the to-be-removed packages provided this virtual
1252 # package.
1253 if provides is not None: 1253 ↛ 1254line 1253 didn't jump to line 1254 because the condition on line 1253 was never true
1254 for virtual_pkg in provides.split(","):
1255 virtual_pkg = virtual_pkg.strip()
1256 if virtual_pkg == package:
1257 continue
1258 if virtual_pkg not in virtual_packages:
1259 virtual_packages[virtual_pkg] = 0
1260 if package not in removals:
1261 virtual_packages[virtual_pkg] += 1
1263 # If a virtual package is only provided by the to-be-removed
1264 # packages, treat the virtual package as to-be-removed too.
1265 removal_set.update(
1266 virtual_pkg
1267 for virtual_pkg in virtual_packages
1268 if not virtual_packages[virtual_pkg]
1269 )
1271 # Check binary dependencies (Depends)
1272 for package in deps: 1272 ↛ 1273line 1272 didn't jump to line 1273 because the loop on line 1272 never started
1273 if package in removals:
1274 continue
1275 try:
1276 parsed_dep = apt_pkg.parse_depends(deps[package])
1277 except ValueError as e:
1278 print("Error for package %s: %s" % (package, e))
1279 parsed_dep = []
1280 for dep in parsed_dep:
1281 # Check for partial breakage. If a package has a ORed
1282 # dependency, there is only a dependency problem if all
1283 # packages in the ORed depends will be removed.
1284 unsat = 0
1285 for dep_package, _, _ in dep:
1286 if dep_package in removals:
1287 unsat += 1
1288 if unsat == len(dep):
1289 component = p2c[package]
1290 source = sources[package]
1291 if component != "main":
1292 source = "%s/%s" % (source, component)
1293 all_broken[source][package].add(architecture)
1294 dep_problem = True
1296 if all_broken and not quiet: 1296 ↛ 1297line 1296 didn't jump to line 1297 because the condition on line 1296 was never true
1297 if cruft:
1298 print(" - broken Depends:")
1299 else:
1300 print("# Broken Depends:")
1301 for source, bindict in sorted(all_broken.items()):
1302 lines = []
1303 for binary, bin_arches in sorted(bindict.items()):
1304 if bin_arches == all_arches or "all" in bin_arches:
1305 lines.append(binary)
1306 else:
1307 lines.append("%s [%s]" % (binary, " ".join(sorted(bin_arches))))
1308 if cruft:
1309 print(" %s: %s" % (source, lines[0]))
1310 else:
1311 print("%s: %s" % (source, lines[0]))
1312 for line in lines[1:]:
1313 if cruft:
1314 print(" " + " " * (len(source) + 2) + line)
1315 else:
1316 print(" " * (len(source) + 2) + line)
1317 if not cruft:
1318 print()
1320 # Check source dependencies (Build-Depends and Build-Depends-Indep)
1321 all_broken_bd: dict[str, set[str]] = defaultdict(set)
1322 metakey_bd = get_or_set_metadatakey("Build-Depends", session)
1323 metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
1324 metakey_ids: tuple[int, ...]
1325 if include_arch_all: 1325 ↛ 1328line 1325 didn't jump to line 1328 because the condition on line 1325 was always true
1326 metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
1327 else:
1328 metakey_ids = (metakey_bd.key_id,)
1330 params = {
1331 "suite_id": dbsuite.suite_id,
1332 "metakey_ids": metakey_ids,
1333 }
1334 statement = sql.text(
1335 """
1336 SELECT s.source, replace(string_agg(trim(sm.value), ', '), ',,', ',') as build_dep
1337 FROM source s
1338 JOIN source_metadata sm ON s.id = sm.src_id
1339 WHERE s.id in
1340 (SELECT src FROM newest_src_association
1341 WHERE suite = :suite_id)
1342 AND sm.key_id in :metakey_ids
1343 GROUP BY s.id, s.source"""
1344 )
1345 query = session.execute(statement, params)
1346 for source, build_dep in query:
1347 if source in removals:
1348 continue
1349 parsed_dep = []
1350 if build_dep is not None: 1350 ↛ 1357line 1350 didn't jump to line 1357 because the condition on line 1350 was always true
1351 # Remove [arch] information since we want to see breakage on all arches
1352 build_dep = re_build_dep_arch.sub("", build_dep)
1353 try:
1354 parsed_dep = apt_pkg.parse_src_depends(build_dep)
1355 except ValueError as e:
1356 print("Error for source %s: %s" % (source, e))
1357 for dep in parsed_dep:
1358 unsat = 0
1359 for dep_package, _, _ in dep:
1360 if dep_package in removals: 1360 ↛ 1361line 1360 didn't jump to line 1361 because the condition on line 1360 was never true
1361 unsat += 1
1362 if unsat == len(dep): 1362 ↛ 1363line 1362 didn't jump to line 1363
1363 component = (
1364 session.query(Component.component_name)
1365 .join(Component.overrides)
1366 .filter(Override.suite == overridesuite)
1367 .filter(
1368 Override.package
1369 == re.sub("/(contrib|non-free-firmware|non-free)$", "", source)
1370 )
1371 .join(Override.overridetype)
1372 .filter(OverrideType.overridetype == "dsc")
1373 .scalar()
1374 )
1375 key = source
1376 if component != "main":
1377 key = "%s/%s" % (source, component)
1378 all_broken_bd[key].add(pp_deps(dep))
1379 dep_problem = True
1381 if all_broken_bd and not quiet: 1381 ↛ 1382line 1381 didn't jump to line 1382 because the condition on line 1381 was never true
1382 if cruft:
1383 print(" - broken Build-Depends:")
1384 else:
1385 print("# Broken Build-Depends:")
1386 for source, bdeps in sorted(all_broken_bd.items()):
1387 sorted_bdeps = sorted(bdeps)
1388 if cruft:
1389 print(" %s: %s" % (source, sorted_bdeps[0]))
1390 else:
1391 print("%s: %s" % (source, sorted_bdeps[0]))
1392 for bdep in sorted_bdeps[1:]:
1393 if cruft:
1394 print(" " + " " * (len(source) + 2) + bdep)
1395 else:
1396 print(" " * (len(source) + 2) + bdep)
1397 if not cruft:
1398 print()
1400 return dep_problem
1403################################################################################
1406def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
1407 """source packages referenced via Built-Using
1409 :param control: control file to take Built-Using field from
1410 :return: list of (source_name, source_version) pairs
1411 """
1412 built_using = control.get("Built-Using", None)
1413 if built_using is None:
1414 return []
1416 bu = []
1417 for dep in apt_pkg.parse_depends(built_using):
1418 assert len(dep) == 1, "Alternatives are not allowed in Built-Using field"
1419 source_name, source_version, comp = dep[0]
1420 assert comp == "=", "Built-Using must contain strict dependencies"
1421 bu.append((source_name, source_version))
1423 return bu
1426################################################################################
1429def is_in_debug_section(control: Mapping[str, str] | MetadataProxy) -> bool:
1430 """binary package is a debug package
1432 :param control: control file of binary package
1433 :return: True if the binary package is a debug package
1434 """
1435 section = control["Section"].split("/", 1)[-1]
1436 auto_built_package = control.get("Auto-Built-Package")
1437 return section == "debug" and auto_built_package == "debug-symbols"
1440################################################################################
1443def find_possibly_compressed_file(filename: str) -> str:
1444 """
1446 :param filename: path to a control file (Sources, Packages, etc) to
1447 look for
1448 :return: path to the (possibly compressed) control file, or null if the
1449 file doesn't exist
1450 """
1451 _compressions = ("", ".xz", ".gz", ".bz2")
1453 for ext in _compressions: 1453 ↛ 1458line 1453 didn't jump to line 1458 because the loop on line 1453 didn't complete
1454 _file = filename + ext
1455 if os.path.exists(_file):
1456 return _file
1458 raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
1461################################################################################
1464def parse_boolean_from_user(value: str) -> bool:
1465 value = value.lower()
1466 if value in {"yes", "true", "enable", "enabled"}:
1467 return True
1468 if value in {"no", "false", "disable", "disabled"}: 1468 ↛ 1470line 1468 didn't jump to line 1470 because the condition on line 1468 was always true
1469 return False
1470 raise ValueError("Not sure whether %s should be a True or a False" % value)
1473_DURATION_RE = re.compile(r"(?P<amount>\d+)\s*(?P<unit>[hms])", re.IGNORECASE)
1476def parse_duration(value: str) -> datetime.timedelta:
1477 """Parse duration from strings like "1h", "30m", or "1h 15m 10s"."""
1478 value = value.strip()
1479 if not value:
1480 raise ValueError("Duration must be non-empty")
1482 total_seconds = 0
1483 for match in _DURATION_RE.finditer(value):
1484 amount = int(match.group("amount"))
1485 unit = match.group("unit").lower()
1486 if unit == "h":
1487 total_seconds += amount * 3600
1488 elif unit == "m":
1489 total_seconds += amount * 60
1490 else:
1491 total_seconds += amount
1493 remainder = _DURATION_RE.sub("", value).strip(" ,")
1494 if remainder:
1495 raise ValueError("Invalid duration: %s" % value)
1496 if total_seconds <= 0:
1497 raise ValueError("Duration must be greater than zero")
1498 return datetime.timedelta(seconds=total_seconds)
1501def suite_suffix(suite_name: str) -> str:
1502 """Return suite_suffix for the given suite"""
1503 suffix = Cnf.find("Dinstall::SuiteSuffix", "")
1504 if suffix == "": 1504 ↛ 1506line 1504 didn't jump to line 1506 because the condition on line 1504 was always true
1505 return ""
1506 elif "Dinstall::SuiteSuffixSuites" not in Cnf:
1507 # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
1508 return suffix
1509 elif suite_name in Cnf.value_list("Dinstall::SuiteSuffixSuites"):
1510 return suffix
1511 return ""
1514################################################################################
1517def process_buildinfos(
1518 directory: str,
1519 buildinfo_files: "Iterable[daklib.upload.HashedFile]",
1520 fs_transaction: "daklib.fstransactions.FilesystemTransaction",
1521 logger: "daklib.daklog.Logger",
1522) -> None:
1523 """Copy buildinfo files into Dir::BuildinfoArchive
1525 :param directory: directory where .changes is stored
1526 :param buildinfo_files: names of buildinfo files
1527 :param fs_transaction: FilesystemTransaction instance
1528 :param logger: logger instance
1529 """
1531 if "Dir::BuildinfoArchive" not in Cnf:
1532 return
1534 target_dir = os.path.join(
1535 Cnf["Dir::BuildinfoArchive"],
1536 datetime.datetime.now().strftime("%Y/%m/%d"),
1537 )
1539 for f in buildinfo_files:
1540 src = os.path.join(directory, f.filename)
1541 dst = find_next_free(os.path.join(target_dir, f.filename))
1543 logger.log(["Archiving", f.filename])
1544 fs_transaction.copy(src, dst, mode=0o644)
1547################################################################################
1550def move_to_morgue(
1551 morguesubdir: str,
1552 filenames: Iterable[str],
1553 fs_transaction: "daklib.fstransactions.FilesystemTransaction",
1554 logger: "daklib.daklog.Logger",
1555) -> None:
1556 """Move a file to the correct dir in morgue
1558 :param morguesubdir: subdirectory of morgue where this file needs to go
1559 :param filenames: names of files
1560 :param fs_transaction: FilesystemTransaction instance
1561 :param logger: logger instance
1562 """
1564 assert Cnf["Dir::Base"]
1565 morguedir = Cnf.get("Dir::Morgue", os.path.join(Cnf["Dir::Base"], "morgue"))
1567 # Build directory as morguedir/morguesubdir/year/month/day
1568 now = datetime.datetime.now()
1569 dest = os.path.join(
1570 morguedir, morguesubdir, str(now.year), "%.2d" % now.month, "%.2d" % now.day
1571 )
1573 for filename in filenames:
1574 dest_filename = dest + "/" + os.path.basename(filename)
1575 # If the destination file exists; try to find another filename to use
1576 if os.path.lexists(dest_filename): 1576 ↛ 1577line 1576 didn't jump to line 1577 because the condition on line 1576 was never true
1577 dest_filename = find_next_free(dest_filename)
1578 logger.log(["move to morgue", filename, dest_filename])
1579 fs_transaction.move(filename, dest_filename)
1582################################################################################
1585def resolve_relative_path(base_path: StrPath, relative_path: StrPath) -> Path:
1586 """Resolve absolute path joining base_path and relative_path
1588 Raises:
1589 ValueError: when the resolved path is not relative to base_path
1590 """
1591 base = Path(base_path).resolve()
1592 path = (base / relative_path).resolve()
1594 if not path.is_relative_to(base): 1594 ↛ 1595line 1594 didn't jump to line 1595 because the condition on line 1594 was never true
1595 raise ValueError(f"Path {relative_path} is not relative to {base_path}: {path}")
1597 return path
1600################################################################################
1603def remove_unsafe_symlinks(base_path: StrPath) -> None:
1604 """Remove all unsafe symlinks below base_path"""
1605 base = Path(base_path).resolve()
1606 for dirpath, _, filenames in base.walk():
1607 for filename in filenames:
1608 current_dir = base / dirpath
1609 path = current_dir / filename
1610 if not path.is_symlink(): 1610 ↛ 1612line 1610 didn't jump to line 1612 because the condition on line 1610 was always true
1611 continue
1612 target = (current_dir / path.readlink()).resolve()
1613 if not target.is_relative_to(base):
1614 path.unlink()