Coverage for daklib/utils.py: 57%

719 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# vim:set et ts=4 sw=4: 

2 

3"""Utility functions 

4 

5@contact: Debian FTP Master <ftpmaster@debian.org> 

6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org> 

7@license: GNU General Public License version 2 or later 

8""" 

9 

10# This program is free software; you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation; either version 2 of the License, or 

13# (at your option) any later version. 

14 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19 

20# You should have received a copy of the GNU General Public License 

21# along with this program; if not, write to the Free Software 

22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

23 

24import datetime 

25import email.policy 

26import errno 

27import functools 

28import grp 

29import os 

30import pwd 

31import re 

32import shutil 

33import subprocess 

34import sys 

35import tempfile 

36from collections import defaultdict 

37from collections.abc import Collection, Iterable, Mapping, Sequence 

38from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, Union, override 

39 

40import apt_inst 

41import apt_pkg 

42import sqlalchemy.sql as sql 

43 

44import daklib.config as config 

45import daklib.mail 

46from daklib.dbconn import ( 

47 Component, 

48 DBConn, 

49 MetadataProxy, 

50 Override, 

51 OverrideType, 

52 get_active_keyring_paths, 

53 get_architecture, 

54 get_component, 

55 get_or_set_metadatakey, 

56 get_suite, 

57 get_suite_architectures, 

58) 

59 

60from .dak_exceptions import ( 

61 InvalidDscError, 

62 NoFilesFieldError, 

63 NoFreeFilenameError, 

64 ParseChangesError, 

65 SendmailFailedError, 

66 UnknownFormatError, 

67) 

68from .formats import parse_format, validate_changes_format 

69from .gpg import SignedFile 

70from .regexes import ( 

71 re_build_dep_arch, 

72 re_issource, 

73 re_multi_line_field, 

74 re_parse_maintainer, 

75 re_re_mark, 

76 re_single_line_field, 

77 re_srchasver, 

78 re_whitespace_comment, 

79) 

80from .srcformats import get_format_from_string 

81from .textutils import fix_maintainer 

82 

83if TYPE_CHECKING: 

84 from sqlalchemy.orm import Session 

85 

86 import daklib.daklog 

87 import daklib.fstransactions 

88 import daklib.upload 

89 

90################################################################################ 

91 

92key_uid_email_cache: dict[str, list[str]] = ( 

93 {} 

94) #: Cache for email addresses from gpg key uids 

95 

96################################################################################ 

97 

98 

99def input_or_exit(prompt: Optional[str] = None) -> str: 

100 try: 

101 return input(prompt) 

102 except EOFError: 

103 sys.exit("\nUser interrupt (^D).") 

104 

105 

106################################################################################ 

107 

108 

109def extract_component_from_section(section: str) -> tuple[str, str]: 

110 """split "section" into "section", "component" parts 

111 

112 If "component" is not given, "main" is used instead. 

113 

114 :return: tuple (section, component) 

115 """ 

116 if section.find("/") != -1: 

117 return section, section.split("/", 1)[0] 

118 return section, "main" 

119 

120 

121################################################################################ 

122 

123 

124def parse_deb822( 

125 armored_contents: bytes, 

126 signing_rules: Literal[-1, 0, 1] = 0, 

127 keyrings: Collection[str] | None = None, 

128) -> dict[str, str]: 

129 require_signature = True 

130 if keyrings is None: 130 ↛ 134line 130 didn't jump to line 134 because the condition on line 130 was always true

131 keyrings = [] 

132 require_signature = False 

133 

134 signed_file = SignedFile( 

135 armored_contents, keyrings=keyrings, require_signature=require_signature 

136 ) 

137 contents = signed_file.contents.decode("utf-8") 

138 

139 error = "" 

140 changes = {} 

141 

142 # Split the lines in the input, keeping the linebreaks. 

143 lines = contents.splitlines(True) 

144 

145 if len(lines) == 0: 

146 raise ParseChangesError("[Empty changes file]") 

147 

148 # Reindex by line number so we can easily verify the format of 

149 # .dsc files... 

150 index = 0 

151 indexed_lines = {} 

152 for line in lines: 

153 index += 1 

154 indexed_lines[index] = line[:-1] 

155 

156 num_of_lines = len(indexed_lines) 

157 index = 0 

158 first = -1 

159 while index < num_of_lines: 

160 index += 1 

161 line = indexed_lines[index] 

162 if line == "" and signing_rules == 1: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 if index != num_of_lines: 

164 raise InvalidDscError(index) 

165 break 

166 if slf := re_single_line_field.match(line): 

167 field = slf.groups()[0].lower() 

168 changes[field] = slf.groups()[1] 

169 first = 1 

170 continue 

171 if line == " .": 

172 changes[field] += "\n" 

173 continue 

174 if mlf := re_multi_line_field.match(line): 

175 if first == -1: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 raise ParseChangesError( 

177 "'%s'\n [Multi-line field continuing on from nothing?]" % (line) 

178 ) 

179 if first == 1 and changes[field] != "": 

180 changes[field] += "\n" 

181 first = 0 

182 changes[field] += mlf.groups()[0] + "\n" 

183 continue 

184 error += line 

185 

186 changes["filecontents"] = armored_contents.decode() 

187 

188 if "source" in changes: 

189 # Strip the source version in brackets from the source field, 

190 # put it in the "source-version" field instead. 

191 if srcver := re_srchasver.search(changes["source"]): 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true

192 changes["source"] = srcver.group(1) 

193 changes["source-version"] = srcver.group(2) 

194 

195 if error: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 raise ParseChangesError(error) 

197 

198 return changes 

199 

200 

201################################################################################ 

202 

203 

204def parse_changes( 

205 filename: str, 

206 signing_rules: Literal[-1, 0, 1] = 0, 

207 dsc_file: bool = False, 

208 keyrings: Collection[str] | None = None, 

209) -> dict[str, str]: 

210 """ 

211 Parses a changes or source control (.dsc) file and returns a dictionary 

212 where each field is a key. The mandatory first argument is the 

213 filename of the .changes file. 

214 

215 signing_rules is an optional argument: 

216 

217 - If signing_rules == -1, no signature is required. 

218 - If signing_rules == 0 (the default), a signature is required. 

219 - If signing_rules == 1, it turns on the same strict format checking 

220 as dpkg-source. 

221 

222 The rules for (signing_rules == 1)-mode are: 

223 

224 - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----" 

225 followed by any PGP header data and must end with a blank line. 

226 

227 - The data section must end with a blank line and must be followed by 

228 "-----BEGIN PGP SIGNATURE-----". 

229 

230 :param dsc_file: `filename` is a Debian source control (.dsc) file 

231 """ 

232 

233 with open(filename, "rb") as changes_in: 

234 content = changes_in.read() 

235 changes = parse_deb822(content, signing_rules, keyrings=keyrings) 

236 

237 if not dsc_file: 

238 # Finally ensure that everything needed for .changes is there 

239 must_keywords = ( 

240 "Format", 

241 "Date", 

242 "Source", 

243 "Architecture", 

244 "Version", 

245 "Distribution", 

246 "Maintainer", 

247 "Changes", 

248 "Files", 

249 ) 

250 

251 missingfields = [] 

252 for keyword in must_keywords: 

253 if keyword.lower() not in changes: 253 ↛ 254line 253 didn't jump to line 254 because the condition on line 253 was never true

254 missingfields.append(keyword) 

255 

256 if len(missingfields): 

257 raise ParseChangesError( 

258 "Missing mandatory field(s) in changes file (policy 5.5): %s" 

259 % (missingfields) 

260 ) 

261 

262 return changes 

263 

264 

265################################################################################ 

266 

267 

268def check_dsc_files( 

269 dsc_filename: str, 

270 dsc: Mapping[str, str], 

271 dsc_files: Iterable[str], 

272) -> list[str]: 

273 """ 

274 Verify that the files listed in the Files field of the .dsc are 

275 those expected given the announced Format. 

276 

277 :param dsc_filename: path of .dsc file 

278 :param dsc: the content of the .dsc parsed by :func:`parse_changes` 

279 :param dsc_files: the file list returned by :func:`build_file_list` 

280 :return: all errors detected 

281 """ 

282 rejmsg = [] 

283 

284 # Ensure .dsc lists proper set of source files according to the format 

285 # announced 

286 has: defaultdict[str, int] = defaultdict(lambda: 0) 

287 

288 ftype_lookup = ( 

289 (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)), 

290 (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")), 

291 (r"diff\.gz", ("debian_diff",)), 

292 (r"tar\.gz", ("native_tar_gz", "native_tar")), 

293 (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)), 

294 (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)), 

295 (r"tar\.(gz|bz2|xz)", ("native_tar",)), 

296 (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)), 

297 (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)), 

298 ) 

299 

300 for f in dsc_files: 

301 m = re_issource.match(f) 

302 if not m: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 rejmsg.append( 

304 "%s: %s in Files field not recognised as source." % (dsc_filename, f) 

305 ) 

306 continue 

307 

308 # Populate 'has' dictionary by resolving keys in lookup table 

309 matched = False 

310 for regex, keys in ftype_lookup: 310 ↛ 318line 310 didn't jump to line 318 because the loop on line 310 didn't complete

311 if re.match(regex, m.group(3)): 

312 matched = True 

313 for key in keys: 

314 has[key] += 1 

315 break 

316 

317 # File does not match anything in lookup table; reject 

318 if not matched: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true

319 rejmsg.append("%s: unexpected source file '%s'" % (dsc_filename, f)) 

320 break 

321 

322 # Check for multiple files 

323 for file_type in ( 

324 "orig_tar", 

325 "orig_tar_sig", 

326 "native_tar", 

327 "debian_tar", 

328 "debian_diff", 

329 ): 

330 if has[file_type] > 1: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true

331 rejmsg.append("%s: lists multiple %s" % (dsc_filename, file_type)) 

332 

333 # Source format specific tests 

334 try: 

335 format = get_format_from_string(dsc["format"]) 

336 rejmsg.extend(["%s: %s" % (dsc_filename, x) for x in format.reject_msgs(has)]) 

337 

338 except UnknownFormatError: 

339 # Not an error here for now 

340 pass 

341 

342 return rejmsg 

343 

344 

345################################################################################ 

346 

347# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl 

348 

349 

350def build_file_list( 

351 changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum" 

352) -> dict[str, dict[str, str]]: 

353 files = {} 

354 

355 # Make sure we have a Files: field to parse... 

356 if field not in changes: 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true

357 raise NoFilesFieldError 

358 

359 # Validate .changes Format: field 

360 if not is_a_dsc: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true

361 validate_changes_format(parse_format(changes["format"]), field) 

362 

363 includes_section = (not is_a_dsc) and field == "files" 

364 

365 # Parse each entry/line: 

366 for i in changes[field].split("\n"): 366 ↛ 391line 366 didn't jump to line 391 because the loop on line 366 didn't complete

367 if not i: 

368 break 

369 s = i.split() 

370 section = priority = "" 

371 try: 

372 if includes_section: 372 ↛ 373line 372 didn't jump to line 373 because the condition on line 372 was never true

373 (md5, size, section, priority, name) = s 

374 else: 

375 (md5, size, name) = s 

376 except ValueError: 

377 raise ParseChangesError(i) 

378 

379 if section == "": 379 ↛ 381line 379 didn't jump to line 381 because the condition on line 379 was always true

380 section = "-" 

381 if priority == "": 381 ↛ 384line 381 didn't jump to line 384 because the condition on line 381 was always true

382 priority = "-" 

383 

384 (section, component) = extract_component_from_section(section) 

385 

386 files[name] = dict( 

387 size=size, section=section, priority=priority, component=component 

388 ) 

389 files[name][hashname] = md5 

390 

391 return files 

392 

393 

394################################################################################ 

395 

396 

397def send_mail(message: str, whitelists: Optional[list[str | None]] = None) -> None: 

398 """sendmail wrapper, takes a message string 

399 

400 :param whitelists: path to whitelists. :const:`None` or an empty list whitelists 

401 everything, otherwise an address is whitelisted if it is 

402 included in any of the lists. 

403 In addition a global whitelist can be specified in 

404 Dinstall::MailWhiteList. 

405 """ 

406 

407 msg = daklib.mail.parse_mail(message) 

408 

409 # The incoming message might be UTF-8, but outgoing mail should 

410 # use a legacy-compatible encoding. Set the content to the 

411 # text to make sure this is the case. 

412 # Note that this does not work with multipart messages. 

413 msg.set_content(msg.get_payload(), cte="quoted-printable") 

414 

415 # Check whether we're supposed to be sending mail 

416 call_sendmail = True 

417 if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]: 

418 call_sendmail = False 

419 

420 if whitelists is None or None in whitelists: 

421 whitelists = [] 

422 if Cnf.get("Dinstall::MailWhiteList", ""): 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true

423 whitelists.append(Cnf["Dinstall::MailWhiteList"]) 

424 if len(whitelists) != 0: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true

425 whitelist = [] 

426 for path in whitelists: 

427 assert path is not None 

428 with open(path, "r") as whitelist_in: 

429 for line in whitelist_in: 

430 if not re_whitespace_comment.match(line): 

431 if re_re_mark.match(line): 

432 whitelist.append( 

433 re.compile(re_re_mark.sub("", line.strip(), 1)) 

434 ) 

435 else: 

436 whitelist.append(re.compile(re.escape(line.strip()))) 

437 

438 # Fields to check. 

439 fields = ["To", "Bcc", "Cc"] 

440 for field in fields: 

441 # Check each field 

442 value = msg.get(field, None) 

443 if value is not None: 

444 match = [] 

445 for item in value.split(","): 

446 (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer( 

447 item.strip() 

448 ) 

449 mail_whitelisted = 0 

450 for wr in whitelist: 

451 if wr.match(mail): 

452 mail_whitelisted = 1 

453 break 

454 if not mail_whitelisted: 

455 print("Skipping {0} since it's not whitelisted".format(item)) 

456 continue 

457 match.append(item) 

458 

459 # Doesn't have any mail in whitelist so remove the header 

460 if len(match) == 0: 

461 del msg[field] 

462 else: 

463 msg.replace_header(field, ", ".join(match)) 

464 

465 # Change message fields in order if we don't have a To header 

466 if "To" not in msg: 

467 fields.reverse() 

468 for field in fields: 

469 if field in msg: 

470 msg[fields[-1]] = msg[field] 

471 del msg[field] 

472 break 

473 else: 

474 # return, as we removed all recipients. 

475 call_sendmail = False 

476 

477 # sign mail 

478 if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""): 478 ↛ 487line 478 didn't jump to line 487 because the condition on line 478 was always true

479 msg = daklib.mail.sign_mail( 

480 msg, 

481 keyids=[mailkey], 

482 pubring=Cnf.get("Dinstall::SigningPubKeyring") or None, 

483 homedir=Cnf.get("Dinstall::SigningHomedir") or None, 

484 passphrase_file=Cnf.get("Dinstall::SigningPassphraseFile") or None, 

485 ) 

486 

487 msg_bytes = msg.as_bytes(policy=email.policy.default) 

488 

489 maildir = Cnf.get("Dir::Mail") 

490 if maildir: 490 ↛ 497line 490 didn't jump to line 497 because the condition on line 490 was always true

491 path = os.path.join(maildir, datetime.datetime.now().isoformat()) 

492 path = find_next_free(path) 

493 with open(path, "wb") as fh: 

494 fh.write(msg_bytes) 

495 

496 # Invoke sendmail 

497 if not call_sendmail: 

498 return 

499 try: 

500 subprocess.run( 

501 Cnf["Dinstall::SendmailCommand"].split(), 

502 input=msg_bytes, 

503 check=True, 

504 stdout=subprocess.PIPE, 

505 stderr=subprocess.STDOUT, 

506 ) 

507 except subprocess.CalledProcessError as e: 

508 raise SendmailFailedError(e.output.decode().rstrip()) 

509 

510 

511################################################################################ 

512 

513 

514def poolify(source: str) -> str: 

515 """convert `source` name into directory path used in pool structure""" 

516 if source[:3] == "lib": 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true

517 return source[:4] + "/" + source + "/" 

518 else: 

519 return source[:1] + "/" + source + "/" 

520 

521 

522################################################################################ 

523 

524 

525def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None: 

526 if os.path.exists(dest) and os.path.isdir(dest): 

527 dest_dir = dest 

528 else: 

529 dest_dir = os.path.dirname(dest) 

530 if not os.path.lexists(dest_dir): 

531 umask = os.umask(00000) 

532 os.makedirs(dest_dir, 0o2775) 

533 os.umask(umask) 

534 # print "Moving %s to %s..." % (src, dest) 

535 if os.path.exists(dest) and os.path.isdir(dest): 

536 dest += "/" + os.path.basename(src) 

537 # Don't overwrite unless forced to 

538 if os.path.lexists(dest): 

539 if not overwrite: 

540 fubar("Can't move %s to %s - file already exists." % (src, dest)) 

541 else: 

542 if not os.access(dest, os.W_OK): 

543 fubar( 

544 "Can't move %s to %s - can't write to existing file." % (src, dest) 

545 ) 

546 shutil.copy2(src, dest) 

547 os.chmod(dest, perms) 

548 os.unlink(src) 

549 

550 

551################################################################################ 

552 

553 

554def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str: 

555 """Perform a substition of template""" 

556 with open(filename) as templatefile: 

557 template = templatefile.read() 

558 for k, v in subst_map.items(): 

559 template = template.replace(k, str(v)) 

560 return template 

561 

562 

563################################################################################ 

564 

565 

566def fubar(msg: str, exit_code: int = 1) -> NoReturn: 

567 """print error message and exit program""" 

568 print("E:", msg, file=sys.stderr) 

569 sys.exit(exit_code) 

570 

571 

572def warn(msg: str) -> None: 

573 """print warning message""" 

574 print("W:", msg, file=sys.stderr) 

575 

576 

577################################################################################ 

578 

579 

580def whoami() -> str: 

581 """get user name 

582 

583 Returns the user name with a laughable attempt at rfc822 conformancy 

584 (read: removing stray periods). 

585 """ 

586 return pwd.getpwuid(os.getuid())[4].split(",")[0].replace(".", "") 

587 

588 

589def getusername() -> str: 

590 """get login name""" 

591 return pwd.getpwuid(os.getuid())[0] 

592 

593 

594################################################################################ 

595 

596 

597def size_type(c: Union[int, float]) -> str: 

598 t = " B" 

599 if c > 10240: 

600 c = c / 1024 

601 t = " KB" 

602 if c > 10240: 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true

603 c = c / 1024 

604 t = " MB" 

605 return "%d%s" % (c, t) 

606 

607 

608################################################################################ 

609 

610 

611def find_next_free(dest: str, too_many: int = 100) -> str: 

612 extra = 0 

613 orig_dest = dest 

614 while os.path.lexists(dest) and extra < too_many: 

615 dest = orig_dest + "." + repr(extra) 

616 extra += 1 

617 if extra >= too_many: 617 ↛ 618line 617 didn't jump to line 618 because the condition on line 617 was never true

618 raise NoFreeFilenameError 

619 return dest 

620 

621 

622################################################################################ 

623 

624 

625def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str: 

626 return sep.join(x if x is not None else "" for x in original) 

627 

628 

629################################################################################ 

630 

631 

632def prefix_multi_line_string( 

633 lines: str, prefix: str, include_blank_lines: bool = False 

634) -> str: 

635 """prepend `prefix` to each line in `lines`""" 

636 return "\n".join( 

637 prefix + cleaned_line 

638 for line in lines.split("\n") 

639 if (cleaned_line := line.strip()) or include_blank_lines 

640 ) 

641 

642 

643################################################################################ 

644 

645 

646def join_with_commas_and(list: Sequence[str]) -> str: 

647 if len(list) == 0: 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true

648 return "nothing" 

649 if len(list) == 1: 649 ↛ 651line 649 didn't jump to line 651 because the condition on line 649 was always true

650 return list[0] 

651 return ", ".join(list[:-1]) + " and " + list[-1] 

652 

653 

654################################################################################ 

655 

656 

657def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str: 

658 pp_deps = ( 

659 f"{pkg} ({constraint} {version})" if constraint else pkg 

660 for pkg, constraint, version in deps 

661 ) 

662 return " |".join(pp_deps) 

663 

664 

665################################################################################ 

666 

667 

668def get_conf() -> apt_pkg.Configuration: 

669 return Cnf 

670 

671 

672################################################################################ 

673 

674 

675def parse_args(Options: apt_pkg.Configuration) -> tuple[str, str, str, bool]: 

676 """Handle -a, -c and -s arguments; returns them as SQL constraints""" 

677 # XXX: This should go away and everything which calls it be converted 

678 # to use SQLA properly. For now, we'll just fix it not to use 

679 # the old Pg interface though 

680 session = DBConn().session() 

681 # Process suite 

682 if Options["Suite"]: 682 ↛ 700line 682 didn't jump to line 700 because the condition on line 682 was always true

683 suite_ids_list = [] 

684 for suitename in split_args(Options["Suite"]): 

685 suite = get_suite(suitename, session=session) 

686 if not suite or suite.suite_id is None: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true

687 warn( 

688 "suite '%s' not recognised." 

689 % (suite and suite.suite_name or suitename) 

690 ) 

691 else: 

692 suite_ids_list.append(suite.suite_id) 

693 if suite_ids_list: 693 ↛ 698line 693 didn't jump to line 698 because the condition on line 693 was always true

694 con_suites = "AND su.id IN (%s)" % ", ".join( 

695 [str(i) for i in suite_ids_list] 

696 ) 

697 else: 

698 fubar("No valid suite given.") 

699 else: 

700 con_suites = "" 

701 

702 # Process component 

703 if Options["Component"]: 703 ↛ 704line 703 didn't jump to line 704 because the condition on line 703 was never true

704 component_ids_list = [] 

705 for componentname in split_args(Options["Component"]): 

706 component = get_component(componentname, session=session) 

707 if component is None: 

708 warn("component '%s' not recognised." % (componentname)) 

709 else: 

710 component_ids_list.append(component.component_id) 

711 if component_ids_list: 

712 con_components = "AND c.id IN (%s)" % ", ".join( 

713 [str(i) for i in component_ids_list] 

714 ) 

715 else: 

716 fubar("No valid component given.") 

717 else: 

718 con_components = "" 

719 

720 # Process architecture 

721 con_architectures = "" 

722 check_source = False 

723 if Options["Architecture"]: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true

724 arch_ids_list = [] 

725 for archname in split_args(Options["Architecture"]): 

726 if archname == "source": 

727 check_source = True 

728 else: 

729 arch = get_architecture(archname, session=session) 

730 if arch is None: 

731 warn("architecture '%s' not recognised." % (archname)) 

732 else: 

733 arch_ids_list.append(arch.arch_id) 

734 if arch_ids_list: 

735 con_architectures = "AND a.id IN (%s)" % ", ".join( 

736 [str(i) for i in arch_ids_list] 

737 ) 

738 else: 

739 if not check_source: 

740 fubar("No valid architecture given.") 

741 else: 

742 check_source = True 

743 

744 return (con_suites, con_architectures, con_components, check_source) 

745 

746 

747################################################################################ 

748 

749 

750@functools.total_ordering 

751class ArchKey: 

752 """ 

753 Key object for use in sorting lists of architectures. 

754 

755 Sorts normally except that 'source' dominates all others. 

756 """ 

757 

758 __slots__ = ["arch", "issource"] 

759 

760 def __init__(self, arch: str, *args): 

761 self.arch = arch 

762 self.issource = arch == "source" 

763 

764 def __lt__(self, other: "ArchKey") -> bool: 

765 if self.issource: 

766 return not other.issource 

767 if other.issource: 

768 return False 

769 return self.arch < other.arch 

770 

771 @override 

772 def __eq__(self, other: object) -> bool: 

773 if not isinstance(other, ArchKey): 773 ↛ 774line 773 didn't jump to line 774 because the condition on line 773 was never true

774 return NotImplemented 

775 return self.arch == other.arch 

776 

777 

778################################################################################ 

779 

780 

781def split_args(s: str, dwim: bool = True) -> list[str]: 

782 """ 

783 Split command line arguments which can be separated by either commas 

784 or whitespace. If dwim is set, it will complain about string ending 

785 in comma since this usually means someone did 'dak ls -a i386, m68k 

786 foo' or something and the inevitable confusion resulting from 'm68k' 

787 being treated as an argument is undesirable. 

788 """ 

789 

790 if s.find(",") == -1: 790 ↛ 793line 790 didn't jump to line 793 because the condition on line 790 was always true

791 return s.split() 

792 else: 

793 if s[-1:] == "," and dwim: 

794 fubar("split_args: found trailing comma, spurious space maybe?") 

795 return s.split(",") 

796 

797 

798################################################################################ 

799 

800 

801def split_args_or_none(s: str | None, dwim: bool = True) -> list[str] | None: 

802 """ 

803 Split command line arguments like `split_args`, but return `None` for empty string 

804 """ 

805 if not s: 

806 return None 

807 return split_args(s, dwim) 

808 

809 

810################################################################################ 

811 

812 

813def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]: 

814 if keyrings is None: 814 ↛ 817line 814 didn't jump to line 817 because the condition on line 814 was always true

815 keyrings = get_active_keyring_paths() 

816 

817 return ["--keyring={}".format(path) for path in keyrings] 

818 

819 

820################################################################################ 

821 

822 

823def _gpg_get_addresses_from_listing(output: bytes) -> list[str]: 

824 addresses: list[str] = [] 

825 

826 for line in output.split(b"\n"): 

827 parts = line.split(b":") 

828 if parts[0] not in (b"uid", b"pub"): 

829 continue 

830 if parts[1] in (b"i", b"d", b"r"): 830 ↛ 832line 830 didn't jump to line 832 because the condition on line 830 was never true

831 # Skip uid that is invalid, disabled or revoked 

832 continue 

833 try: 

834 uid_bytes = parts[9] 

835 except IndexError: 

836 continue 

837 try: 

838 uid = uid_bytes.decode(encoding="utf-8") 

839 except UnicodeDecodeError: 

840 # If the uid is not valid UTF-8, we assume it is an old uid 

841 # still encoding in Latin-1. 

842 uid = uid_bytes.decode(encoding="latin1") 

843 m = re_parse_maintainer.match(uid) 

844 if not m: 

845 continue 

846 address = m.group(2) 

847 if address.endswith("@debian.org"): 847 ↛ 850line 847 didn't jump to line 850 because the condition on line 847 was never true

848 # prefer @debian.org addresses 

849 # TODO: maybe not hardcode the domain 

850 addresses.insert(0, address) 

851 else: 

852 addresses.append(address) 

853 

854 return addresses 

855 

856 

857def gpg_get_key_addresses(fingerprint: str) -> list[str]: 

858 """retreive email addresses from gpg key uids for a given fingerprint""" 

859 addresses = key_uid_email_cache.get(fingerprint) 

860 if addresses is not None: 

861 return addresses 

862 

863 try: 

864 cmd = ["gpg", "--no-default-keyring"] 

865 cmd.extend(gpg_keyring_args()) 

866 cmd.extend(["--with-colons", "--list-keys", "--", fingerprint]) 

867 output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) 

868 except subprocess.CalledProcessError: 

869 addresses = [] 

870 else: 

871 addresses = _gpg_get_addresses_from_listing(output) 

872 

873 key_uid_email_cache[fingerprint] = addresses 

874 return addresses 

875 

876 

877################################################################################ 

878 

879 

880def open_ldap_connection() -> Any: 

881 """open connection to the configured LDAP server""" 

882 import ldap # type: ignore 

883 

884 LDAPServer = Cnf["Import-LDAP-Fingerprints::LDAPServer"] 

885 ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile") 

886 

887 conn = ldap.initialize(LDAPServer) 

888 

889 if ca_cert_file: 

890 conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD) 

891 conn.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file) 

892 conn.set_option(ldap.OPT_X_TLS_NEWCTX, True) 

893 conn.start_tls_s() 

894 

895 conn.simple_bind_s("", "") 

896 

897 return conn 

898 

899 

900################################################################################ 

901 

902 

903def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]: 

904 """retrieve login from LDAP linked to a given fingerprint""" 

905 import ldap 

906 

907 conn = open_ldap_connection() 

908 LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"] 

909 Attrs = conn.search_s( 

910 LDAPDn, 

911 ldap.SCOPE_ONELEVEL, 

912 "(keyfingerprint=%s)" % fingerprint, 

913 ["uid", "keyfingerprint"], 

914 ) 

915 login: dict[str, str] = {} 

916 for elem in Attrs: 

917 fpr = elem[1]["keyFingerPrint"][0].decode() 

918 uid = elem[1]["uid"][0].decode() 

919 login[fpr] = uid 

920 return login 

921 

922 

923################################################################################ 

924 

925 

926def get_users_from_ldap() -> dict[str, str]: 

927 """retrieve login and user names from LDAP""" 

928 import ldap 

929 

930 conn = open_ldap_connection() 

931 LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"] 

932 Attrs = conn.search_s( 

933 LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"] 

934 ) 

935 users: dict[str, str] = {} 

936 for elem in Attrs: 

937 elem = elem[1] 

938 name = [] 

939 for k in ("cn", "mn", "sn"): 

940 try: 

941 value = elem[k][0].decode() 

942 if value and value[0] != "-": 

943 name.append(value) 

944 except KeyError: 

945 pass 

946 users[" ".join(name)] = elem["uid"][0] 

947 return users 

948 

949 

950################################################################################ 

951 

952 

953def clean_symlink(src: str, dest: str, root: str) -> str: 

954 """ 

955 Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'. 

956 Returns fixed 'src' 

957 """ 

958 src = src.replace(root, "", 1) 

959 dest = dest.replace(root, "", 1) 

960 dest = os.path.dirname(dest) 

961 new_src = "../" * len(dest.split("/")) 

962 return new_src + src 

963 

964 

965################################################################################ 

966 

967 

968def temp_dirname( 

969 parent: Optional[str] = None, 

970 prefix: str = "dak", 

971 suffix: str = "", 

972 mode: Optional[int] = None, 

973 group: Optional[str] = None, 

974) -> str: 

975 """ 

976 Return a secure and unique directory by pre-creating it. 

977 

978 :param parent: If non-null it will be the directory the directory is pre-created in. 

979 :param prefix: The filename will be prefixed with this string 

980 :param suffix: The filename will end with this string 

981 :param mode: If set the file will get chmodded to those permissions 

982 :param group: If set the file will get chgrped to the specified group. 

983 :return: Returns a pair (fd, name) 

984 

985 """ 

986 

987 tfname = tempfile.mkdtemp(suffix, prefix, parent) 

988 if mode is not None: 988 ↛ 990line 988 didn't jump to line 990 because the condition on line 988 was always true

989 os.chmod(tfname, mode) 

990 if group is not None: 990 ↛ 991line 990 didn't jump to line 991 because the condition on line 990 was never true

991 gid = grp.getgrnam(group).gr_gid 

992 os.chown(tfname, -1, gid) 

993 return tfname 

994 

995 

996################################################################################ 

997 

998 

999def get_changes_files(from_dir: str) -> list[str]: 

1000 """ 

1001 Takes a directory and lists all .changes files in it (as well as chdir'ing 

1002 to the directory; this is due to broken behaviour on the part of p-u/p-a 

1003 when you're not in the right place) 

1004 

1005 Returns a list of filenames 

1006 """ 

1007 try: 

1008 # Much of the rest of p-u/p-a depends on being in the right place 

1009 os.chdir(from_dir) 

1010 changes_files = [x for x in os.listdir(from_dir) if x.endswith(".changes")] 

1011 except OSError as e: 

1012 fubar("Failed to read list from directory %s (%s)" % (from_dir, e)) 

1013 

1014 return changes_files 

1015 

1016 

1017################################################################################ 

1018 

1019 

1020Cnf: apt_pkg.Configuration = config.Config().Cnf 

1021 

1022################################################################################ 

1023 

1024 

1025def parse_wnpp_bug_file( 

1026 file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm", 

1027) -> dict[str, list[str]]: 

1028 """ 

1029 Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm 

1030 Well, actually it parsed a local copy, but let's document the source 

1031 somewhere ;) 

1032 

1033 returns a dict associating source package name with a list of open wnpp 

1034 bugs (Yes, there might be more than one) 

1035 """ 

1036 

1037 try: 

1038 with open(file) as f: 

1039 lines = f.readlines() 

1040 except OSError: 

1041 print( 

1042 "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any." 

1043 % file 

1044 ) 

1045 lines = [] 

1046 wnpp = {} 

1047 

1048 for line in lines: 

1049 splited_line = line.split(": ", 1) 

1050 if len(splited_line) > 1: 

1051 wnpp[splited_line[0]] = splited_line[1].split("|") 

1052 

1053 for source in wnpp: 

1054 bugs = [] 

1055 for wnpp_bug in wnpp[source]: 

1056 m = re.search(r"(\d)+", wnpp_bug) 

1057 assert m is not None 

1058 bug_no = m.group() 

1059 if bug_no: 

1060 bugs.append(bug_no) 

1061 wnpp[source] = bugs 

1062 return wnpp 

1063 

1064 

1065################################################################################ 

1066 

1067 

1068def deb_extract_control(path: str) -> bytes: 

1069 """extract DEBIAN/control from a binary package""" 

1070 return apt_inst.DebFile(path).control.extractdata("control") 

1071 

1072 

1073################################################################################ 

1074 

1075 

1076def mail_addresses_for_upload( 

1077 maintainer: str, 

1078 changed_by: str, 

1079 fingerprint: str, 

1080 authorized_by_fingerprint: Optional[str], 

1081) -> list[str]: 

1082 """mail addresses to contact for an upload 

1083 

1084 :param maintainer: Maintainer field of the .changes file 

1085 :param changed_by: Changed-By field of the .changes file 

1086 :param fingerprint: fingerprint of the key used to sign the upload 

1087 :return: list of RFC 2047-encoded mail addresses to contact regarding 

1088 this upload 

1089 """ 

1090 recipients = Cnf.value_list("Dinstall::UploadMailRecipients") 

1091 if not recipients: 1091 ↛ 1100line 1091 didn't jump to line 1100 because the condition on line 1091 was always true

1092 recipients = [ 

1093 "maintainer", 

1094 "changed_by", 

1095 "signer", 

1096 "authorized_by", 

1097 ] 

1098 

1099 # Ensure signer and authorized_by are last if present 

1100 for r in ("signer", "authorized_by"): 

1101 try: 

1102 recipients.remove(r) 

1103 except ValueError: 

1104 pass 

1105 else: 

1106 recipients.append(r) 

1107 

1108 # Compute the set of addresses of the recipients 

1109 addresses = set() # Name + email 

1110 emails = set() # Email only, used to avoid duplicates 

1111 for recipient in recipients: 

1112 address: str | None 

1113 if recipient.startswith("mail:"): # Email hardcoded in config 1113 ↛ 1114line 1113 didn't jump to line 1114 because the condition on line 1113 was never true

1114 address = recipient[5:] 

1115 elif recipient == "maintainer": 

1116 address = maintainer 

1117 elif recipient == "changed_by": 

1118 address = changed_by 

1119 elif recipient == "signer" or recipient == "authorized_by": 1119 ↛ 1129line 1119 didn't jump to line 1129 because the condition on line 1119 was always true

1120 fpr = fingerprint if recipient == "signer" else authorized_by_fingerprint 

1121 if not fpr: 1121 ↛ 1122line 1121 didn't jump to line 1122 because the condition on line 1121 was never true

1122 continue 

1123 fpr_addresses = gpg_get_key_addresses(fpr) 

1124 address = fpr_addresses[0] if fpr_addresses else None 

1125 if any(x in emails for x in fpr_addresses): 

1126 # The signer already gets a copy via another email 

1127 address = None 

1128 else: 

1129 raise Exception( 

1130 "Unsupported entry in {0}: {1}".format( 

1131 "Dinstall::UploadMailRecipients", recipient 

1132 ) 

1133 ) 

1134 

1135 if address is not None: 

1136 mail = fix_maintainer(address)[3] 

1137 if mail not in emails: 

1138 addresses.add(address) 

1139 emails.add(mail) 

1140 

1141 encoded_addresses = [fix_maintainer(e)[1] for e in addresses] 

1142 return encoded_addresses 

1143 

1144 

1145################################################################################ 

1146 

1147 

1148def call_editor_for_file(path: str) -> None: 

1149 editor = os.environ.get("VISUAL", os.environ.get("EDITOR", "sensible-editor")) 

1150 subprocess.check_call([editor, path]) 

1151 

1152 

1153################################################################################ 

1154 

1155 

1156def call_editor(text: str = "", suffix: str = ".txt") -> str: 

1157 """run editor and return the result as a string 

1158 

1159 :param text: initial text 

1160 :param suffix: extension for temporary file 

1161 :return: string with the edited text 

1162 """ 

1163 with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as fh: 

1164 print(text, end="", file=fh) 

1165 fh.flush() 

1166 call_editor_for_file(fh.name) 

1167 fh.seek(0) 

1168 return fh.read() 

1169 

1170 

1171################################################################################ 

1172 

1173 

1174def check_reverse_depends( 

1175 removals: Iterable[str], 

1176 suite: str, 

1177 arches: Optional[Iterable[str]] = None, 

1178 session: "Session | None" = None, 

1179 cruft: bool = False, 

1180 quiet: bool = False, 

1181 include_arch_all: bool = True, 

1182) -> bool: 

1183 assert session is not None # TODO: remove default value... 

1184 

1185 dbsuite = get_suite(suite, session) 

1186 assert dbsuite is not None 

1187 overridesuite = ( 

1188 get_suite(dbsuite.overridesuite, session) if dbsuite.overridesuite else dbsuite 

1189 ) 

1190 assert overridesuite is not None 

1191 dep_problem = False 

1192 p2c = {} 

1193 all_broken: defaultdict[str, defaultdict[str, set[str]]] = defaultdict( 1193 ↛ exitline 1193 didn't jump to the function exit

1194 lambda: defaultdict(set) 

1195 ) 

1196 if arches: 1196 ↛ 1197line 1196 didn't jump to line 1197 because the condition on line 1196 was never true

1197 all_arches = set(arches) 

1198 else: 

1199 all_arches = set( 

1200 x.arch_string for x in get_suite_architectures(suite, session=session) 

1201 ) 

1202 all_arches -= set(["source", "all"]) 

1203 removal_set = set(removals) 

1204 metakey_d = get_or_set_metadatakey("Depends", session) 

1205 metakey_p = get_or_set_metadatakey("Provides", session) 

1206 if include_arch_all: 1206 ↛ 1209line 1206 didn't jump to line 1209 because the condition on line 1206 was always true

1207 rdep_architectures = all_arches | set(["all"]) 

1208 else: 

1209 rdep_architectures = all_arches 

1210 for architecture in rdep_architectures: 

1211 deps = {} 

1212 sources = {} 

1213 virtual_packages = {} 

1214 

1215 params: dict[str, object] = { 

1216 "suite_id": dbsuite.suite_id, 

1217 "metakey_d_id": metakey_d.key_id, 

1218 "metakey_p_id": metakey_p.key_id, 

1219 } 

1220 arch = get_architecture(architecture, session) 

1221 if arch is None: 1221 ↛ 1222line 1221 didn't jump to line 1222 because the condition on line 1221 was never true

1222 continue 

1223 params["arch_id"] = arch.arch_id 

1224 

1225 statement = sql.text( 

1226 """ 

1227 SELECT b.package, s.source, c.name as component, 

1228 (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends, 

1229 (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides 

1230 FROM binaries b 

1231 JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id 

1232 JOIN source s ON b.source = s.id 

1233 JOIN files_archive_map af ON b.file = af.file_id 

1234 JOIN component c ON af.component_id = c.id 

1235 WHERE b.architecture = :arch_id""" 

1236 ) 

1237 query = session.execute(statement, params) 

1238 for package, source, component, depends, provides in query: 

1239 sources[package] = source 

1240 p2c[package] = component 

1241 if depends is not None: 1241 ↛ 1242line 1241 didn't jump to line 1242 because the condition on line 1241 was never true

1242 deps[package] = depends 

1243 # Maintain a counter for each virtual package. If a 

1244 # Provides: exists, set the counter to 0 and count all 

1245 # provides by a package not in the list for removal. 

1246 # If the counter stays 0 at the end, we know that only 

1247 # the to-be-removed packages provided this virtual 

1248 # package. 

1249 if provides is not None: 1249 ↛ 1250line 1249 didn't jump to line 1250 because the condition on line 1249 was never true

1250 for virtual_pkg in provides.split(","): 

1251 virtual_pkg = virtual_pkg.strip() 

1252 if virtual_pkg == package: 

1253 continue 

1254 if virtual_pkg not in virtual_packages: 

1255 virtual_packages[virtual_pkg] = 0 

1256 if package not in removals: 

1257 virtual_packages[virtual_pkg] += 1 

1258 

1259 # If a virtual package is only provided by the to-be-removed 

1260 # packages, treat the virtual package as to-be-removed too. 

1261 removal_set.update( 

1262 virtual_pkg 

1263 for virtual_pkg in virtual_packages 

1264 if not virtual_packages[virtual_pkg] 

1265 ) 

1266 

1267 # Check binary dependencies (Depends) 

1268 for package in deps: 1268 ↛ 1269line 1268 didn't jump to line 1269 because the loop on line 1268 never started

1269 if package in removals: 

1270 continue 

1271 try: 

1272 parsed_dep = apt_pkg.parse_depends(deps[package]) 

1273 except ValueError as e: 

1274 print("Error for package %s: %s" % (package, e)) 

1275 parsed_dep = [] 

1276 for dep in parsed_dep: 

1277 # Check for partial breakage. If a package has a ORed 

1278 # dependency, there is only a dependency problem if all 

1279 # packages in the ORed depends will be removed. 

1280 unsat = 0 

1281 for dep_package, _, _ in dep: 

1282 if dep_package in removals: 

1283 unsat += 1 

1284 if unsat == len(dep): 

1285 component = p2c[package] 

1286 source = sources[package] 

1287 if component != "main": 

1288 source = "%s/%s" % (source, component) 

1289 all_broken[source][package].add(architecture) 

1290 dep_problem = True 

1291 

1292 if all_broken and not quiet: 1292 ↛ 1293line 1292 didn't jump to line 1293 because the condition on line 1292 was never true

1293 if cruft: 

1294 print(" - broken Depends:") 

1295 else: 

1296 print("# Broken Depends:") 

1297 for source, bindict in sorted(all_broken.items()): 

1298 lines = [] 

1299 for binary, bin_arches in sorted(bindict.items()): 

1300 if bin_arches == all_arches or "all" in bin_arches: 

1301 lines.append(binary) 

1302 else: 

1303 lines.append("%s [%s]" % (binary, " ".join(sorted(bin_arches)))) 

1304 if cruft: 

1305 print(" %s: %s" % (source, lines[0])) 

1306 else: 

1307 print("%s: %s" % (source, lines[0])) 

1308 for line in lines[1:]: 

1309 if cruft: 

1310 print(" " + " " * (len(source) + 2) + line) 

1311 else: 

1312 print(" " * (len(source) + 2) + line) 

1313 if not cruft: 

1314 print() 

1315 

1316 # Check source dependencies (Build-Depends and Build-Depends-Indep) 

1317 all_broken_bd: dict[str, set[str]] = defaultdict(set) 

1318 metakey_bd = get_or_set_metadatakey("Build-Depends", session) 

1319 metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session) 

1320 metakey_ids: tuple[int, ...] 

1321 if include_arch_all: 1321 ↛ 1324line 1321 didn't jump to line 1324 because the condition on line 1321 was always true

1322 metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id) 

1323 else: 

1324 metakey_ids = (metakey_bd.key_id,) 

1325 

1326 params = { 

1327 "suite_id": dbsuite.suite_id, 

1328 "metakey_ids": metakey_ids, 

1329 } 

1330 statement = sql.text( 

1331 """ 

1332 SELECT s.source, replace(string_agg(trim(sm.value), ', '), ',,', ',') as build_dep 

1333 FROM source s 

1334 JOIN source_metadata sm ON s.id = sm.src_id 

1335 WHERE s.id in 

1336 (SELECT src FROM newest_src_association 

1337 WHERE suite = :suite_id) 

1338 AND sm.key_id in :metakey_ids 

1339 GROUP BY s.id, s.source""" 

1340 ) 

1341 query = session.execute(statement, params) 

1342 for source, build_dep in query: 

1343 if source in removals: 

1344 continue 

1345 parsed_dep = [] 

1346 if build_dep is not None: 1346 ↛ 1353line 1346 didn't jump to line 1353 because the condition on line 1346 was always true

1347 # Remove [arch] information since we want to see breakage on all arches 

1348 build_dep = re_build_dep_arch.sub("", build_dep) 

1349 try: 

1350 parsed_dep = apt_pkg.parse_src_depends(build_dep) 

1351 except ValueError as e: 

1352 print("Error for source %s: %s" % (source, e)) 

1353 for dep in parsed_dep: 

1354 unsat = 0 

1355 for dep_package, _, _ in dep: 

1356 if dep_package in removals: 1356 ↛ 1357line 1356 didn't jump to line 1357 because the condition on line 1356 was never true

1357 unsat += 1 

1358 if unsat == len(dep): 1358 ↛ 1359line 1358 didn't jump to line 1359

1359 component = ( 

1360 session.query(Component.component_name) 

1361 .join(Component.overrides) 

1362 .filter(Override.suite == overridesuite) 

1363 .filter( 

1364 Override.package 

1365 == re.sub("/(contrib|non-free-firmware|non-free)$", "", source) 

1366 ) 

1367 .join(Override.overridetype) 

1368 .filter(OverrideType.overridetype == "dsc") 

1369 .scalar() 

1370 ) 

1371 key = source 

1372 if component != "main": 

1373 key = "%s/%s" % (source, component) 

1374 all_broken_bd[key].add(pp_deps(dep)) 

1375 dep_problem = True 

1376 

1377 if all_broken_bd and not quiet: 1377 ↛ 1378line 1377 didn't jump to line 1378 because the condition on line 1377 was never true

1378 if cruft: 

1379 print(" - broken Build-Depends:") 

1380 else: 

1381 print("# Broken Build-Depends:") 

1382 for source, bdeps in sorted(all_broken_bd.items()): 

1383 sorted_bdeps = sorted(bdeps) 

1384 if cruft: 

1385 print(" %s: %s" % (source, sorted_bdeps[0])) 

1386 else: 

1387 print("%s: %s" % (source, sorted_bdeps[0])) 

1388 for bdep in sorted_bdeps[1:]: 

1389 if cruft: 

1390 print(" " + " " * (len(source) + 2) + bdep) 

1391 else: 

1392 print(" " * (len(source) + 2) + bdep) 

1393 if not cruft: 

1394 print() 

1395 

1396 return dep_problem 

1397 

1398 

1399################################################################################ 

1400 

1401 

1402def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]: 

1403 """source packages referenced via Built-Using 

1404 

1405 :param control: control file to take Built-Using field from 

1406 :return: list of (source_name, source_version) pairs 

1407 """ 

1408 built_using = control.get("Built-Using", None) 

1409 if built_using is None: 

1410 return [] 

1411 

1412 bu = [] 

1413 for dep in apt_pkg.parse_depends(built_using): 

1414 assert len(dep) == 1, "Alternatives are not allowed in Built-Using field" 

1415 source_name, source_version, comp = dep[0] 

1416 assert comp == "=", "Built-Using must contain strict dependencies" 

1417 bu.append((source_name, source_version)) 

1418 

1419 return bu 

1420 

1421 

1422################################################################################ 

1423 

1424 

1425def is_in_debug_section(control: Mapping[str, str] | MetadataProxy) -> bool: 

1426 """binary package is a debug package 

1427 

1428 :param control: control file of binary package 

1429 :return: True if the binary package is a debug package 

1430 """ 

1431 section = control["Section"].split("/", 1)[-1] 

1432 auto_built_package = control.get("Auto-Built-Package") 

1433 return section == "debug" and auto_built_package == "debug-symbols" 

1434 

1435 

1436################################################################################ 

1437 

1438 

1439def find_possibly_compressed_file(filename: str) -> str: 

1440 """ 

1441 

1442 :param filename: path to a control file (Sources, Packages, etc) to 

1443 look for 

1444 :return: path to the (possibly compressed) control file, or null if the 

1445 file doesn't exist 

1446 """ 

1447 _compressions = ("", ".xz", ".gz", ".bz2") 

1448 

1449 for ext in _compressions: 1449 ↛ 1454line 1449 didn't jump to line 1454 because the loop on line 1449 didn't complete

1450 _file = filename + ext 

1451 if os.path.exists(_file): 

1452 return _file 

1453 

1454 raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename) 

1455 

1456 

1457################################################################################ 

1458 

1459 

1460def parse_boolean_from_user(value: str) -> bool: 

1461 value = value.lower() 

1462 if value in {"yes", "true", "enable", "enabled"}: 

1463 return True 

1464 if value in {"no", "false", "disable", "disabled"}: 1464 ↛ 1466line 1464 didn't jump to line 1466 because the condition on line 1464 was always true

1465 return False 

1466 raise ValueError("Not sure whether %s should be a True or a False" % value) 

1467 

1468 

1469def suite_suffix(suite_name: str) -> str: 

1470 """Return suite_suffix for the given suite""" 

1471 suffix = Cnf.find("Dinstall::SuiteSuffix", "") 

1472 if suffix == "": 1472 ↛ 1474line 1472 didn't jump to line 1474 because the condition on line 1472 was always true

1473 return "" 

1474 elif "Dinstall::SuiteSuffixSuites" not in Cnf: 

1475 # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future 

1476 return suffix 

1477 elif suite_name in Cnf.value_list("Dinstall::SuiteSuffixSuites"): 

1478 return suffix 

1479 return "" 

1480 

1481 

1482################################################################################ 

1483 

1484 

1485def process_buildinfos( 

1486 directory: str, 

1487 buildinfo_files: "Iterable[daklib.upload.HashedFile]", 

1488 fs_transaction: "daklib.fstransactions.FilesystemTransaction", 

1489 logger: "daklib.daklog.Logger", 

1490) -> None: 

1491 """Copy buildinfo files into Dir::BuildinfoArchive 

1492 

1493 :param directory: directory where .changes is stored 

1494 :param buildinfo_files: names of buildinfo files 

1495 :param fs_transaction: FilesystemTransaction instance 

1496 :param logger: logger instance 

1497 """ 

1498 

1499 if "Dir::BuildinfoArchive" not in Cnf: 

1500 return 

1501 

1502 target_dir = os.path.join( 

1503 Cnf["Dir::BuildinfoArchive"], 

1504 datetime.datetime.now().strftime("%Y/%m/%d"), 

1505 ) 

1506 

1507 for f in buildinfo_files: 

1508 src = os.path.join(directory, f.filename) 

1509 dst = find_next_free(os.path.join(target_dir, f.filename)) 

1510 

1511 logger.log(["Archiving", f.filename]) 

1512 fs_transaction.copy(src, dst, mode=0o644) 

1513 

1514 

1515################################################################################ 

1516 

1517 

1518def move_to_morgue( 

1519 morguesubdir: str, 

1520 filenames: Iterable[str], 

1521 fs_transaction: "daklib.fstransactions.FilesystemTransaction", 

1522 logger: "daklib.daklog.Logger", 

1523) -> None: 

1524 """Move a file to the correct dir in morgue 

1525 

1526 :param morguesubdir: subdirectory of morgue where this file needs to go 

1527 :param filenames: names of files 

1528 :param fs_transaction: FilesystemTransaction instance 

1529 :param logger: logger instance 

1530 """ 

1531 

1532 assert Cnf["Dir::Base"] 

1533 morguedir = Cnf.get("Dir::Morgue", os.path.join(Cnf["Dir::Base"], "morgue")) 

1534 

1535 # Build directory as morguedir/morguesubdir/year/month/day 

1536 now = datetime.datetime.now() 

1537 dest = os.path.join( 

1538 morguedir, morguesubdir, str(now.year), "%.2d" % now.month, "%.2d" % now.day 

1539 ) 

1540 

1541 for filename in filenames: 

1542 dest_filename = dest + "/" + os.path.basename(filename) 

1543 # If the destination file exists; try to find another filename to use 

1544 if os.path.lexists(dest_filename): 1544 ↛ 1545line 1544 didn't jump to line 1545 because the condition on line 1544 was never true

1545 dest_filename = find_next_free(dest_filename) 

1546 logger.log(["move to morgue", filename, dest_filename]) 

1547 fs_transaction.move(filename, dest_filename)