Coverage for daklib/utils.py: 59%

758 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1# vim:set et ts=4 sw=4: 

2 

3"""Utility functions 

4 

5@contact: Debian FTP Master <ftpmaster@debian.org> 

6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org> 

7@license: GNU General Public License version 2 or later 

8""" 

9 

10# This program is free software; you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation; either version 2 of the License, or 

13# (at your option) any later version. 

14 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19 

20# You should have received a copy of the GNU General Public License 

21# along with this program; if not, write to the Free Software 

22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

23 

24import datetime 

25import email.policy 

26import errno 

27import functools 

28import grp 

29import os 

30import pwd 

31import re 

32import shutil 

33import subprocess 

34import sys 

35import tempfile 

36from collections import defaultdict 

37from collections.abc import Collection, Iterable, Mapping, Sequence 

38from pathlib import Path 

39from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, Union, override 

40 

41import apt_inst 

42import apt_pkg 

43import sqlalchemy.sql as sql 

44 

45import daklib.config as config 

46import daklib.mail 

47from daklib.dbconn import ( 

48 Component, 

49 DBConn, 

50 MetadataProxy, 

51 Override, 

52 OverrideType, 

53 get_active_keyring_paths, 

54 get_architecture, 

55 get_component, 

56 get_or_set_metadatakey, 

57 get_suite, 

58 get_suite_architectures, 

59) 

60 

61from .dak_exceptions import ( 

62 InvalidDscError, 

63 NoFilesFieldError, 

64 NoFreeFilenameError, 

65 ParseChangesError, 

66 SendmailFailedError, 

67 UnknownFormatError, 

68) 

69from .formats import parse_format, validate_changes_format 

70from .gpg import SignedFile 

71from .regexes import ( 

72 re_build_dep_arch, 

73 re_issource, 

74 re_multi_line_field, 

75 re_parse_maintainer, 

76 re_re_mark, 

77 re_single_line_field, 

78 re_srchasver, 

79 re_whitespace_comment, 

80) 

81from .srcformats import get_format_from_string 

82from .textutils import fix_maintainer 

83 

84if TYPE_CHECKING: 

85 from sqlalchemy.orm import Session 

86 

87 import daklib.daklog 

88 import daklib.fstransactions 

89 import daklib.upload 

90 

91type StrPath = str | os.PathLike[str] 

92 

93 

94################################################################################ 

95 

96key_uid_email_cache: dict[str, list[str]] = ( 

97 {} 

98) #: Cache for email addresses from gpg key uids 

99 

100################################################################################ 

101 

102 

103def input_or_exit(prompt: Optional[str] = None) -> str: 

104 try: 

105 return input(prompt) 

106 except EOFError: 

107 sys.exit("\nUser interrupt (^D).") 

108 

109 

110################################################################################ 

111 

112 

113def extract_component_from_section(section: str) -> tuple[str, str]: 

114 """split "section" into "section", "component" parts 

115 

116 If "component" is not given, "main" is used instead. 

117 

118 :return: tuple (section, component) 

119 """ 

120 if section.find("/") != -1: 

121 return section, section.split("/", 1)[0] 

122 return section, "main" 

123 

124 

125################################################################################ 

126 

127 

128def parse_deb822( 

129 armored_contents: bytes, 

130 signing_rules: Literal[-1, 0, 1] = 0, 

131 keyrings: Collection[str] | None = None, 

132) -> dict[str, str]: 

133 require_signature = True 

134 if keyrings is None: 134 ↛ 138line 134 didn't jump to line 138 because the condition on line 134 was always true

135 keyrings = [] 

136 require_signature = False 

137 

138 signed_file = SignedFile( 

139 armored_contents, keyrings=keyrings, require_signature=require_signature 

140 ) 

141 contents = signed_file.contents.decode("utf-8") 

142 

143 error = "" 

144 changes = {} 

145 

146 # Split the lines in the input, keeping the linebreaks. 

147 lines = contents.splitlines(True) 

148 

149 if len(lines) == 0: 

150 raise ParseChangesError("[Empty changes file]") 

151 

152 # Reindex by line number so we can easily verify the format of 

153 # .dsc files... 

154 index = 0 

155 indexed_lines = {} 

156 for line in lines: 

157 index += 1 

158 indexed_lines[index] = line[:-1] 

159 

160 num_of_lines = len(indexed_lines) 

161 index = 0 

162 first = -1 

163 while index < num_of_lines: 

164 index += 1 

165 line = indexed_lines[index] 

166 if line == "" and signing_rules == 1: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true

167 if index != num_of_lines: 

168 raise InvalidDscError(index) 

169 break 

170 if slf := re_single_line_field.match(line): 

171 field = slf.groups()[0].lower() 

172 changes[field] = slf.groups()[1] 

173 first = 1 

174 continue 

175 if line == " .": 

176 changes[field] += "\n" 

177 continue 

178 if mlf := re_multi_line_field.match(line): 

179 if first == -1: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 raise ParseChangesError( 

181 "'%s'\n [Multi-line field continuing on from nothing?]" % (line) 

182 ) 

183 if first == 1 and changes[field] != "": 

184 changes[field] += "\n" 

185 first = 0 

186 changes[field] += mlf.groups()[0] + "\n" 

187 continue 

188 error += line 

189 

190 changes["filecontents"] = armored_contents.decode() 

191 

192 if "source" in changes: 

193 # Strip the source version in brackets from the source field, 

194 # put it in the "source-version" field instead. 

195 if srcver := re_srchasver.search(changes["source"]): 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 changes["source"] = srcver.group(1) 

197 changes["source-version"] = srcver.group(2) 

198 

199 if error: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 raise ParseChangesError(error) 

201 

202 return changes 

203 

204 

205################################################################################ 

206 

207 

208def parse_changes( 

209 filename: str, 

210 signing_rules: Literal[-1, 0, 1] = 0, 

211 dsc_file: bool = False, 

212 keyrings: Collection[str] | None = None, 

213) -> dict[str, str]: 

214 """ 

215 Parses a changes or source control (.dsc) file and returns a dictionary 

216 where each field is a key. The mandatory first argument is the 

217 filename of the .changes file. 

218 

219 signing_rules is an optional argument: 

220 

221 - If signing_rules == -1, no signature is required. 

222 - If signing_rules == 0 (the default), a signature is required. 

223 - If signing_rules == 1, it turns on the same strict format checking 

224 as dpkg-source. 

225 

226 The rules for (signing_rules == 1)-mode are: 

227 

228 - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----" 

229 followed by any PGP header data and must end with a blank line. 

230 

231 - The data section must end with a blank line and must be followed by 

232 "-----BEGIN PGP SIGNATURE-----". 

233 

234 :param dsc_file: `filename` is a Debian source control (.dsc) file 

235 """ 

236 

237 with open(filename, "rb") as changes_in: 

238 content = changes_in.read() 

239 changes = parse_deb822(content, signing_rules, keyrings=keyrings) 

240 

241 if not dsc_file: 

242 # Finally ensure that everything needed for .changes is there 

243 must_keywords = ( 

244 "Format", 

245 "Date", 

246 "Source", 

247 "Architecture", 

248 "Version", 

249 "Distribution", 

250 "Maintainer", 

251 "Changes", 

252 "Files", 

253 ) 

254 

255 missingfields = [] 

256 for keyword in must_keywords: 

257 if keyword.lower() not in changes: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true

258 missingfields.append(keyword) 

259 

260 if len(missingfields): 

261 raise ParseChangesError( 

262 "Missing mandatory field(s) in changes file (policy 5.5): %s" 

263 % (missingfields) 

264 ) 

265 

266 return changes 

267 

268 

269################################################################################ 

270 

271 

272def check_dsc_files( 

273 dsc_filename: str, 

274 dsc: Mapping[str, str], 

275 dsc_files: Iterable[str], 

276) -> list[str]: 

277 """ 

278 Verify that the files listed in the Files field of the .dsc are 

279 those expected given the announced Format. 

280 

281 :param dsc_filename: path of .dsc file 

282 :param dsc: the content of the .dsc parsed by :func:`parse_changes` 

283 :param dsc_files: the file list returned by :func:`build_file_list` 

284 :return: all errors detected 

285 """ 

286 rejmsg = [] 

287 

288 # Ensure .dsc lists proper set of source files according to the format 

289 # announced 

290 has: defaultdict[str, int] = defaultdict(lambda: 0) 

291 

292 ftype_lookup = ( 

293 (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)), 

294 (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")), 

295 (r"diff\.gz", ("debian_diff",)), 

296 (r"tar\.gz", ("native_tar_gz", "native_tar")), 

297 (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)), 

298 (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)), 

299 (r"tar\.(gz|bz2|xz)", ("native_tar",)), 

300 (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)), 

301 (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)), 

302 ) 

303 

304 for f in dsc_files: 

305 m = re_issource.match(f) 

306 if not m: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true

307 rejmsg.append( 

308 "%s: %s in Files field not recognised as source." % (dsc_filename, f) 

309 ) 

310 continue 

311 

312 # Populate 'has' dictionary by resolving keys in lookup table 

313 matched = False 

314 for regex, keys in ftype_lookup: 314 ↛ 322line 314 didn't jump to line 322 because the loop on line 314 didn't complete

315 if re.match(regex, m.group(3)): 

316 matched = True 

317 for key in keys: 

318 has[key] += 1 

319 break 

320 

321 # File does not match anything in lookup table; reject 

322 if not matched: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true

323 rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f)) 

324 break 

325 

326 # Check for multiple files 

327 for file_type in ( 

328 "orig_tar", 

329 "orig_tar_sig", 

330 "native_tar", 

331 "debian_tar", 

332 "debian_diff", 

333 ): 

334 if has[file_type] > 1: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true

335 rejmsg.append("%s: lists multiple %s" % (dsc_filename, file_type)) 

336 

337 # Source format specific tests 

338 try: 

339 format = get_format_from_string(dsc["format"]) 

340 rejmsg.extend(["%s: %s" % (dsc_filename, x) for x in format.reject_msgs(has)]) 

341 

342 except UnknownFormatError: 

343 # Not an error here for now 

344 pass 

345 

346 return rejmsg 

347 

348 

349################################################################################ 

350 

351# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl 

352 

353 

354def build_file_list( 

355 changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum" 

356) -> dict[str, dict[str, str]]: 

357 files = {} 

358 

359 # Make sure we have a Files: field to parse... 

360 if field not in changes: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true

361 raise NoFilesFieldError 

362 

363 # Validate .changes Format: field 

364 if not is_a_dsc: 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true

365 validate_changes_format(parse_format(changes["format"]), field) 

366 

367 includes_section = (not is_a_dsc) and field == "files" 

368 

369 # Parse each entry/line: 

370 for i in changes[field].split("\n"): 370 ↛ 395line 370 didn't jump to line 395 because the loop on line 370 didn't complete

371 if not i: 

372 break 

373 s = i.split() 

374 section = priority = "" 

375 try: 

376 if includes_section: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 (md5, size, section, priority, name) = s 

378 else: 

379 (md5, size, name) = s 

380 except ValueError: 

381 raise ParseChangesError(i) 

382 

383 if section == "": 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was always true

384 section = "-" 

385 if priority == "": 385 ↛ 388line 385 didn't jump to line 388 because the condition on line 385 was always true

386 priority = "-" 

387 

388 (section, component) = extract_component_from_section(section) 

389 

390 files[name] = dict( 

391 size=size, section=section, priority=priority, component=component 

392 ) 

393 files[name][hashname] = md5 

394 

395 return files 

396 

397 

398################################################################################ 

399 

400 

401def send_mail(message: str, whitelists: Optional[list[str | None]] = None) -> None: 

402 """sendmail wrapper, takes a message string 

403 

404 :param whitelists: path to whitelists. :const:`None` or an empty list whitelists 

405 everything, otherwise an address is whitelisted if it is 

406 included in any of the lists. 

407 In addition a global whitelist can be specified in 

408 Dinstall::MailWhiteList. 

409 """ 

410 

411 msg = daklib.mail.parse_mail(message) 

412 

413 # The incoming message might be UTF-8, but outgoing mail should 

414 # use a legacy-compatible encoding. Set the content to the 

415 # text to make sure this is the case. 

416 # Note that this does not work with multipart messages. 

417 msg.set_content(msg.get_payload(), cte="quoted-printable") 

418 

419 # Check whether we're supposed to be sending mail 

420 call_sendmail = True 

421 if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]: 

422 call_sendmail = False 

423 

424 if whitelists is None or None in whitelists: 

425 whitelists = [] 

426 if Cnf.get("Dinstall::MailWhiteList", ""): 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true

427 whitelists.append(Cnf["Dinstall::MailWhiteList"]) 

428 if len(whitelists) != 0: 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true

429 whitelist = [] 

430 for path in whitelists: 

431 assert path is not None 

432 with open(path, "r") as whitelist_in: 

433 for line in whitelist_in: 

434 if not re_whitespace_comment.match(line): 

435 if re_re_mark.match(line): 

436 whitelist.append( 

437 re.compile(re_re_mark.sub("", line.strip(), 1)) 

438 ) 

439 else: 

440 whitelist.append(re.compile(re.escape(line.strip()))) 

441 

442 # Fields to check. 

443 fields = ["To", "Bcc", "Cc"] 

444 for field in fields: 

445 # Check each field 

446 value = msg.get(field, None) 

447 if value is not None: 

448 match = [] 

449 for item in value.split(","): 

450 (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer( 

451 item.strip() 

452 ) 

453 mail_whitelisted = 0 

454 for wr in whitelist: 

455 if wr.match(mail): 

456 mail_whitelisted = 1 

457 break 

458 if not mail_whitelisted: 

459 print("Skipping {0} since it's not whitelisted".format(item)) 

460 continue 

461 match.append(item) 

462 

463 # Doesn't have any mail in whitelist so remove the header 

464 if len(match) == 0: 

465 del msg[field] 

466 else: 

467 msg.replace_header(field, ", ".join(match)) 

468 

469 # Change message fields in order if we don't have a To header 

470 if "To" not in msg: 

471 fields.reverse() 

472 for field in fields: 

473 if field in msg: 

474 msg[fields[-1]] = msg[field] 

475 del msg[field] 

476 break 

477 else: 

478 # return, as we removed all recipients. 

479 call_sendmail = False 

480 

481 # sign mail 

482 if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""): 482 ↛ 491line 482 didn't jump to line 491 because the condition on line 482 was always true

483 msg = daklib.mail.sign_mail( 

484 msg, 

485 keyids=[mailkey], 

486 pubring=Cnf.get("Dinstall::SigningPubKeyring") or None, 

487 homedir=Cnf.get("Dinstall::SigningHomedir") or None, 

488 passphrase_file=Cnf.get("Dinstall::SigningPassphraseFile") or None, 

489 ) 

490 

491 msg_bytes = msg.as_bytes(policy=email.policy.default) 

492 

493 maildir = Cnf.get("Dir::Mail") 

494 if maildir: 494 ↛ 501line 494 didn't jump to line 501 because the condition on line 494 was always true

495 path = os.path.join(maildir, datetime.datetime.now().isoformat()) 

496 path = find_next_free(path) 

497 with open(path, "wb") as fh: 

498 fh.write(msg_bytes) 

499 

500 # Invoke sendmail 

501 if not call_sendmail: 

502 return 

503 try: 

504 subprocess.run( 

505 Cnf["Dinstall::SendmailCommand"].split(), 

506 input=msg_bytes, 

507 check=True, 

508 stdout=subprocess.PIPE, 

509 stderr=subprocess.STDOUT, 

510 ) 

511 except subprocess.CalledProcessError as e: 

512 raise SendmailFailedError(e.output.decode().rstrip()) 

513 

514 

515################################################################################ 

516 

517 

518def poolify(source: str) -> str: 

519 """convert `source` name into directory path used in pool structure""" 

520 if source[:3] == "lib": 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 return source[:4] + "/" + source + "/" 

522 else: 

523 return source[:1] + "/" + source + "/" 

524 

525 

526################################################################################ 

527 

528 

529def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None: 

530 if os.path.exists(dest) and os.path.isdir(dest): 

531 dest_dir = dest 

532 else: 

533 dest_dir = os.path.dirname(dest) 

534 if not os.path.lexists(dest_dir): 

535 umask = os.umask(00000) 

536 os.makedirs(dest_dir, 0o2775) 

537 os.umask(umask) 

538 # print "Moving %s to %s..." % (src, dest) 

539 if os.path.exists(dest) and os.path.isdir(dest): 

540 dest += "/" + os.path.basename(src) 

541 # Don't overwrite unless forced to 

542 if os.path.lexists(dest): 

543 if not overwrite: 

544 fubar("Can't move %s to %s - file already exists." % (src, dest)) 

545 else: 

546 if not os.access(dest, os.W_OK): 

547 fubar( 

548 "Can't move %s to %s - can't write to existing file." % (src, dest) 

549 ) 

550 shutil.copy2(src, dest) 

551 os.chmod(dest, perms) 

552 os.unlink(src) 

553 

554 

555################################################################################ 

556 

557 

558def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str: 

559 """Perform a substition of template""" 

560 with open(filename) as templatefile: 

561 template = templatefile.read() 

562 for k, v in subst_map.items(): 

563 template = template.replace(k, str(v)) 

564 return template 

565 

566 

567################################################################################ 

568 

569 

570def fubar(msg: str, exit_code: int = 1) -> NoReturn: 

571 """print error message and exit program""" 

572 print("E:", msg, file=sys.stderr) 

573 sys.exit(exit_code) 

574 

575 

576def warn(msg: str) -> None: 

577 """print warning message""" 

578 print("W:", msg, file=sys.stderr) 

579 

580 

581################################################################################ 

582 

583 

584def whoami() -> str: 

585 """get user name 

586 

587 Returns the user name with a laughable attempt at rfc822 conformancy 

588 (read: removing stray periods). 

589 """ 

590 return pwd.getpwuid(os.getuid())[4].split(",")[0].replace(".", "") 

591 

592 

593def getusername() -> str: 

594 """get login name""" 

595 return pwd.getpwuid(os.getuid())[0] 

596 

597 

598################################################################################ 

599 

600 

601def size_type(c: Union[int, float]) -> str: 

602 t = " B" 

603 if c > 10240: 

604 c = c / 1024 

605 t = " KB" 

606 if c > 10240: 606 ↛ 607line 606 didn't jump to line 607 because the condition on line 606 was never true

607 c = c / 1024 

608 t = " MB" 

609 return "%d%s" % (c, t) 

610 

611 

612################################################################################ 

613 

614 

615def find_next_free(dest: str, too_many: int = 100) -> str: 

616 extra = 0 

617 orig_dest = dest 

618 while os.path.lexists(dest) and extra < too_many: 

619 dest = orig_dest + "." + repr(extra) 

620 extra += 1 

621 if extra >= too_many: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true

622 raise NoFreeFilenameError 

623 return dest 

624 

625 

626################################################################################ 

627 

628 

629def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str: 

630 return sep.join(x if x is not None else "" for x in original) 

631 

632 

633################################################################################ 

634 

635 

636def prefix_multi_line_string( 

637 lines: str, prefix: str, include_blank_lines: bool = False 

638) -> str: 

639 """prepend `prefix` to each line in `lines`""" 

640 return "\n".join( 

641 prefix + cleaned_line 

642 for line in lines.split("\n") 

643 if (cleaned_line := line.strip()) or include_blank_lines 

644 ) 

645 

646 

647################################################################################ 

648 

649 

650def join_with_commas_and(list: Sequence[str]) -> str: 

651 if len(list) == 0: 651 ↛ 652line 651 didn't jump to line 652 because the condition on line 651 was never true

652 return "nothing" 

653 if len(list) == 1: 653 ↛ 655line 653 didn't jump to line 655 because the condition on line 653 was always true

654 return list[0] 

655 return ", ".join(list[:-1]) + " and " + list[-1] 

656 

657 

658################################################################################ 

659 

660 

661def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str: 

662 pp_deps = ( 

663 f"{pkg} ({constraint} {version})" if constraint else pkg 

664 for pkg, constraint, version in deps 

665 ) 

666 return " |".join(pp_deps) 

667 

668 

669################################################################################ 

670 

671 

672def get_conf() -> apt_pkg.Configuration: 

673 return Cnf 

674 

675 

676################################################################################ 

677 

678 

679def parse_args(Options: apt_pkg.Configuration) -> tuple[str, str, str, bool]: 

680 """Handle -a, -c and -s arguments; returns them as SQL constraints""" 

681 # XXX: This should go away and everything which calls it be converted 

682 # to use SQLA properly. For now, we'll just fix it not to use 

683 # the old Pg interface though 

684 session = DBConn().session() 

685 # Process suite 

686 if Options["Suite"]: 686 ↛ 704line 686 didn't jump to line 704 because the condition on line 686 was always true

687 suite_ids_list = [] 

688 for suitename in split_args(Options["Suite"]): 

689 suite = get_suite(suitename, session=session) 

690 if not suite or suite.suite_id is None: 690 ↛ 691line 690 didn't jump to line 691 because the condition on line 690 was never true

691 warn( 

692 "suite '%s' not recognised." 

693 % (suite and suite.suite_name or suitename) 

694 ) 

695 else: 

696 suite_ids_list.append(suite.suite_id) 

697 if suite_ids_list: 697 ↛ 702line 697 didn't jump to line 702 because the condition on line 697 was always true

698 con_suites = "AND su.id IN (%s)" % ", ".join( 

699 [str(i) for i in suite_ids_list] 

700 ) 

701 else: 

702 fubar("No valid suite given.") 

703 else: 

704 con_suites = "" 

705 

706 # Process component 

707 if Options["Component"]: 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true

708 component_ids_list = [] 

709 for componentname in split_args(Options["Component"]): 

710 component = get_component(componentname, session=session) 

711 if component is None: 

712 warn("component '%s' not recognised." % (componentname)) 

713 else: 

714 component_ids_list.append(component.component_id) 

715 if component_ids_list: 

716 con_components = "AND c.id IN (%s)" % ", ".join( 

717 [str(i) for i in component_ids_list] 

718 ) 

719 else: 

720 fubar("No valid component given.") 

721 else: 

722 con_components = "" 

723 

724 # Process architecture 

725 con_architectures = "" 

726 check_source = False 

727 if Options["Architecture"]: 727 ↛ 728line 727 didn't jump to line 728 because the condition on line 727 was never true

728 arch_ids_list = [] 

729 for archname in split_args(Options["Architecture"]): 

730 if archname == "source": 

731 check_source = True 

732 else: 

733 arch = get_architecture(archname, session=session) 

734 if arch is None: 

735 warn("architecture '%s' not recognised." % (archname)) 

736 else: 

737 arch_ids_list.append(arch.arch_id) 

738 if arch_ids_list: 

739 con_architectures = "AND a.id IN (%s)" % ", ".join( 

740 [str(i) for i in arch_ids_list] 

741 ) 

742 else: 

743 if not check_source: 

744 fubar("No valid architecture given.") 

745 else: 

746 check_source = True 

747 

748 return (con_suites, con_architectures, con_components, check_source) 

749 

750 

751################################################################################ 

752 

753 

754@functools.total_ordering 

755class ArchKey: 

756 """ 

757 Key object for use in sorting lists of architectures. 

758 

759 Sorts normally except that 'source' dominates all others. 

760 """ 

761 

762 __slots__ = ["arch", "issource"] 

763 

764 def __init__(self, arch: str, *args): 

765 self.arch = arch 

766 self.issource = arch == "source" 

767 

768 def __lt__(self, other: "ArchKey") -> bool: 

769 if self.issource: 

770 return not other.issource 

771 if other.issource: 

772 return False 

773 return self.arch < other.arch 

774 

775 @override 

776 def __eq__(self, other: object) -> bool: 

777 if not isinstance(other, ArchKey): 777 ↛ 778line 777 didn't jump to line 778 because the condition on line 777 was never true

778 return NotImplemented 

779 return self.arch == other.arch 

780 

781 

782################################################################################ 

783 

784 

785def split_args(s: str, dwim: bool = True) -> list[str]: 

786 """ 

787 Split command line arguments which can be separated by either commas 

788 or whitespace. If dwim is set, it will complain about string ending 

789 in comma since this usually means someone did 'dak ls -a i386, m68k 

790 foo' or something and the inevitable confusion resulting from 'm68k' 

791 being treated as an argument is undesirable. 

792 """ 

793 

794 if s.find(",") == -1: 794 ↛ 797line 794 didn't jump to line 797 because the condition on line 794 was always true

795 return s.split() 

796 else: 

797 if s[-1:] == "," and dwim: 

798 fubar("split_args: found trailing comma, spurious space maybe?") 

799 return s.split(",") 

800 

801 

802################################################################################ 

803 

804 

805def split_args_or_none(s: str | None, dwim: bool = True) -> list[str] | None: 

806 """ 

807 Split command line arguments like `split_args`, but return `None` for empty string 

808 """ 

809 if not s: 

810 return None 

811 return split_args(s, dwim) 

812 

813 

814################################################################################ 

815 

816 

817def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]: 

818 if keyrings is None: 818 ↛ 821line 818 didn't jump to line 821 because the condition on line 818 was always true

819 keyrings = get_active_keyring_paths() 

820 

821 return ["--keyring={}".format(path) for path in keyrings] 

822 

823 

824################################################################################ 

825 

826 

827def _gpg_get_addresses_from_listing(output: bytes) -> list[str]: 

828 addresses: list[str] = [] 

829 

830 for line in output.split(b"\n"): 

831 parts = line.split(b":") 

832 if parts[0] not in (b"uid", b"pub"): 

833 continue 

834 if parts[1] in (b"i", b"d", b"r"): 834 ↛ 836line 834 didn't jump to line 836 because the condition on line 834 was never true

835 # Skip uid that is invalid, disabled or revoked 

836 continue 

837 try: 

838 uid_bytes = parts[9] 

839 except IndexError: 

840 continue 

841 try: 

842 uid = uid_bytes.decode(encoding="utf-8") 

843 except UnicodeDecodeError: 

844 # If the uid is not valid UTF-8, we assume it is an old uid 

845 # still encoding in Latin-1. 

846 uid = uid_bytes.decode(encoding="latin1") 

847 m = re_parse_maintainer.match(uid) 

848 if not m: 

849 continue 

850 address = m.group(2) 

851 if address.endswith("@debian.org"): 851 ↛ 854line 851 didn't jump to line 854 because the condition on line 851 was never true

852 # prefer @debian.org addresses 

853 # TODO: maybe not hardcode the domain 

854 addresses.insert(0, address) 

855 else: 

856 addresses.append(address) 

857 

858 return addresses 

859 

860 

861def gpg_get_key_addresses(fingerprint: str) -> list[str]: 

862 """retreive email addresses from gpg key uids for a given fingerprint""" 

863 addresses = key_uid_email_cache.get(fingerprint) 

864 if addresses is not None: 

865 return addresses 

866 

867 try: 

868 cmd = ["gpg", "--no-default-keyring"] 

869 cmd.extend(gpg_keyring_args()) 

870 cmd.extend(["--with-colons", "--list-keys", "--", fingerprint]) 

871 output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) 

872 except subprocess.CalledProcessError: 

873 addresses = [] 

874 else: 

875 addresses = _gpg_get_addresses_from_listing(output) 

876 

877 key_uid_email_cache[fingerprint] = addresses 

878 return addresses 

879 

880 

881################################################################################ 

882 

883 

884def open_ldap_connection() -> Any: 

885 """open connection to the configured LDAP server""" 

886 import ldap # type: ignore 

887 

888 LDAPServer = Cnf["Import-LDAP-Fingerprints::LDAPServer"] 

889 ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile") 

890 

891 conn = ldap.initialize(LDAPServer) 

892 

893 if ca_cert_file: 

894 conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD) 

895 conn.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file) 

896 conn.set_option(ldap.OPT_X_TLS_NEWCTX, True) 

897 conn.start_tls_s() 

898 

899 conn.simple_bind_s("", "") 

900 

901 return conn 

902 

903 

904################################################################################ 

905 

906 

907def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]: 

908 """retrieve login from LDAP linked to a given fingerprint""" 

909 import ldap 

910 

911 conn = open_ldap_connection() 

912 LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"] 

913 Attrs = conn.search_s( 

914 LDAPDn, 

915 ldap.SCOPE_ONELEVEL, 

916 "(keyfingerprint=%s)" % fingerprint, 

917 ["uid", "keyfingerprint"], 

918 ) 

919 login: dict[str, str] = {} 

920 for elem in Attrs: 

921 fpr = elem[1]["keyFingerPrint"][0].decode() 

922 uid = elem[1]["uid"][0].decode() 

923 login[fpr] = uid 

924 return login 

925 

926 

927################################################################################ 

928 

929 

930def get_users_from_ldap() -> dict[str, str]: 

931 """retrieve login and user names from LDAP""" 

932 import ldap 

933 

934 conn = open_ldap_connection() 

935 LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"] 

936 Attrs = conn.search_s( 

937 LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"] 

938 ) 

939 users: dict[str, str] = {} 

940 for elem in Attrs: 

941 elem = elem[1] 

942 name = [] 

943 for k in ("cn", "mn", "sn"): 

944 try: 

945 value = elem[k][0].decode() 

946 if value and value[0] != "-": 

947 name.append(value) 

948 except KeyError: 

949 pass 

950 users[" ".join(name)] = elem["uid"][0] 

951 return users 

952 

953 

954################################################################################ 

955 

956 

957def clean_symlink(src: str, dest: str, root: str) -> str: 

958 """ 

959 Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'. 

960 Returns fixed 'src' 

961 """ 

962 src = src.replace(root, "", 1) 

963 dest = dest.replace(root, "", 1) 

964 dest = os.path.dirname(dest) 

965 new_src = "../" * len(dest.split("/")) 

966 return new_src + src 

967 

968 

969################################################################################ 

970 

971 

972def temp_dirname( 

973 parent: Optional[str] = None, 

974 prefix: str = "dak", 

975 suffix: str = "", 

976 mode: Optional[int] = None, 

977 group: Optional[str] = None, 

978) -> str: 

979 """ 

980 Return a secure and unique directory by pre-creating it. 

981 

982 :param parent: If non-null it will be the directory the directory is pre-created in. 

983 :param prefix: The filename will be prefixed with this string 

984 :param suffix: The filename will end with this string 

985 :param mode: If set the file will get chmodded to those permissions 

986 :param group: If set the file will get chgrped to the specified group. 

987 :return: Returns a pair (fd, name) 

988 

989 """ 

990 

991 tfname = tempfile.mkdtemp(suffix, prefix, parent) 

992 if mode is not None: 992 ↛ 994line 992 didn't jump to line 994 because the condition on line 992 was always true

993 os.chmod(tfname, mode) 

994 if group is not None: 994 ↛ 995line 994 didn't jump to line 995 because the condition on line 994 was never true

995 gid = grp.getgrnam(group).gr_gid 

996 os.chown(tfname, -1, gid) 

997 return tfname 

998 

999 

1000################################################################################ 

1001 

1002 

1003def get_changes_files(from_dir: str) -> list[str]: 

1004 """ 

1005 Takes a directory and lists all .changes files in it (as well as chdir'ing 

1006 to the directory; this is due to broken behaviour on the part of p-u/p-a 

1007 when you're not in the right place) 

1008 

1009 Returns a list of filenames 

1010 """ 

1011 try: 

1012 # Much of the rest of p-u/p-a depends on being in the right place 

1013 os.chdir(from_dir) 

1014 changes_files = [x for x in os.listdir(from_dir) if x.endswith(".changes")] 

1015 except OSError as e: 

1016 fubar("Failed to read list from directory %s (%s)" % (from_dir, e)) 

1017 

1018 return changes_files 

1019 

1020 

1021################################################################################ 

1022 

1023 

1024Cnf: apt_pkg.Configuration = config.Config().Cnf 

1025 

1026################################################################################ 

1027 

1028 

1029def parse_wnpp_bug_file( 

1030 file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm", 

1031) -> dict[str, list[str]]: 

1032 """ 

1033 Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm 

1034 Well, actually it parsed a local copy, but let's document the source 

1035 somewhere ;) 

1036 

1037 returns a dict associating source package name with a list of open wnpp 

1038 bugs (Yes, there might be more than one) 

1039 """ 

1040 

1041 try: 

1042 with open(file) as f: 

1043 lines = f.readlines() 

1044 except OSError: 

1045 print( 

1046 "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any." 

1047 % file 

1048 ) 

1049 lines = [] 

1050 wnpp = {} 

1051 

1052 for line in lines: 

1053 splited_line = line.split(": ", 1) 

1054 if len(splited_line) > 1: 

1055 wnpp[splited_line[0]] = splited_line[1].split("|") 

1056 

1057 for source in wnpp: 

1058 bugs = [] 

1059 for wnpp_bug in wnpp[source]: 

1060 m = re.search(r"(\d)+", wnpp_bug) 

1061 assert m is not None 

1062 bug_no = m.group() 

1063 if bug_no: 

1064 bugs.append(bug_no) 

1065 wnpp[source] = bugs 

1066 return wnpp 

1067 

1068 

1069################################################################################ 

1070 

1071 

1072def deb_extract_control(path: str) -> bytes: 

1073 """extract DEBIAN/control from a binary package""" 

1074 return apt_inst.DebFile(path).control.extractdata("control") 

1075 

1076 

1077################################################################################ 

1078 

1079 

1080def mail_addresses_for_upload( 

1081 maintainer: str, 

1082 changed_by: str, 

1083 fingerprint: str, 

1084 authorized_by_fingerprint: Optional[str], 

1085) -> list[str]: 

1086 """mail addresses to contact for an upload 

1087 

1088 :param maintainer: Maintainer field of the .changes file 

1089 :param changed_by: Changed-By field of the .changes file 

1090 :param fingerprint: fingerprint of the key used to sign the upload 

1091 :return: list of RFC 2047-encoded mail addresses to contact regarding 

1092 this upload 

1093 """ 

1094 recipients = Cnf.value_list("Dinstall::UploadMailRecipients") 

1095 if not recipients: 1095 ↛ 1104line 1095 didn't jump to line 1104 because the condition on line 1095 was always true

1096 recipients = [ 

1097 "maintainer", 

1098 "changed_by", 

1099 "signer", 

1100 "authorized_by", 

1101 ] 

1102 

1103 # Ensure signer and authorized_by are last if present 

1104 for r in ("signer", "authorized_by"): 

1105 try: 

1106 recipients.remove(r) 

1107 except ValueError: 

1108 pass 

1109 else: 

1110 recipients.append(r) 

1111 

1112 # Compute the set of addresses of the recipients 

1113 addresses = set() # Name + email 

1114 emails = set() # Email only, used to avoid duplicates 

1115 for recipient in recipients: 

1116 address: str | None 

1117 if recipient.startswith("mail:"): # Email hardcoded in config 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true

1118 address = recipient[5:] 

1119 elif recipient == "maintainer": 

1120 address = maintainer 

1121 elif recipient == "changed_by": 

1122 address = changed_by 

1123 elif recipient == "signer" or recipient == "authorized_by": 1123 ↛ 1133line 1123 didn't jump to line 1133 because the condition on line 1123 was always true

1124 fpr = fingerprint if recipient == "signer" else authorized_by_fingerprint 

1125 if not fpr: 1125 ↛ 1126line 1125 didn't jump to line 1126 because the condition on line 1125 was never true

1126 continue 

1127 fpr_addresses = gpg_get_key_addresses(fpr) 

1128 address = fpr_addresses[0] if fpr_addresses else None 

1129 if any(x in emails for x in fpr_addresses): 

1130 # The signer already gets a copy via another email 

1131 address = None 

1132 else: 

1133 raise Exception( 

1134 "Unsupported entry in {0}: {1}".format( 

1135 "Dinstall::UploadMailRecipients", recipient 

1136 ) 

1137 ) 

1138 

1139 if address is not None: 

1140 mail = fix_maintainer(address)[3] 

1141 if mail not in emails: 

1142 addresses.add(address) 

1143 emails.add(mail) 

1144 

1145 encoded_addresses = [fix_maintainer(e)[1] for e in addresses] 

1146 return encoded_addresses 

1147 

1148 

1149################################################################################ 

1150 

1151 

1152def call_editor_for_file(path: str) -> None: 

1153 editor = os.environ.get("VISUAL", os.environ.get("EDITOR", "sensible-editor")) 

1154 subprocess.check_call([editor, path]) 

1155 

1156 

1157################################################################################ 

1158 

1159 

1160def call_editor(text: str = "", suffix: str = ".txt") -> str: 

1161 """run editor and return the result as a string 

1162 

1163 :param text: initial text 

1164 :param suffix: extension for temporary file 

1165 :return: string with the edited text 

1166 """ 

1167 with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as fh: 

1168 print(text, end="", file=fh) 

1169 fh.flush() 

1170 call_editor_for_file(fh.name) 

1171 fh.seek(0) 

1172 return fh.read() 

1173 

1174 

1175################################################################################ 

1176 

1177 

1178def check_reverse_depends( 

1179 removals: Iterable[str], 

1180 suite: str, 

1181 arches: Optional[Iterable[str]] = None, 

1182 session: "Session | None" = None, 

1183 cruft: bool = False, 

1184 quiet: bool = False, 

1185 include_arch_all: bool = True, 

1186) -> bool: 

1187 assert session is not None # TODO: remove default value... 

1188 

1189 dbsuite = get_suite(suite, session) 

1190 assert dbsuite is not None 

1191 overridesuite = ( 

1192 get_suite(dbsuite.overridesuite, session) if dbsuite.overridesuite else dbsuite 

1193 ) 

1194 assert overridesuite is not None 

1195 dep_problem = False 

1196 p2c = {} 

1197 all_broken: defaultdict[str, defaultdict[str, set[str]]] = defaultdict( 1197 ↛ exitline 1197 didn't jump to the function exit

1198 lambda: defaultdict(set) 

1199 ) 

1200 if arches: 1200 ↛ 1201line 1200 didn't jump to line 1201 because the condition on line 1200 was never true

1201 all_arches = set(arches) 

1202 else: 

1203 all_arches = set( 

1204 x.arch_string for x in get_suite_architectures(suite, session=session) 

1205 ) 

1206 all_arches -= set(["source", "all"]) 

1207 removal_set = set(removals) 

1208 metakey_d = get_or_set_metadatakey("Depends", session) 

1209 metakey_p = get_or_set_metadatakey("Provides", session) 

1210 if include_arch_all: 1210 ↛ 1213line 1210 didn't jump to line 1213 because the condition on line 1210 was always true

1211 rdep_architectures = all_arches | set(["all"]) 

1212 else: 

1213 rdep_architectures = all_arches 

1214 for architecture in rdep_architectures: 

1215 deps = {} 

1216 sources = {} 

1217 virtual_packages = {} 

1218 

1219 params: dict[str, object] = { 

1220 "suite_id": dbsuite.suite_id, 

1221 "metakey_d_id": metakey_d.key_id, 

1222 "metakey_p_id": metakey_p.key_id, 

1223 } 

1224 arch = get_architecture(architecture, session) 

1225 if arch is None: 1225 ↛ 1226line 1225 didn't jump to line 1226 because the condition on line 1225 was never true

1226 continue 

1227 params["arch_id"] = arch.arch_id 

1228 

1229 statement = sql.text( 

1230 """ 

1231 SELECT b.package, s.source, c.name as component, 

1232 (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends, 

1233 (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides 

1234 FROM binaries b 

1235 JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id 

1236 JOIN source s ON b.source = s.id 

1237 JOIN files_archive_map af ON b.file = af.file_id 

1238 JOIN component c ON af.component_id = c.id 

1239 WHERE b.architecture = :arch_id""" 

1240 ) 

1241 query = session.execute(statement, params) 

1242 for package, source, component, depends, provides in query: 

1243 sources[package] = source 

1244 p2c[package] = component 

1245 if depends is not None: 1245 ↛ 1246line 1245 didn't jump to line 1246 because the condition on line 1245 was never true

1246 deps[package] = depends 

1247 # Maintain a counter for each virtual package. If a 

1248 # Provides: exists, set the counter to 0 and count all 

1249 # provides by a package not in the list for removal. 

1250 # If the counter stays 0 at the end, we know that only 

1251 # the to-be-removed packages provided this virtual 

1252 # package. 

1253 if provides is not None: 1253 ↛ 1254line 1253 didn't jump to line 1254 because the condition on line 1253 was never true

1254 for virtual_pkg in provides.split(","): 

1255 virtual_pkg = virtual_pkg.strip() 

1256 if virtual_pkg == package: 

1257 continue 

1258 if virtual_pkg not in virtual_packages: 

1259 virtual_packages[virtual_pkg] = 0 

1260 if package not in removals: 

1261 virtual_packages[virtual_pkg] += 1 

1262 

1263 # If a virtual package is only provided by the to-be-removed 

1264 # packages, treat the virtual package as to-be-removed too. 

1265 removal_set.update( 

1266 virtual_pkg 

1267 for virtual_pkg in virtual_packages 

1268 if not virtual_packages[virtual_pkg] 

1269 ) 

1270 

1271 # Check binary dependencies (Depends) 

1272 for package in deps: 1272 ↛ 1273line 1272 didn't jump to line 1273 because the loop on line 1272 never started

1273 if package in removals: 

1274 continue 

1275 try: 

1276 parsed_dep = apt_pkg.parse_depends(deps[package]) 

1277 except ValueError as e: 

1278 print("Error for package %s: %s" % (package, e)) 

1279 parsed_dep = [] 

1280 for dep in parsed_dep: 

1281 # Check for partial breakage. If a package has a ORed 

1282 # dependency, there is only a dependency problem if all 

1283 # packages in the ORed depends will be removed. 

1284 unsat = 0 

1285 for dep_package, _, _ in dep: 

1286 if dep_package in removals: 

1287 unsat += 1 

1288 if unsat == len(dep): 

1289 component = p2c[package] 

1290 source = sources[package] 

1291 if component != "main": 

1292 source = "%s/%s" % (source, component) 

1293 all_broken[source][package].add(architecture) 

1294 dep_problem = True 

1295 

1296 if all_broken and not quiet: 1296 ↛ 1297line 1296 didn't jump to line 1297 because the condition on line 1296 was never true

1297 if cruft: 

1298 print(" - broken Depends:") 

1299 else: 

1300 print("# Broken Depends:") 

1301 for source, bindict in sorted(all_broken.items()): 

1302 lines = [] 

1303 for binary, bin_arches in sorted(bindict.items()): 

1304 if bin_arches == all_arches or "all" in bin_arches: 

1305 lines.append(binary) 

1306 else: 

1307 lines.append("%s [%s]" % (binary, " ".join(sorted(bin_arches)))) 

1308 if cruft: 

1309 print(" %s: %s" % (source, lines[0])) 

1310 else: 

1311 print("%s: %s" % (source, lines[0])) 

1312 for line in lines[1:]: 

1313 if cruft: 

1314 print(" " + " " * (len(source) + 2) + line) 

1315 else: 

1316 print(" " * (len(source) + 2) + line) 

1317 if not cruft: 

1318 print() 

1319 

1320 # Check source dependencies (Build-Depends and Build-Depends-Indep) 

1321 all_broken_bd: dict[str, set[str]] = defaultdict(set) 

1322 metakey_bd = get_or_set_metadatakey("Build-Depends", session) 

1323 metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session) 

1324 metakey_ids: tuple[int, ...] 

1325 if include_arch_all: 1325 ↛ 1328line 1325 didn't jump to line 1328 because the condition on line 1325 was always true

1326 metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id) 

1327 else: 

1328 metakey_ids = (metakey_bd.key_id,) 

1329 

1330 params = { 

1331 "suite_id": dbsuite.suite_id, 

1332 "metakey_ids": metakey_ids, 

1333 } 

1334 statement = sql.text( 

1335 """ 

1336 SELECT s.source, replace(string_agg(trim(sm.value), ', '), ',,', ',') as build_dep 

1337 FROM source s 

1338 JOIN source_metadata sm ON s.id = sm.src_id 

1339 WHERE s.id in 

1340 (SELECT src FROM newest_src_association 

1341 WHERE suite = :suite_id) 

1342 AND sm.key_id in :metakey_ids 

1343 GROUP BY s.id, s.source""" 

1344 ) 

1345 query = session.execute(statement, params) 

1346 for source, build_dep in query: 

1347 if source in removals: 

1348 continue 

1349 parsed_dep = [] 

1350 if build_dep is not None: 1350 ↛ 1357line 1350 didn't jump to line 1357 because the condition on line 1350 was always true

1351 # Remove [arch] information since we want to see breakage on all arches 

1352 build_dep = re_build_dep_arch.sub("", build_dep) 

1353 try: 

1354 parsed_dep = apt_pkg.parse_src_depends(build_dep) 

1355 except ValueError as e: 

1356 print("Error for source %s: %s" % (source, e)) 

1357 for dep in parsed_dep: 

1358 unsat = 0 

1359 for dep_package, _, _ in dep: 

1360 if dep_package in removals: 1360 ↛ 1361line 1360 didn't jump to line 1361 because the condition on line 1360 was never true

1361 unsat += 1 

1362 if unsat == len(dep): 1362 ↛ 1363line 1362 didn't jump to line 1363

1363 component = ( 

1364 session.query(Component.component_name) 

1365 .join(Component.overrides) 

1366 .filter(Override.suite == overridesuite) 

1367 .filter( 

1368 Override.package 

1369 == re.sub("/(contrib|non-free-firmware|non-free)$", "", source) 

1370 ) 

1371 .join(Override.overridetype) 

1372 .filter(OverrideType.overridetype == "dsc") 

1373 .scalar() 

1374 ) 

1375 key = source 

1376 if component != "main": 

1377 key = "%s/%s" % (source, component) 

1378 all_broken_bd[key].add(pp_deps(dep)) 

1379 dep_problem = True 

1380 

1381 if all_broken_bd and not quiet: 1381 ↛ 1382line 1381 didn't jump to line 1382 because the condition on line 1381 was never true

1382 if cruft: 

1383 print(" - broken Build-Depends:") 

1384 else: 

1385 print("# Broken Build-Depends:") 

1386 for source, bdeps in sorted(all_broken_bd.items()): 

1387 sorted_bdeps = sorted(bdeps) 

1388 if cruft: 

1389 print(" %s: %s" % (source, sorted_bdeps[0])) 

1390 else: 

1391 print("%s: %s" % (source, sorted_bdeps[0])) 

1392 for bdep in sorted_bdeps[1:]: 

1393 if cruft: 

1394 print(" " + " " * (len(source) + 2) + bdep) 

1395 else: 

1396 print(" " * (len(source) + 2) + bdep) 

1397 if not cruft: 

1398 print() 

1399 

1400 return dep_problem 

1401 

1402 

1403################################################################################ 

1404 

1405 

1406def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]: 

1407 """source packages referenced via Built-Using 

1408 

1409 :param control: control file to take Built-Using field from 

1410 :return: list of (source_name, source_version) pairs 

1411 """ 

1412 built_using = control.get("Built-Using", None) 

1413 if built_using is None: 

1414 return [] 

1415 

1416 bu = [] 

1417 for dep in apt_pkg.parse_depends(built_using): 

1418 assert len(dep) == 1, "Alternatives are not allowed in Built-Using field" 

1419 source_name, source_version, comp = dep[0] 

1420 assert comp == "=", "Built-Using must contain strict dependencies" 

1421 bu.append((source_name, source_version)) 

1422 

1423 return bu 

1424 

1425 

1426################################################################################ 

1427 

1428 

1429def is_in_debug_section(control: Mapping[str, str] | MetadataProxy) -> bool: 

1430 """binary package is a debug package 

1431 

1432 :param control: control file of binary package 

1433 :return: True if the binary package is a debug package 

1434 """ 

1435 section = control["Section"].split("/", 1)[-1] 

1436 auto_built_package = control.get("Auto-Built-Package") 

1437 return section == "debug" and auto_built_package == "debug-symbols" 

1438 

1439 

1440################################################################################ 

1441 

1442 

1443def find_possibly_compressed_file(filename: str) -> str: 

1444 """ 

1445 

1446 :param filename: path to a control file (Sources, Packages, etc) to 

1447 look for 

1448 :return: path to the (possibly compressed) control file, or null if the 

1449 file doesn't exist 

1450 """ 

1451 _compressions = ("", ".xz", ".gz", ".bz2") 

1452 

1453 for ext in _compressions: 1453 ↛ 1458line 1453 didn't jump to line 1458 because the loop on line 1453 didn't complete

1454 _file = filename + ext 

1455 if os.path.exists(_file): 

1456 return _file 

1457 

1458 raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename) 

1459 

1460 

1461################################################################################ 

1462 

1463 

1464def parse_boolean_from_user(value: str) -> bool: 

1465 value = value.lower() 

1466 if value in {"yes", "true", "enable", "enabled"}: 

1467 return True 

1468 if value in {"no", "false", "disable", "disabled"}: 1468 ↛ 1470line 1468 didn't jump to line 1470 because the condition on line 1468 was always true

1469 return False 

1470 raise ValueError("Not sure whether %s should be a True or a False" % value) 

1471 

1472 

1473_DURATION_RE = re.compile(r"(?P<amount>\d+)\s*(?P<unit>[hms])", re.IGNORECASE) 

1474 

1475 

1476def parse_duration(value: str) -> datetime.timedelta: 

1477 """Parse duration from strings like "1h", "30m", or "1h 15m 10s".""" 

1478 value = value.strip() 

1479 if not value: 

1480 raise ValueError("Duration must be non-empty") 

1481 

1482 total_seconds = 0 

1483 for match in _DURATION_RE.finditer(value): 

1484 amount = int(match.group("amount")) 

1485 unit = match.group("unit").lower() 

1486 if unit == "h": 

1487 total_seconds += amount * 3600 

1488 elif unit == "m": 

1489 total_seconds += amount * 60 

1490 else: 

1491 total_seconds += amount 

1492 

1493 remainder = _DURATION_RE.sub("", value).strip(" ,") 

1494 if remainder: 

1495 raise ValueError("Invalid duration: %s" % value) 

1496 if total_seconds <= 0: 

1497 raise ValueError("Duration must be greater than zero") 

1498 return datetime.timedelta(seconds=total_seconds) 

1499 

1500 

1501def suite_suffix(suite_name: str) -> str: 

1502 """Return suite_suffix for the given suite""" 

1503 suffix = Cnf.find("Dinstall::SuiteSuffix", "") 

1504 if suffix == "": 1504 ↛ 1506line 1504 didn't jump to line 1506 because the condition on line 1504 was always true

1505 return "" 

1506 elif "Dinstall::SuiteSuffixSuites" not in Cnf: 

1507 # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future 

1508 return suffix 

1509 elif suite_name in Cnf.value_list("Dinstall::SuiteSuffixSuites"): 

1510 return suffix 

1511 return "" 

1512 

1513 

1514################################################################################ 

1515 

1516 

1517def process_buildinfos( 

1518 directory: str, 

1519 buildinfo_files: "Iterable[daklib.upload.HashedFile]", 

1520 fs_transaction: "daklib.fstransactions.FilesystemTransaction", 

1521 logger: "daklib.daklog.Logger", 

1522) -> None: 

1523 """Copy buildinfo files into Dir::BuildinfoArchive 

1524 

1525 :param directory: directory where .changes is stored 

1526 :param buildinfo_files: names of buildinfo files 

1527 :param fs_transaction: FilesystemTransaction instance 

1528 :param logger: logger instance 

1529 """ 

1530 

1531 if "Dir::BuildinfoArchive" not in Cnf: 

1532 return 

1533 

1534 target_dir = os.path.join( 

1535 Cnf["Dir::BuildinfoArchive"], 

1536 datetime.datetime.now().strftime("%Y/%m/%d"), 

1537 ) 

1538 

1539 for f in buildinfo_files: 

1540 src = os.path.join(directory, f.filename) 

1541 dst = find_next_free(os.path.join(target_dir, f.filename)) 

1542 

1543 logger.log(["Archiving", f.filename]) 

1544 fs_transaction.copy(src, dst, mode=0o644) 

1545 

1546 

1547################################################################################ 

1548 

1549 

1550def move_to_morgue( 

1551 morguesubdir: str, 

1552 filenames: Iterable[str], 

1553 fs_transaction: "daklib.fstransactions.FilesystemTransaction", 

1554 logger: "daklib.daklog.Logger", 

1555) -> None: 

1556 """Move a file to the correct dir in morgue 

1557 

1558 :param morguesubdir: subdirectory of morgue where this file needs to go 

1559 :param filenames: names of files 

1560 :param fs_transaction: FilesystemTransaction instance 

1561 :param logger: logger instance 

1562 """ 

1563 

1564 assert Cnf["Dir::Base"] 

1565 morguedir = Cnf.get("Dir::Morgue", os.path.join(Cnf["Dir::Base"], "morgue")) 

1566 

1567 # Build directory as morguedir/morguesubdir/year/month/day 

1568 now = datetime.datetime.now() 

1569 dest = os.path.join( 

1570 morguedir, morguesubdir, str(now.year), "%.2d" % now.month, "%.2d" % now.day 

1571 ) 

1572 

1573 for filename in filenames: 

1574 dest_filename = dest + "/" + os.path.basename(filename) 

1575 # If the destination file exists; try to find another filename to use 

1576 if os.path.lexists(dest_filename): 1576 ↛ 1577line 1576 didn't jump to line 1577 because the condition on line 1576 was never true

1577 dest_filename = find_next_free(dest_filename) 

1578 logger.log(["move to morgue", filename, dest_filename]) 

1579 fs_transaction.move(filename, dest_filename) 

1580 

1581 

1582################################################################################ 

1583 

1584 

1585def resolve_relative_path(base_path: StrPath, relative_path: StrPath) -> Path: 

1586 """Resolve absolute path joining base_path and relative_path 

1587 

1588 Raises: 

1589 ValueError: when the resolved path is not relative to base_path 

1590 """ 

1591 base = Path(base_path).resolve() 

1592 path = (base / relative_path).resolve() 

1593 

1594 if not path.is_relative_to(base): 1594 ↛ 1595line 1594 didn't jump to line 1595 because the condition on line 1594 was never true

1595 raise ValueError(f"Path {relative_path} is not relative to {base_path}: {path}") 

1596 

1597 return path 

1598 

1599 

1600################################################################################ 

1601 

1602 

1603def remove_unsafe_symlinks(base_path: StrPath) -> None: 

1604 """Remove all unsafe symlinks below base_path""" 

1605 base = Path(base_path).resolve() 

1606 for dirpath, _, filenames in base.walk(): 

1607 for filename in filenames: 

1608 current_dir = base / dirpath 

1609 path = current_dir / filename 

1610 if not path.is_symlink(): 1610 ↛ 1612line 1610 didn't jump to line 1612 because the condition on line 1610 was always true

1611 continue 

1612 target = (current_dir / path.readlink()).resolve() 

1613 if not target.is_relative_to(base): 

1614 path.unlink()