Coverage for daklib/utils.py: 57%

715 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-08-26 22:11 +0000

1# vim:set et ts=4 sw=4: 

2 

3"""Utility functions 

4 

5@contact: Debian FTP Master <ftpmaster@debian.org> 

6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org> 

7@license: GNU General Public License version 2 or later 

8""" 

9 

10# This program is free software; you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation; either version 2 of the License, or 

13# (at your option) any later version. 

14 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19 

20# You should have received a copy of the GNU General Public License 

21# along with this program; if not, write to the Free Software 

22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

23 

24import datetime 

25import email.policy 

26import errno 

27import functools 

28import grp 

29import os 

30import pwd 

31import re 

32import shutil 

33import subprocess 

34import sys 

35import tempfile 

36from collections import defaultdict 

37from collections.abc import Iterable, Mapping, Sequence 

38from typing import TYPE_CHECKING, Literal, NoReturn, Optional, Union 

39 

40import apt_inst 

41import apt_pkg 

42import sqlalchemy.sql as sql 

43 

44import daklib.config as config 

45import daklib.mail 

46from daklib.dbconn import ( 

47 Architecture, 

48 Component, 

49 DBConn, 

50 Override, 

51 OverrideType, 

52 get_active_keyring_paths, 

53 get_architecture, 

54 get_component, 

55 get_or_set_metadatakey, 

56 get_suite, 

57 get_suite_architectures, 

58) 

59 

60from .dak_exceptions import ( 

61 InvalidDscError, 

62 NoFilesFieldError, 

63 NoFreeFilenameError, 

64 ParseChangesError, 

65 SendmailFailedError, 

66 UnknownFormatError, 

67) 

68from .formats import parse_format, validate_changes_format 

69from .gpg import SignedFile 

70from .regexes import ( 

71 re_build_dep_arch, 

72 re_issource, 

73 re_multi_line_field, 

74 re_parse_maintainer, 

75 re_re_mark, 

76 re_single_line_field, 

77 re_srchasver, 

78 re_whitespace_comment, 

79) 

80from .srcformats import get_format_from_string 

81from .textutils import fix_maintainer 

82 

83if TYPE_CHECKING: 83 ↛ 84line 83 didn't jump to line 84, because the condition on line 83 was never true

84 import daklib.daklog 

85 import daklib.fstransactions 

86 import daklib.upload 

87 

88################################################################################ 

89 

#: Cache for email addresses from gpg key uids
key_uid_email_cache: dict[str, list[str]] = {}

93 

94################################################################################ 

95 

96 

def input_or_exit(prompt: Optional[str] = None) -> str:
    """Read one line from standard input, exiting the program on end-of-file.

    :param prompt: value handed unchanged to :func:`input`
    :return: the line that was read
    """
    try:
        answer = input(prompt)
    except EOFError:
        # ^D at the prompt: terminate instead of propagating the exception
        sys.exit("\nUser interrupt (^D).")
    return answer

102 

103 

104################################################################################ 

105 

106 

def extract_component_from_section(section: str) -> tuple[str, str]:
    """Derive the component from a "component/section" style section name.

    The section is returned unchanged; the component is the text before the
    first "/" or "main" when the section contains no "/".

    :return: tuple (section, component)
    """
    component, slash, _ = section.partition("/")
    if not slash:
        component = "main"
    return section, component

117 

118 

119################################################################################ 

120 

121 

def parse_deb822(
    armored_contents: bytes, signing_rules: Literal[-1, 0, 1] = 0, keyrings=None
) -> dict[str, str]:
    """Parse a (possibly signed) deb822-style control file into a dictionary.

    Field names are lower-cased and used as keys.  The raw input is stored
    under the extra key ``filecontents``; a version in the ``source`` field
    (as matched by ``re_srchasver``) is split off into ``source-version``.

    :param armored_contents: raw bytes of the file
    :param signing_rules: when ``1``, an empty line is only accepted as the
                          very last line of the data
    :param keyrings: keyrings handed to :class:`SignedFile`; when
                     :const:`None` no signature is required
    :return: mapping of field name to value
    :raises ParseChangesError: on empty input, on a continuation line that
                               does not follow any field, or when lines
                               could not be parsed
    :raises InvalidDscError: in ``signing_rules == 1`` mode when an empty
                             line is found before the end of the data
    """
    # A signature is only demanded when the caller supplied keyrings.
    require_signature = keyrings is not None
    if keyrings is None:
        keyrings = []

    signed_file = SignedFile(
        armored_contents, keyrings=keyrings, require_signature=require_signature
    )
    contents = signed_file.contents.decode("utf-8")

    error = ""
    changes: dict[str, str] = {}

    # Split the lines in the input, keeping the linebreaks.
    lines = contents.splitlines(True)

    if not lines:
        raise ParseChangesError("[Empty changes file]")

    # Reindex by (1-based) line number so we can easily verify the format of
    # .dsc files...  The trailing line terminator character is dropped.
    indexed_lines = {number: line[:-1] for number, line in enumerate(lines, 1)}
    # Only the final line can lack a terminator; in that case nothing must
    # be cut off, otherwise the last character of the data would be lost.
    last_line = lines[-1]
    if last_line.splitlines()[0] == last_line:
        indexed_lines[len(lines)] = last_line

    num_of_lines = len(indexed_lines)
    # Name of the field currently being filled; None until one was seen.
    field: Optional[str] = None
    # -1: no field seen yet, 1: field just started, 0: inside continuation
    first = -1
    for index in range(1, num_of_lines + 1):
        line = indexed_lines[index]
        if line == "" and signing_rules == 1:
            if index != num_of_lines:
                raise InvalidDscError(index)
            break
        if slf := re_single_line_field.match(line):
            field = slf.groups()[0].lower()
            changes[field] = slf.groups()[1]
            first = 1
            continue
        if line == " .":
            # A paragraph separator needs a field to belong to; report it
            # as a parse error instead of failing on an unbound name.
            if field is None:
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            changes[field] += "\n"
            continue
        if mlf := re_multi_line_field.match(line):
            if field is None:
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            # Separate a non-empty first line from its continuation lines.
            if first == 1 and changes[field] != "":
                changes[field] += "\n"
            first = 0
            changes[field] += mlf.groups()[0] + "\n"
            continue
        # Anything else is collected and reported once parsing is done.
        error += line

    changes["filecontents"] = armored_contents.decode()

    if "source" in changes:
        # Strip the source version in brackets from the source field,
        # put it in the "source-version" field instead.
        if srcver := re_srchasver.search(changes["source"]):
            changes["source"] = srcver.group(1)
            changes["source-version"] = srcver.group(2)

    if error:
        raise ParseChangesError(error)

    return changes

195 

196 

197################################################################################ 

198 

199 

def parse_changes(
    filename: str,
    signing_rules: Literal[-1, 0, 1] = 0,
    dsc_file: bool = False,
    keyrings=None,
) -> dict[str, str]:
    """
    Parse a changes or source control (.dsc) file into a dictionary where
    each field is a key.

    The file is read as bytes and handed to :func:`parse_deb822` together
    with `signing_rules` and `keyrings`; whether a signature is required is
    decided there, based on `keyrings`.

    With ``signing_rules == 1`` the same strict format checking as
    dpkg-source is requested.  The rules for that mode are:

    - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
      followed by any PGP header data and must end with a blank line.

    - The data section must end with a blank line and must be followed by
      "-----BEGIN PGP SIGNATURE-----".

    :param filename: path of the .changes (or .dsc) file
    :param signing_rules: forwarded to :func:`parse_deb822`
    :param dsc_file: `filename` is a Debian source control (.dsc) file; the
                     check for mandatory .changes fields is skipped
    :param keyrings: forwarded to :func:`parse_deb822`
    :raises ParseChangesError: if a mandatory .changes field is missing
    """
    with open(filename, "rb") as changes_in:
        raw = changes_in.read()
    changes = parse_deb822(raw, signing_rules, keyrings=keyrings)

    if dsc_file:
        return changes

    # Finally ensure that everything needed for .changes is there
    must_keywords = (
        "Format",
        "Date",
        "Source",
        "Architecture",
        "Version",
        "Distribution",
        "Maintainer",
        "Changes",
        "Files",
    )
    missingfields = [
        keyword for keyword in must_keywords if keyword.lower() not in changes
    ]
    if missingfields:
        raise ParseChangesError(
            "Missing mandatory field(s) in changes file (policy 5.5): %s"
            % (missingfields)
        )

    return changes

259 

260 

261################################################################################ 

262 

263 

def check_dsc_files(
    dsc_filename: str,
    dsc: Mapping[str, str],
    dsc_files: Mapping[str, Mapping[str, str]],
) -> list[str]:
    """
    Check that the files named in the .dsc are the ones expected for the
    source format it announces.

    :param dsc_filename: path of .dsc file
    :param dsc: the content of the .dsc parsed by :func:`parse_changes`
    :param dsc_files: the file list returned by :func:`build_file_list`
    :return: all errors detected
    """
    problems = []

    # Counts how many files of each kind the .dsc lists.
    has: defaultdict[str, int] = defaultdict(int)

    # Patterns are tried in order against the file extension and only
    # anchored at its start, so the order of the entries matters.
    ftype_lookup = (
        (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)),
        (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")),
        (r"diff\.gz", ("debian_diff",)),
        (r"tar\.gz", ("native_tar_gz", "native_tar")),
        (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)),
        (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)),
        (r"tar\.(gz|bz2|xz)", ("native_tar",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)),
    )

    for name in dsc_files:
        source_match = re_issource.match(name)
        if not source_match:
            problems.append(
                "%s: %s in Files field not recognised as source." % (dsc_filename, name)
            )
            continue

        # Bump every counter belonging to the first matching pattern.
        extension = source_match.group(3)
        for pattern, counters in ftype_lookup:
            if re.match(pattern, extension):
                for counter in counters:
                    has[counter] += 1
                break
        else:
            # File does not match anything in lookup table; reject and
            # stop looking at the remaining files.
            problems.append("%s: unexpected source file '%s'" % (dsc_filename, name))
            break

    # Check for multiple files
    problems.extend(
        "%s: lists multiple %s" % (dsc_filename, file_type)
        for file_type in (
            "orig_tar",
            "orig_tar_sig",
            "native_tar",
            "debian_tar",
            "debian_diff",
        )
        if has[file_type] > 1
    )

    # Source format specific tests
    try:
        source_format = get_format_from_string(dsc["format"])
        problems.extend(
            "%s: %s" % (dsc_filename, reason)
            for reason in source_format.reject_msgs(has)
        )
    except UnknownFormatError:
        # Not an error here for now
        pass

    return problems

339 

340 

341################################################################################ 

342 

343# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl 

344 

345 

def build_file_list(
    changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum"
) -> dict[str, dict[str, str]]:
    """Turn the file list stored in `changes[field]` into a dictionary.

    :param changes: parsed .changes or .dsc content
    :param is_a_dsc: `changes` comes from a .dsc; the Format check is
                     skipped and entries carry no section and priority
    :param field: name of the field holding the file list
    :param hashname: key under which the checksum of each entry is stored
    :return: mapping of filename to a dictionary with the keys ``size``,
             ``section``, ``priority``, ``component`` and `hashname`
    :raises NoFilesFieldError: if `field` is not present in `changes`
    :raises ParseChangesError: if an entry has the wrong number of columns
    """
    # Make sure we have a Files: field to parse...
    if field not in changes:
        raise NoFilesFieldError

    # Validate .changes Format: field
    if not is_a_dsc:
        validate_changes_format(parse_format(changes["format"]), field)

    # Only the Files field of a .changes has section and priority columns.
    includes_section = (not is_a_dsc) and field == "files"

    files = {}
    # Parse each entry/line; the first empty line ends the list.
    for entry in changes[field].split("\n"):
        if not entry:
            break
        columns = entry.split()
        try:
            if includes_section:
                (checksum, size, section, priority, name) = columns
            else:
                (checksum, size, name) = columns
                section = priority = ""
        except ValueError:
            raise ParseChangesError(entry)

        section = section or "-"
        priority = priority or "-"

        (section, component) = extract_component_from_section(section)

        files[name] = {
            "size": size,
            "section": section,
            "priority": priority,
            "component": component,
            hashname: checksum,
        }

    return files

388 

389 

390################################################################################ 

391 

392 

def send_mail(message: str, whitelists: Optional[list[str]] = None) -> None:
    """sendmail wrapper, takes a message string

    The message is optionally filtered against recipient whitelists, signed
    when Dinstall::Mail-Signature-Key is set, stored below Dir::Mail when
    that is set, and finally piped to Dinstall::SendmailCommand unless
    sending is disabled or no recipient is left.

    :param message: the complete mail (headers and body) as text
    :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
                       everything, otherwise an address is whitelisted if it is
                       included in any of the lists.
                       In addition a global whitelist can be specified in
                       Dinstall::MailWhiteList.
                       The list passed in is not modified.
    :raises SendmailFailedError: if the sendmail command exits unsuccessfully
    """

    msg = daklib.mail.parse_mail(message)

    # The incoming message might be UTF-8, but outgoing mail should
    # use a legacy-compatible encoding.  Set the content to the
    # text to make sure this is the case.
    # Note that this does not work with multipart messages.
    msg.set_content(msg.get_payload(), cte="quoted-printable")

    # Check whether we're supposed to be sending mail
    call_sendmail = True
    if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
        call_sendmail = False

    if whitelists is None or None in whitelists:
        whitelists = []
    else:
        # Work on a copy so that appending the global whitelist below does
        # not modify the list object owned by the caller.
        whitelists = list(whitelists)
    if Cnf.get("Dinstall::MailWhiteList", ""):
        whitelists.append(Cnf["Dinstall::MailWhiteList"])
    if whitelists:
        # Compile every non-comment line of every whitelist; lines carrying
        # the regex mark are used as patterns, all others match literally.
        whitelist = []
        for path in whitelists:
            with open(path, "r") as whitelist_in:
                for line in whitelist_in:
                    if not re_whitespace_comment.match(line):
                        if re_re_mark.match(line):
                            whitelist.append(
                                re.compile(re_re_mark.sub("", line.strip(), 1))
                            )
                        else:
                            whitelist.append(re.compile(re.escape(line.strip())))

        # Fields to check.
        fields = ["To", "Bcc", "Cc"]
        for field in fields:
            # Check each field
            value = msg.get(field, None)
            if value is not None:
                match = []
                for item in value.split(","):
                    (rfc822_maint, rfc2047_maint, name, mail) = fix_maintainer(
                        item.strip()
                    )
                    if not any(wr.match(mail) for wr in whitelist):
                        print("Skipping {0} since it's not whitelisted".format(item))
                        continue
                    match.append(item)

                # Doesn't have any mail in whitelist so remove the header
                if len(match) == 0:
                    del msg[field]
                else:
                    msg.replace_header(field, ", ".join(match))

        # Without a To header, promote Cc (preferred) or Bcc to To.
        if "To" not in msg:
            for field in ("Cc", "Bcc"):
                if field in msg:
                    msg["To"] = msg[field]
                    del msg[field]
                    break
            else:
                # return, as we removed all recipients.
                call_sendmail = False

    # sign mail
    if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""):
        kwargs = {
            "keyids": [mailkey],
            "pubring": Cnf.get("Dinstall::SigningPubKeyring") or None,
            "homedir": Cnf.get("Dinstall::SigningHomedir") or None,
            "passphrase_file": Cnf.get("Dinstall::SigningPassphraseFile") or None,
        }
        msg = daklib.mail.sign_mail(msg, **kwargs)

    msg_bytes = msg.as_bytes(policy=email.policy.default)

    # Keep a copy of the mail on disk; this happens even when the mail is
    # not handed to sendmail afterwards.
    maildir = Cnf.get("Dir::Mail")
    if maildir:
        path = os.path.join(maildir, datetime.datetime.now().isoformat())
        path = find_next_free(path)
        with open(path, "wb") as fh:
            fh.write(msg_bytes)

    # Invoke sendmail
    if not call_sendmail:
        return
    try:
        subprocess.run(
            Cnf["Dinstall::SendmailCommand"].split(),
            input=msg_bytes,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        raise SendmailFailedError(e.output.decode().rstrip()) from e

504 

505 

506################################################################################ 

507 

508 

def poolify(source: str) -> str:
    """Return the pool sub-directory (with trailing slash) for `source`.

    Names starting with "lib" are filed under their first four characters,
    all others under their first character.
    """
    prefix_length = 4 if source.startswith("lib") else 1
    return f"{source[:prefix_length]}/{source}/"

515 

516 

517################################################################################ 

518 

519 

def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
    """Move the file `src` to `dest` by copying it and removing the original.

    :param src: path of the file to move
    :param dest: target path; if it is an existing directory the file keeps
                 its basename inside that directory
    :param overwrite: allow replacing an existing (writable) target
    :param perms: permissions set on the target after copying

    A missing target directory is created with mode 02775.  If the target
    exists and must not (or cannot) be overwritten the program is terminated
    via :func:`fubar`.
    """
    dest_dir = dest if os.path.isdir(dest) else os.path.dirname(dest)
    # dirname is "" for a bare filename in the current directory; there is
    # nothing to create then (and os.makedirs("") would fail).
    if dest_dir and not os.path.lexists(dest_dir):
        old_umask = os.umask(0)
        try:
            os.makedirs(dest_dir, 0o2775)
        finally:
            # Restore the umask even when the directory cannot be created.
            os.umask(old_umask)
    # Checked again on purpose: the directory may have been created above.
    if os.path.isdir(dest):
        dest += "/" + os.path.basename(src)
    # Don't overwrite unless forced to
    if os.path.lexists(dest):
        if not overwrite:
            fubar("Can't move %s to %s - file already exists." % (src, dest))
        elif not os.access(dest, os.W_OK):
            fubar("Can't move %s to %s - can't write to existing file." % (src, dest))
    shutil.copy2(src, dest)
    os.chmod(dest, perms)
    os.unlink(src)

544 

545 

546################################################################################ 

547 

548 

def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
    """Read the template `filename` and replace every key of `subst_map`.

    The keys are replaced one after the other, in the iteration order of the
    mapping, by the string form of their value.

    :return: the template text after all substitutions
    """
    with open(filename) as template_in:
        text = template_in.read()
    for placeholder in subst_map:
        text = text.replace(placeholder, str(subst_map[placeholder]))
    return text

556 

557 

558################################################################################ 

559 

560 

def fubar(msg: str, exit_code: int = 1) -> NoReturn:
    """Write `msg`, prefixed with "E:", to stderr and exit with `exit_code`."""
    sys.stderr.write("E: %s\n" % (msg,))
    raise SystemExit(exit_code)

565 

566 

def warn(msg: str) -> None:
    """Write `msg`, prefixed with "W:", to stderr."""
    sys.stderr.write("W: %s\n" % (msg,))

570 

571 

572################################################################################ 

573 

574 

def whoami() -> str:
    """get user name

    Takes the part of the current user's GECOS entry before the first comma
    and, as a laughable attempt at rfc822 conformancy, removes all periods
    from it.
    """
    gecos = pwd.getpwuid(os.getuid()).pw_gecos
    full_name = gecos.split(",")[0]
    return full_name.replace(".", "")

582 

583 

def getusername() -> str:
    """Return the login name of the user the process runs as."""
    return pwd.getpwuid(os.getuid()).pw_name

587 

588 

589################################################################################ 

590 

591 

def size_type(c: Union[int, float]) -> str:
    """Format the size `c` (in bytes) with a " B", " KB" or " MB" suffix.

    The value is divided by 1024 and moved to the next unit for as long as
    it is greater than 10240; the result is printed as an integer.
    """
    unit = " B"
    for next_unit in (" KB", " MB"):
        if not c > 10240:
            break
        c = c / 1024
        unit = next_unit
    return "%d%s" % (c, unit)

601 

602 

603################################################################################ 

604 

605 

def find_next_free(dest: str, too_many: int = 100) -> str:
    """Return `dest`, or the first free name of the form ``dest.N``.

    :param dest: the preferred path
    :param too_many: number of numeric suffixes (``.0`` up to
                     ``.<too_many - 1>``) tried when `dest` is taken
    :return: `dest` if nothing exists at that path, otherwise the first
             candidate ``dest.N`` at which nothing exists
    :raises NoFreeFilenameError: if `dest` and all candidates are taken
    """
    if not os.path.lexists(dest):
        return dest
    # Return as soon as a free candidate is found, so that the last
    # permitted suffix is usable as well.
    for extra in range(too_many):
        candidate = dest + "." + repr(extra)
        if not os.path.lexists(candidate):
            return candidate
    raise NoFreeFilenameError

615 

616 

617################################################################################ 

618 

619 

def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str:
    """Join `original` with `sep`, writing :const:`None` as an empty string."""
    columns = ["" if item is None else item for item in original]
    return sep.join(columns)

622 

623 

624################################################################################ 

625 

626 

def prefix_multi_line_string(
    lines: str, prefix: str, include_blank_lines: bool = False
) -> str:
    """prepend `prefix` to each line in `lines`

    Every line is stripped of surrounding whitespace first.  Lines that are
    empty after stripping are dropped unless `include_blank_lines` is set.
    """
    prefixed = []
    for raw_line in lines.split("\n"):
        stripped = raw_line.strip()
        if stripped or include_blank_lines:
            prefixed.append(prefix + stripped)
    return "\n".join(prefixed)

636 

637 

638################################################################################ 

639 

640 

def join_with_commas_and(list: Sequence[str]) -> str:
    """Join the items as "a, b and c"; an empty sequence gives "nothing"."""
    count = len(list)
    if count == 0:
        return "nothing"
    last = list[-1]
    if count == 1:
        return last
    leading = ", ".join(list[:-1])
    return leading + " and " + last

647 

648 

649################################################################################ 

650 

651 

def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
    """Format (package, constraint, version) alternatives as one string.

    An alternative is written as "pkg (constraint version)", or just "pkg"
    when the constraint is empty; alternatives are joined with " |".
    """
    rendered = []
    for pkg, constraint, version in deps:
        rendered.append(f"{pkg} ({constraint} {version})" if constraint else pkg)
    return " |".join(rendered)

658 

659 

660################################################################################ 

661 

662 

def get_conf():
    """Return the module-level configuration object ``Cnf``."""
    return Cnf

665 

666 

667################################################################################ 

668 

669 

def parse_args(Options) -> tuple[str, str, str, bool]:
    """Handle -a, -c and -s arguments; returns them as SQL constraints

    Unknown names are reported with :func:`warn`; if an option was given but
    none of its values is valid the program is terminated via :func:`fubar`.

    :param Options: mapping with the keys "Suite", "Component" and
                    "Architecture"
    :return: tuple (con_suites, con_architectures, con_components,
             check_source); each constraint is an "AND ... IN (...)" SQL
             fragment or the empty string
    """
    # XXX: This should go away and everything which calls it be converted
    #      to use SQLA properly.  For now, we'll just fix it not to use
    #      the old Pg interface though
    session = DBConn().session()

    # Process suite
    con_suites = ""
    if Options["Suite"]:
        suite_ids = []
        for suitename in split_args(Options["Suite"]):
            suite = get_suite(suitename, session=session)
            if suite and suite.suite_id is not None:
                suite_ids.append(suite.suite_id)
            else:
                warn(
                    "suite '%s' not recognised."
                    % (suite and suite.suite_name or suitename)
                )
        if suite_ids:
            con_suites = "AND su.id IN (%s)" % ", ".join(
                str(suite_id) for suite_id in suite_ids
            )
        else:
            fubar("No valid suite given.")

    # Process component
    con_components = ""
    if Options["Component"]:
        component_ids = []
        for componentname in split_args(Options["Component"]):
            component = get_component(componentname, session=session)
            if component is not None:
                component_ids.append(component.component_id)
            else:
                warn("component '%s' not recognised." % (componentname))
        if component_ids:
            con_components = "AND c.id IN (%s)" % ", ".join(
                str(component_id) for component_id in component_ids
            )
        else:
            fubar("No valid component given.")

    # Process architecture
    con_architectures = ""
    if not Options["Architecture"]:
        # Without an architecture restriction sources are included too.
        check_source = True
    else:
        check_source = False
        arch_ids = []
        for archname in split_args(Options["Architecture"]):
            if archname == "source":
                # "source" is not looked up; it only sets the flag.
                check_source = True
                continue
            arch = get_architecture(archname, session=session)
            if arch is not None:
                arch_ids.append(arch.arch_id)
            else:
                warn("architecture '%s' not recognised." % (archname))
        if arch_ids:
            con_architectures = "AND a.id IN (%s)" % ", ".join(
                str(arch_id) for arch_id in arch_ids
            )
        elif not check_source:
            fubar("No valid architecture given.")

    return (con_suites, con_architectures, con_components, check_source)

740 

741 

742################################################################################ 

743 

744 

@functools.total_ordering
class ArchKey:
    """
    Sort key for architecture names.

    Keys compare like the architecture names they wrap, except that
    'source' sorts before every other architecture.
    """

    __slots__ = ["arch", "issource"]

    def __init__(self, arch, *args):
        # Extra positional arguments are accepted and ignored.
        self.issource = arch == "source"
        self.arch = arch

    def __lt__(self, other: "ArchKey") -> bool:
        if self.issource or other.issource:
            # 'source' is smaller than anything but another 'source'.
            return self.issource and not other.issource
        return self.arch < other.arch

    def __eq__(self, other: object) -> bool:
        if isinstance(other, ArchKey):
            return self.arch == other.arch
        return NotImplemented

770 

771 

772################################################################################ 

773 

774 

def split_args(s: str, dwim: bool = True) -> list[str]:
    """
    Split command line arguments which can be separated by either commas
    or whitespace.

    A string without any comma is split on whitespace, any other string is
    split on commas only.  If dwim is set, a string ending in a comma is
    treated as a fatal error, since this usually means someone did
    'dak ls -a i386, m68k foo' or something and the inevitable confusion
    resulting from 'm68k' being treated as an argument is undesirable.
    """
    if "," not in s:
        return s.split()
    if dwim and s.endswith(","):
        fubar("split_args: found trailing comma, spurious space maybe?")
    return s.split(",")

790 

791 

792################################################################################ 

793 

794 

def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
    """Build one ``--keyring=PATH`` argument per keyring.

    :param keyrings: keyring paths; when :const:`None` the paths returned by
                     :func:`get_active_keyring_paths` are used
    """
    paths = get_active_keyring_paths() if keyrings is None else keyrings
    return [f"--keyring={path}" for path in paths]

800 

801 

802################################################################################ 

803 

804 

805def _gpg_get_addresses_from_listing(output: bytes) -> list[str]: 

806 addresses: list[str] = [] 

807 

808 for line in output.split(b"\n"): 

809 parts = line.split(b":") 

810 if parts[0] not in (b"uid", b"pub"): 

811 continue 

812 if parts[1] in (b"i", b"d", b"r"): 812 ↛ 814line 812 didn't jump to line 814, because the condition on line 812 was never true

813 # Skip uid that is invalid, disabled or revoked 

814 continue 

815 try: 

816 uid_bytes = parts[9] 

817 except IndexError: 

818 continue 

819 try: 

820 uid = uid_bytes.decode(encoding="utf-8") 

821 except UnicodeDecodeError: 

822 # If the uid is not valid UTF-8, we assume it is an old uid 

823 # still encoding in Latin-1. 

824 uid = uid_bytes.decode(encoding="latin1") 

825 m = re_parse_maintainer.match(uid) 

826 if not m: 

827 continue 

828 address = m.group(2) 

829 if address.endswith("@debian.org"): 829 ↛ 832line 829 didn't jump to line 832, because the condition on line 829 was never true

830 # prefer @debian.org addresses 

831 # TODO: maybe not hardcode the domain 

832 addresses.insert(0, address) 

833 else: 

834 addresses.append(address) 

835 

836 return addresses 

837 

838 

def gpg_get_key_addresses(fingerprint: str) -> list[str]:
    """retrieve email addresses from gpg key uids for a given fingerprint

    Results (including the empty list stored when gpg fails) are remembered
    in ``key_uid_email_cache``, so gpg runs at most once per fingerprint.
    """
    cached = key_uid_email_cache.get(fingerprint)
    if cached is not None:
        return cached

    try:
        cmd = [
            "gpg",
            "--no-default-keyring",
            *gpg_keyring_args(),
            "--with-colons",
            "--list-keys",
            "--",
            fingerprint,
        ]
        listing = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        found: list[str] = []
    else:
        found = _gpg_get_addresses_from_listing(listing)

    key_uid_email_cache[fingerprint] = found
    return found

857 

858 

859################################################################################ 

860 

861 

def open_ldap_connection():
    """Connect and bind anonymously to the configured LDAP server.

    The server is taken from Import-LDAP-Fingerprints::LDAPServer.  If
    Import-LDAP-Fingerprints::CACertFile is set, the server certificate is
    required to validate against that file and TLS is started before
    binding.

    :return: the bound connection object
    """
    import ldap  # type: ignore

    server_uri = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
    ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile")

    conn = ldap.initialize(server_uri)

    if ca_cert_file:
        tls_options = (
            (ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD),
            (ldap.OPT_X_TLS_CACERTFILE, ca_cert_file),
            (ldap.OPT_X_TLS_NEWCTX, True),
        )
        # The options are applied in exactly this order.
        for option, value in tls_options:
            conn.set_option(option, value)
        conn.start_tls_s()

    conn.simple_bind_s("", "")

    return conn

880 

881 

882################################################################################ 

883 

884 

def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]:
    """retrieve login from LDAP linked to a given fingerprint

    :param fingerprint: value inserted into the ``keyfingerprint`` search
                        filter; the default "*" is inserted as is
    :return: mapping of the first key fingerprint of each entry found to the
             first uid of that entry
    """
    import ldap

    conn = open_ldap_connection()
    base_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    search_filter = "(keyfingerprint=%s)" % fingerprint
    entries = conn.search_s(
        base_dn,
        ldap.SCOPE_ONELEVEL,
        search_filter,
        ["uid", "keyfingerprint"],
    )
    return {
        entry[1]["keyFingerPrint"][0].decode(): entry[1]["uid"][0].decode()
        for entry in entries
    }

903 

904 

905################################################################################ 

906 

907 

def get_users_from_ldap() -> dict[str, str]:
    """retrieve login and user names from LDAP

    :return: mapping of the full name (the "cn", "mn" and "sn" values joined
             by spaces, leaving out missing or empty values and values
             starting with "-") to the first uid value of the entry
    """
    import ldap

    conn = open_ldap_connection()
    base_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    entries = conn.search_s(
        base_dn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"]
    )
    users: dict[str, str] = {}
    for entry in entries:
        attrs = entry[1]
        name_parts = []
        for attribute in ("cn", "mn", "sn"):
            try:
                value = attrs[attribute][0].decode()
            except KeyError:
                # attribute not present on this entry
                continue
            if value and value[0] != "-":
                name_parts.append(value)
        # NOTE(review): unlike the name parts the uid is stored undecoded
        # (presumably bytes) although the annotation promises str; confirm
        # what callers expect before changing this.
        users[" ".join(name_parts)] = attrs["uid"][0]
    return users

930 

931 

932################################################################################ 

933 

934 

def clean_symlink(src: str, dest: str, root: str) -> str:
    """
    Turn the absolute symlink target 'src' of a link at 'dest' into a
    relative one, with both paths taken relative to 'root'.

    The first occurrence of 'root' is dropped from both paths; the result
    climbs one ``../`` per path component of the directory holding 'dest'
    and then descends into the stripped 'src'.

    :param src: target of the symlink
    :param dest: location of the symlink itself
    :param root: prefix to strip from both paths
    :return: the relative replacement for 'src'
    """
    target = src.replace(root, "", 1)
    link_dir = os.path.dirname(dest.replace(root, "", 1))
    # Number of "/"-separated pieces in link_dir (an empty string counts as one).
    depth = link_dir.count("/") + 1
    return "../" * depth + target

945 

946 

947################################################################################ 

948 

949 

def temp_dirname(
    parent: Optional[str] = None,
    prefix: str = "dak",
    suffix: str = "",
    mode: Optional[int] = None,
    group: Optional[str] = None,
) -> str:
    """
    Create a new, uniquely named directory and return its path.

    :param parent: If non-null, the directory in which the new directory is created.
    :param prefix: The directory name will start with this string
    :param suffix: The directory name will end with this string
    :param mode: If set, the directory is chmodded to these permissions
    :param group: If set, the directory is chgrped to the named group
    :return: path of the directory that was created

    """

    path = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=parent)
    if mode is not None:
        os.chmod(path, mode)
    if group is not None:
        # -1 leaves the owning user untouched; only the group changes.
        os.chown(path, -1, grp.getgrnam(group).gr_gid)
    return path

976 

977 

978################################################################################ 

979 

980 

def get_changes_files(from_dir: str) -> list[str]:
    """
    Takes a directory and lists all .changes files in it (as well as chdir'ing
    to the directory; this is due to broken behaviour on the part of p-u/p-a
    when you're not in the right place)

    If the directory cannot be entered or read, the error is handed to
    fubar().

    :param from_dir: directory to enter and scan; may be relative to the
        current working directory
    :return: names (not paths) of the entries ending in ``.changes``
    """
    try:
        # Much of the rest of p-u/p-a depends on being in the right place
        os.chdir(from_dir)
        # List the directory we are now in instead of from_dir again: a
        # relative from_dir would otherwise be resolved a second time,
        # against the new working directory.
        entries = os.listdir(".")
    except OSError as e:
        fubar("Failed to read list from directory %s (%s)" % (from_dir, e))

    return [name for name in entries if name.endswith(".changes")]

997 

998 

999################################################################################ 

1000 

1001 

# Shared configuration handle for this module, taken from the dak Config
# object; the functions in this file look their settings up through it.
Cnf = config.Config().Cnf

1003 

1004################################################################################ 

1005 

1006 

def parse_wnpp_bug_file(
    file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm",
) -> dict[str, list[str]]:
    """
    Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
    Well, actually it parses a local copy, but let's document the source
    somewhere ;)

    Each line is expected to look like ``source: entry|entry|...``; the first
    run of digits in every entry is taken as a bug number.  Lines without
    ``": "`` are ignored, and entries without any digits are skipped.  If the
    file cannot be opened a warning is printed and the result is empty.

    :param file: path of the local copy of the list
    :return: a dict associating source package name with a list of open wnpp
             bugs (Yes, there might be more than one)
    """

    try:
        with open(file) as f:
            lines = f.readlines()
    except OSError:
        print(
            "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any."
            % file
        )
        lines = []

    wnpp: dict[str, list[str]] = {}
    for line in lines:
        source, separator, bug_list = line.partition(": ")
        if not separator:
            continue
        bugs = []
        for entry in bug_list.split("|"):
            # An entry without digits yields no match; test the match object
            # itself instead of calling .group() on None.
            match = re.search(r"\d+", entry)
            if match:
                bugs.append(match.group())
        wnpp[source] = bugs
    return wnpp

1043 

1044 

1045################################################################################ 

1046 

1047 

def deb_extract_control(path: str) -> bytes:
    """Read the ``control`` file out of a binary package's control archive.

    :param path: path to the binary package
    :return: raw contents of DEBIAN/control
    """
    control_archive = apt_inst.DebFile(path).control
    return control_archive.extractdata("control")

1051 

1052 

1053################################################################################ 

1054 

1055 

def mail_addresses_for_upload(
    maintainer: str,
    changed_by: str,
    fingerprint: str,
    authorized_by_fingerprint: Optional[str],
) -> list[str]:
    """Work out whom to mail about an upload.

    The kinds of recipient come from ``Dinstall::UploadMailRecipients``; when
    that list is empty, maintainer, changed_by, signer and authorized_by are
    used.  An address is only added if its mail part has not been seen
    before.

    :param maintainer: Maintainer field of the .changes file
    :param changed_by: Changed-By field of the .changes file
    :param fingerprint: fingerprint of the key used to sign the upload
    :param authorized_by_fingerprint: fingerprint looked up for the
        ``authorized_by`` recipient; that recipient is skipped when this is
        unset
    :return: list of RFC 2047-encoded mail addresses to contact regarding
             this upload
    :raises Exception: if the configuration names an unsupported recipient
    """
    recipients = Cnf.value_list("Dinstall::UploadMailRecipients")
    if not recipients:
        recipients = ["maintainer", "changed_by", "signer", "authorized_by"]

    # Ensure signer and authorized_by are last if present
    key_based = ("signer", "authorized_by")
    for kind in key_based:
        if kind in recipients:
            recipients.remove(kind)
            recipients.append(kind)

    from_changes = {"maintainer": maintainer, "changed_by": changed_by}

    # Compute the set of addresses of the recipients
    addresses = set()  # Name + email
    emails = set()  # Email only, used to avoid duplicates
    for recipient in recipients:
        if recipient.startswith("mail:"):  # Email hardcoded in config
            address = recipient[5:]
        elif recipient in from_changes:
            address = from_changes[recipient]
        elif recipient in key_based:
            if recipient == "authorized_by":
                fpr = authorized_by_fingerprint
            else:
                fpr = fingerprint
            if not fpr:
                continue
            key_addresses = gpg_get_key_addresses(fpr)
            if not key_addresses or any(x in emails for x in key_addresses):
                # Either the key has no address, or its owner already gets
                # a copy via another email
                address = None
            else:
                address = key_addresses[0]
        else:
            raise Exception(
                "Unsupported entry in {0}: {1}".format(
                    "Dinstall::UploadMailRecipients", recipient
                )
            )

        if address is None:
            continue
        mail = fix_maintainer(address)[3]
        if mail in emails:
            continue
        addresses.add(address)
        emails.add(mail)

    return [fix_maintainer(entry)[1] for entry in addresses]

1122 

1123 

1124################################################################################ 

1125 

1126 

def call_editor_for_file(path: str) -> None:
    """Run the user's editor on a file and wait for it to finish.

    The editor is ``$VISUAL`` if set, otherwise ``$EDITOR``, otherwise
    ``sensible-editor``.

    :param path: file to edit
    :raises subprocess.CalledProcessError: if the editor exits non-zero
    """
    environment = os.environ
    fallback = environment.get("EDITOR", "sensible-editor")
    editor = environment.get("VISUAL", fallback)
    subprocess.check_call([editor, path])

1130 

1131 

1132################################################################################ 

1133 

1134 

def call_editor(text: str = "", suffix: str = ".txt") -> str:
    """Let the user edit a piece of text and return the outcome.

    The text is written to a named temporary file, the editor is run on that
    file, and the file is read back from the start once the editor returns.

    :param text: initial text
    :param suffix: extension for temporary file
    :return: string with the edited text
    """
    with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as scratch:
        print(text, end="", file=scratch)
        # Make sure the editor sees the initial text on disk.
        scratch.flush()
        call_editor_for_file(scratch.name)
        scratch.seek(0)
        edited = scratch.read()
    return edited

1148 

1149 

1150################################################################################ 

1151 

1152 

def check_reverse_depends(
    removals: Iterable[str],
    suite: str,
    arches: Optional[Iterable[Architecture]] = None,
    session=None,
    cruft: bool = False,
    quiet: bool = False,
    include_arch_all: bool = True,
) -> bool:
    """Check whether removing packages from a suite breaks dependencies.

    Binary ``Depends`` are checked per architecture, then source
    ``Build-Depends`` (and ``Build-Depends-Indep`` when `include_arch_all` is
    set).  A dependency counts as broken when every alternative of an ORed
    group is going away.  Virtual packages that are only provided by packages
    being removed are treated as going away too.

    Unless `quiet` is set, the broken dependencies are printed.

    :param removals: names of the packages to be removed
    :param suite: name of the suite to check
    :param arches: architectures to check; when empty or None the
        architectures of the suite are used.  "source" and "all" are always
        dropped from this set.
    :param session: database session used for the queries
    :param cruft: selects the indented " - broken ..." output layout instead
        of the "# Broken ..." one
    :param quiet: do not print anything about the problems found
    :param include_arch_all: also look at architecture "all" binaries and at
        Build-Depends-Indep
    :return: True if at least one broken dependency was found
    """
    dbsuite = get_suite(suite, session)
    overridesuite = dbsuite
    if dbsuite.overridesuite is not None:
        overridesuite = get_suite(dbsuite.overridesuite, session)
    dep_problem = False
    p2c = {}
    all_broken = defaultdict(lambda: defaultdict(set))
    if arches:
        all_arches = set(arches)
    else:
        all_arches = set(x.arch_string for x in get_suite_architectures(suite))
    all_arches -= set(["source", "all"])
    # Materialise the request once: `removals` may be any iterable, including
    # one that can only be walked a single time.
    #  - `requested` answers "is this package itself being removed?"
    #  - `removal_set` answers "does this dependency target go away?" and
    #    additionally collects virtual packages that lose all providers.
    requested = frozenset(removals)
    removal_set = set(requested)
    metakey_d = get_or_set_metadatakey("Depends", session)
    metakey_p = get_or_set_metadatakey("Provides", session)
    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_d_id": metakey_d.key_id,
        "metakey_p_id": metakey_p.key_id,
    }
    if include_arch_all:
        rdep_architectures = all_arches | set(["all"])
    else:
        rdep_architectures = all_arches
    for architecture in rdep_architectures:
        deps = {}
        sources = {}
        virtual_packages = {}
        try:
            params["arch_id"] = get_architecture(architecture, session).arch_id
        except AttributeError:
            # No architecture object came back for this name; skip it.
            continue

        statement = sql.text(
            """
            SELECT b.package, s.source, c.name as component,
                (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
                (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
                FROM binaries b
                JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
                JOIN source s ON b.source = s.id
                JOIN files_archive_map af ON b.file = af.file_id
                JOIN component c ON af.component_id = c.id
                WHERE b.architecture = :arch_id"""
        )
        query = (
            session.query(
                sql.column("package"),
                sql.column("source"),
                sql.column("component"),
                sql.column("depends"),
                sql.column("provides"),
            )
            .from_statement(statement)
            .params(params)
        )
        for package, source, component, depends, provides in query:
            sources[package] = source
            p2c[package] = component
            if depends is not None:
                deps[package] = depends
            # Maintain a counter for each virtual package.  If a
            # Provides: exists, set the counter to 0 and count all
            # provides by a package not in the list for removal.
            # If the counter stays 0 at the end, we know that only
            # the to-be-removed packages provided this virtual
            # package.
            if provides is not None:
                for virtual_pkg in provides.split(","):
                    virtual_pkg = virtual_pkg.strip()
                    if virtual_pkg == package:
                        continue
                    if virtual_pkg not in virtual_packages:
                        virtual_packages[virtual_pkg] = 0
                    if package not in requested:
                        virtual_packages[virtual_pkg] += 1

        # If a virtual package is only provided by the to-be-removed
        # packages, treat the virtual package as to-be-removed too.
        # (The set is shared by all architectures and by the Build-Depends
        # check further down.)
        removal_set.update(
            virtual_pkg
            for virtual_pkg in virtual_packages
            if not virtual_packages[virtual_pkg]
        )

        # Check binary dependencies (Depends)
        for package in deps:
            if package in requested:
                continue
            try:
                parsed_dep = apt_pkg.parse_depends(deps[package])
            except ValueError as e:
                print("Error for package %s: %s" % (package, e))
                parsed_dep = []
            for dep in parsed_dep:
                # Check for partial breakage.  If a package has a ORed
                # dependency, there is only a dependency problem if all
                # packages in the ORed depends will be removed.
                unsat = 0
                for dep_package, _, _ in dep:
                    if dep_package in removal_set:
                        unsat += 1
                if unsat == len(dep):
                    component = p2c[package]
                    source = sources[package]
                    if component != "main":
                        source = "%s/%s" % (source, component)
                    all_broken[source][package].add(architecture)
                    dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print(" - broken Depends:")
        else:
            print("# Broken Depends:")
        for source, bindict in sorted(all_broken.items()):
            lines = []
            for binary, broken_arches in sorted(bindict.items()):
                # No architecture list is shown when the binary is broken
                # everywhere or is an arch "all" package.
                if broken_arches == all_arches or "all" in broken_arches:
                    lines.append(binary)
                else:
                    lines.append(
                        "%s [%s]" % (binary, " ".join(sorted(broken_arches)))
                    )
            if cruft:
                print(" %s: %s" % (source, lines[0]))
            else:
                print("%s: %s" % (source, lines[0]))
            for line in lines[1:]:
                if cruft:
                    print(" " + " " * (len(source) + 2) + line)
                else:
                    print(" " * (len(source) + 2) + line)
        if not cruft:
            print()

    # Check source dependencies (Build-Depends and Build-Depends-Indep)
    all_broken = defaultdict(set)
    metakey_bd = get_or_set_metadatakey("Build-Depends", session)
    metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
    if include_arch_all:
        metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
    else:
        metakey_ids = (metakey_bd.key_id,)

    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_ids": metakey_ids,
    }
    statement = sql.text(
        """
        SELECT s.source, string_agg(sm.value, ', ') as build_dep
           FROM source s
           JOIN source_metadata sm ON s.id = sm.src_id
           WHERE s.id in
               (SELECT src FROM newest_src_association
                   WHERE suite = :suite_id)
               AND sm.key_id in :metakey_ids
           GROUP BY s.id, s.source"""
    )
    query = (
        session.query(sql.column("source"), sql.column("build_dep"))
        .from_statement(statement)
        .params(params)
    )
    for source, build_dep in query:
        if source in requested:
            continue
        parsed_dep = []
        if build_dep is not None:
            # Remove [arch] information since we want to see breakage on all arches
            build_dep = re_build_dep_arch.sub("", build_dep)
            try:
                parsed_dep = apt_pkg.parse_src_depends(build_dep)
            except ValueError as e:
                print("Error for source %s: %s" % (source, e))
        for dep in parsed_dep:
            unsat = 0
            for dep_package, _, _ in dep:
                if dep_package in removal_set:
                    unsat += 1
            if unsat == len(dep):
                # Look up the component of the source via its "dsc" override
                # in the override suite.
                (component,) = (
                    session.query(Component.component_name)
                    .join(Component.overrides)
                    .filter(Override.suite == overridesuite)
                    .filter(
                        Override.package
                        == re.sub("/(contrib|non-free-firmware|non-free)$", "", source)
                    )
                    .join(Override.overridetype)
                    .filter(OverrideType.overridetype == "dsc")
                    .first()
                )
                key = source
                if component != "main":
                    key = "%s/%s" % (source, component)
                all_broken[key].add(pp_deps(dep))
                dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print(" - broken Build-Depends:")
        else:
            print("# Broken Build-Depends:")
        for source, bdeps in sorted(all_broken.items()):
            bdeps = sorted(bdeps)
            if cruft:
                print(" %s: %s" % (source, bdeps[0]))
            else:
                print("%s: %s" % (source, bdeps[0]))
            for bdep in bdeps[1:]:
                if cruft:
                    print(" " + " " * (len(source) + 2) + bdep)
                else:
                    print(" " * (len(source) + 2) + bdep)
        if not cruft:
            print()

    return dep_problem

1380 

1381 

1382################################################################################ 

1383 

1384 

def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
    """List the source packages named in the Built-Using field.

    Every entry has to be a single, strict (``=``) dependency; anything else
    trips an assertion.

    :param control: control file to take Built-Using field from
    :return: list of (source_name, source_version) pairs; empty when the
        field is absent
    """
    field = control.get("Built-Using")
    if field is None:
        return []

    pairs = []
    for alternatives in apt_pkg.parse_depends(field):
        assert (
            len(alternatives) == 1
        ), "Alternatives are not allowed in Built-Using field"
        name, version, relation = alternatives[0]
        assert relation == "=", "Built-Using must contain strict dependencies"
        pairs.append((name, version))

    return pairs

1403 

1404 

1405################################################################################ 

1406 

1407 

def is_in_debug_section(control: Mapping[str, str]) -> bool:
    """Tell whether a binary package is a debug package.

    That is the case when its Section, with anything up to the first ``/``
    stripped, is ``debug`` and its Auto-Built-Package field is
    ``debug-symbols``.

    :param control: control file of binary package
    :return: True if the binary package is a debug package
    """
    head, slash, rest = control["Section"].partition("/")
    section = rest if slash else head
    if section != "debug":
        return False
    return control.get("Auto-Built-Package") == "debug-symbols"

1417 

1418 

1419################################################################################ 

1420 

1421 

def find_possibly_compressed_file(filename: str) -> str:
    """
    Locate a control file that may only exist in compressed form.

    The plain name is tried first, then the name with ``.xz``, ``.gz`` and
    ``.bz2`` appended.

    :param filename: path to a control file (Sources, Packages, etc) to
                     look for
    :return: path to the (possibly compressed) control file
    :raises OSError: with ENOENT if none of the candidates exists
    """
    candidates = (filename + ext for ext in ("", ".xz", ".gz", ".bz2"))
    found = next((path for path in candidates if os.path.exists(path)), None)
    if found is None:
        raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
    return found

1438 

1439 

1440################################################################################ 

1441 

1442 

def parse_boolean_from_user(value: str) -> bool:
    """Interpret a user-supplied word as a boolean, ignoring case.

    :param value: one of yes/true/enable/enabled or no/false/disable/disabled
    :return: the corresponding boolean
    :raises ValueError: if the word is not recognised
    """
    word = value.lower()
    meanings = (
        (True, ("yes", "true", "enable", "enabled")),
        (False, ("no", "false", "disable", "disabled")),
    )
    for result, spellings in meanings:
        if word in spellings:
            return result
    raise ValueError("Not sure whether %s should be a True or a False" % word)

1450 

1451 

def suite_suffix(suite_name: str) -> str:
    """Return the suite suffix that applies to the given suite.

    ``Dinstall::SuiteSuffix`` applies to every suite unless
    ``Dinstall::SuiteSuffixSuites`` is configured, in which case it only
    applies to the suites listed there.

    :param suite_name: name of the suite
    :return: the configured suffix, or "" when none applies
    """
    suffix = Cnf.find("Dinstall::SuiteSuffix", "")
    if suffix == "":
        return ""
    if "Dinstall::SuiteSuffixSuites" not in Cnf:
        # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
        return suffix
    if suite_name in Cnf.value_list("Dinstall::SuiteSuffixSuites"):
        return suffix
    return ""

1463 

1464 

1465################################################################################ 

1466 

1467 

def process_buildinfos(
    directory: str,
    buildinfo_files: "Iterable[daklib.upload.HashedFile]",
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
) -> None:
    """Archive buildinfo files below Dir::BuildinfoArchive

    Nothing is done when ``Dir::BuildinfoArchive`` is not configured.
    Otherwise each file is copied, with mode 0644, into a year/month/day
    subdirectory for the current date, under a name obtained from
    find_next_free().

    :param directory: directory where .changes is stored
    :param buildinfo_files: buildinfo files to archive
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """

    if "Dir::BuildinfoArchive" not in Cnf:
        return

    archive_root = Cnf["Dir::BuildinfoArchive"]
    dated_subdir = datetime.datetime.now().strftime("%Y/%m/%d")
    target_dir = os.path.join(archive_root, dated_subdir)

    for buildinfo in buildinfo_files:
        name = buildinfo.filename
        source_path = os.path.join(directory, name)
        target_path = find_next_free(os.path.join(target_dir, name))

        logger.log(["Archiving", name])
        fs_transaction.copy(source_path, target_path, mode=0o644)

1496 

1497 

1498################################################################################ 

1499 

1500 

def move_to_morgue(
    morguesubdir: str,
    filenames: Iterable[str],
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
):
    """Move files into a dated directory of the morgue

    The morgue is ``Dir::Morgue`` or, failing that, ``morgue`` below
    ``Dir::Base``.  Files keep their base name unless something already
    exists under that name, in which case find_next_free() picks another.

    :param morguesubdir: subdirectory of morgue where the files need to go
    :param filenames: names of files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """

    default_morgue = os.path.join(Cnf.get("Dir::Base"), "morgue")
    morguedir = Cnf.get("Dir::Morgue", default_morgue)

    # Build directory as morguedir/morguesubdir/year/month/day
    today = datetime.datetime.now()
    dated_parts = (str(today.year), "%.2d" % today.month, "%.2d" % today.day)
    dest = os.path.join(morguedir, morguesubdir, *dated_parts)

    for filename in filenames:
        target = dest + "/" + os.path.basename(filename)
        # If the destination file exists; try to find another filename to use
        if os.path.lexists(target):
            target = find_next_free(target)
        logger.log(["move to morgue", filename, target])
        fs_transaction.move(filename, target)