1# vim:set et ts=4 sw=4: 

2 

3"""Utility functions 

4 

5@contact: Debian FTP Master <ftpmaster@debian.org> 

6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org> 

7@license: GNU General Public License version 2 or later 

8""" 

9 

10# This program is free software; you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation; either version 2 of the License, or 

13# (at your option) any later version. 

14 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19 

20# You should have received a copy of the GNU General Public License 

21# along with this program; if not, write to the Free Software 

22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

23 

24import datetime 

25import email.policy 

26import errno 

27import functools 

28import grp 

29import os 

30import pwd 

31import re 

32import shutil 

33import subprocess 

34import sys 

35import tempfile 

36from collections import defaultdict 

37from collections.abc import Iterable, Mapping, Sequence 

38from typing import TYPE_CHECKING, Literal, NoReturn, Optional, Union 

39 

40import apt_inst 

41import apt_pkg 

42import sqlalchemy.sql as sql 

43 

44import daklib.config as config 

45import daklib.mail 

46from daklib.dbconn import ( 

47 Architecture, 

48 Component, 

49 DBConn, 

50 Override, 

51 OverrideType, 

52 get_active_keyring_paths, 

53 get_architecture, 

54 get_component, 

55 get_or_set_metadatakey, 

56 get_suite, 

57 get_suite_architectures, 

58) 

59 

60from .dak_exceptions import ( 

61 InvalidDscError, 

62 NoFilesFieldError, 

63 NoFreeFilenameError, 

64 ParseChangesError, 

65 SendmailFailedError, 

66 UnknownFormatError, 

67) 

68from .formats import parse_format, validate_changes_format 

69from .gpg import SignedFile 

70from .regexes import ( 

71 re_build_dep_arch, 

72 re_issource, 

73 re_multi_line_field, 

74 re_parse_maintainer, 

75 re_re_mark, 

76 re_single_line_field, 

77 re_srchasver, 

78 re_whitespace_comment, 

79) 

80from .srcformats import get_format_from_string 

81from .textutils import fix_maintainer 

82 

83if TYPE_CHECKING: 83 ↛ 84line 83 didn't jump to line 84, because the condition on line 83 was never true

84 import daklib.daklog 

85 import daklib.fstransactions 

86 import daklib.upload 

87 

88################################################################################ 

89 

#: Cache for email addresses from gpg key uids (looked up by key fingerprint)
key_uid_email_cache: dict[str, list[str]] = {}

93 

94################################################################################ 

95 

96 

def input_or_exit(prompt: Optional[str] = None) -> str:
    """Read one line from standard input, exiting the program on end-of-file.

    :param prompt: text shown before reading; :const:`None` (the default)
                   shows no prompt at all
    :return: the line read, as returned by :func:`input`
    :raises SystemExit: when standard input hits EOF (e.g. the user typed ^D)
    """
    try:
        # input(None) would print the literal text "None" as the prompt, so
        # only forward the prompt when one was actually given.
        if prompt is None:
            return input()
        return input(prompt)
    except EOFError:
        sys.exit("\nUser interrupt (^D).")

102 

103 

104################################################################################ 

105 

106 

def extract_component_from_section(section: str) -> tuple[str, str]:
    """Derive the component from a ``component/section`` style string.

    The text before the first ``/`` is taken as the component; a section
    without any ``/`` belongs to ``main``.  The section itself is handed
    back unchanged.

    :param section: section name, optionally prefixed with ``component/``
    :return: tuple (section, component)
    """
    component, slash, _rest = section.partition("/")
    if not slash:
        component = "main"
    return section, component

117 

118 

119################################################################################ 

120 

121 

def parse_deb822(
    armored_contents: bytes, signing_rules: Literal[-1, 0, 1] = 0, keyrings=None
) -> dict[str, str]:
    """Parse a (possibly signed) deb822-style control file into a dictionary.

    :param armored_contents: raw bytes of the file, including any signature armor
    :param signing_rules: only the value ``1`` is acted on here: a blank line
                          is then accepted only as the very last line
    :param keyrings: keyrings handed to :class:`SignedFile`; when :const:`None`
                     an empty list is used and no signature is required
    :return: mapping of lower-cased field names to values, plus
             ``filecontents`` (the decoded input) and, when the ``source``
             field carries a version, ``source-version``
    :raises ParseChangesError: on empty input, on a continuation line that
                               precedes any field, or on unparseable lines
    :raises InvalidDscError: in ``signing_rules == 1`` mode when a blank line
                             is found before the end of the data
    """
    require_signature = True
    if keyrings is None:
        keyrings = []
        require_signature = False

    signed_file = SignedFile(
        armored_contents, keyrings=keyrings, require_signature=require_signature
    )
    contents = signed_file.contents.decode("utf-8")

    error = ""
    changes: dict[str, str] = {}

    # Split the lines in the input, keeping the linebreaks.
    lines = contents.splitlines(True)

    if not lines:
        raise ParseChangesError("[Empty changes file]")

    # Drop one trailing line-break character per line.  The final line may
    # lack a terminator; it must then be kept whole instead of losing its
    # last real character.
    stripped_lines = [
        line[:-1] if line.splitlines()[0] != line else line for line in lines
    ]

    num_of_lines = len(stripped_lines)
    field = None  # name of the field currently being filled
    first = -1  # -1: no field seen yet, 1: field just started, 0: continuing
    # Lines are numbered from 1 so InvalidDscError reports a human line number.
    for index, line in enumerate(stripped_lines, start=1):
        if line == "" and signing_rules == 1:
            if index != num_of_lines:
                raise InvalidDscError(index)
            break
        if slf := re_single_line_field.match(line):
            field = slf.groups()[0].lower()
            changes[field] = slf.groups()[1]
            first = 1
            continue
        if line == " .":
            if field is None:
                # A paragraph separator before any field used to crash with
                # an unbound local; report it as a parse error instead.
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            changes[field] += "\n"
            continue
        if mlf := re_multi_line_field.match(line):
            if field is None:
                raise ParseChangesError(
                    "'%s'\n [Multi-line field continuing on from nothing?]" % (line)
                )
            if first == 1 and changes[field] != "":
                changes[field] += "\n"
            first = 0
            changes[field] += mlf.groups()[0] + "\n"
            continue
        # Anything else is collected and reported once parsing is done.
        error += line

    changes["filecontents"] = armored_contents.decode()

    if "source" in changes:
        # Strip the source version in brackets from the source field,
        # put it in the "source-version" field instead.
        if srcver := re_srchasver.search(changes["source"]):
            changes["source"] = srcver.group(1)
            changes["source-version"] = srcver.group(2)

    if error:
        raise ParseChangesError(error)

    return changes

195 

196 

197################################################################################ 

198 

199 

def parse_changes(
    filename: str,
    signing_rules: Literal[-1, 0, 1] = 0,
    dsc_file: bool = False,
    keyrings=None,
) -> dict[str, str]:
    """
    Read a changes or source control (.dsc) file from disk and return a
    dictionary with one key per field.

    The file is read as bytes and parsed by :func:`parse_deb822`;
    `signing_rules` and `keyrings` are passed through to it untouched.

    Documented meaning of `signing_rules`:

     - -1: no signature is required.
     - 0 (the default): a signature is required.
     - 1: turns on the same strict format checking as dpkg-source, i.e.

       - the PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
         followed by any PGP header data and must end with a blank line;
       - the data section must end with a blank line and must be followed by
         "-----BEGIN PGP SIGNATURE-----".

    NOTE(review): this function itself never inspects `signing_rules`;
    confirm the list above against the parser.

    :param filename: path of the file to parse
    :param dsc_file: `filename` is a Debian source control (.dsc) file; the
                     check for mandatory .changes fields is then skipped
    :raises ParseChangesError: when a mandatory .changes field is missing
    """
    with open(filename, "rb") as changes_in:
        raw = changes_in.read()
    changes = parse_deb822(raw, signing_rules, keyrings=keyrings)

    if dsc_file:
        return changes

    # Finally ensure that everything needed for .changes is there
    mandatory = (
        "Format",
        "Date",
        "Source",
        "Architecture",
        "Version",
        "Distribution",
        "Maintainer",
        "Changes",
        "Files",
    )
    # Parsed field names are lower-cased, the report uses the canonical case.
    missingfields = [name for name in mandatory if name.lower() not in changes]
    if missingfields:
        raise ParseChangesError(
            "Missing mandatory field(s) in changes file (policy 5.5): %s"
            % (missingfields)
        )

    return changes

259 

260 

261################################################################################ 

262 

263 

def check_dsc_files(
    dsc_filename: str,
    dsc: Mapping[str, str],
    dsc_files: Mapping[str, Mapping[str, str]],
) -> list[str]:
    """
    Check that the files named in the .dsc's Files field fit the source
    format the .dsc announces.

    :param dsc_filename: path of .dsc file
    :param dsc: the content of the .dsc parsed by :func:`parse_changes`
    :param dsc_files: the file list returned by :func:`build_file_list`
    :return: all errors detected
    """
    problems: list[str] = []

    # Counts, per kind of source file, how many such files the .dsc lists.
    has: defaultdict[str, int] = defaultdict(int)

    # Ordered table: the first pattern matching the file's extension decides
    # which counters are bumped.
    ftype_lookup = (
        (r"orig\.tar\.(gz|bz2|xz)\.asc", ("orig_tar_sig",)),
        (r"orig\.tar\.gz", ("orig_tar_gz", "orig_tar")),
        (r"diff\.gz", ("debian_diff",)),
        (r"tar\.gz", ("native_tar_gz", "native_tar")),
        (r"debian\.tar\.(gz|bz2|xz)", ("debian_tar",)),
        (r"orig\.tar\.(gz|bz2|xz)", ("orig_tar",)),
        (r"tar\.(gz|bz2|xz)", ("native_tar",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)\.asc", ("more_orig_tar_sig",)),
        (r"orig-.+\.tar\.(gz|bz2|xz)", ("more_orig_tar",)),
    )

    for f in dsc_files:
        source_match = re_issource.match(f)
        if not source_match:
            problems.append(
                "%s: %s in Files field not recognised as source." % (dsc_filename, f)
            )
            continue

        extension = source_match.group(3)
        counters = next(
            (keys for regex, keys in ftype_lookup if re.match(regex, extension)),
            None,
        )

        # File does not match anything in lookup table; reject and stop
        # looking at the remaining files.
        if counters is None:
            problems.append("%s: unexpected source file '%s'" % (dsc_filename, f))
            break

        for key in counters:
            has[key] += 1

    # Each of these kinds may be listed at most once.
    singletons = (
        "orig_tar",
        "orig_tar_sig",
        "native_tar",
        "debian_tar",
        "debian_diff",
    )
    for file_type in singletons:
        if has[file_type] > 1:
            problems.append("%s: lists multiple %s" % (dsc_filename, file_type))

    # Source format specific tests
    try:
        source_format = get_format_from_string(dsc["format"])
        problems.extend(
            "%s: %s" % (dsc_filename, x) for x in source_format.reject_msgs(has)
        )
    except UnknownFormatError:
        # Not an error here for now
        pass

    return problems

339 

340 

341################################################################################ 

342 

343# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl 

344 

345 

def build_file_list(
    changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum"
) -> dict[str, dict[str, str]]:
    """Turn a Files-style field into a dictionary keyed by file name.

    :param changes: parsed .changes or .dsc content
    :param is_a_dsc: `changes` comes from a .dsc; its Format is then not
                     validated and entries carry no section/priority columns
    :param field: name of the field holding the file list
    :param hashname: key under which each entry's first column (the hash) is stored
    :return: per file name a dict with ``size``, ``section``, ``priority``,
             ``component`` and `hashname`
    :raises NoFilesFieldError: when `field` is missing from `changes`
    :raises ParseChangesError: when an entry has the wrong number of columns
    """
    # Make sure we have a Files: field to parse...
    if field not in changes:
        raise NoFilesFieldError

    # Validate .changes Format: field
    if not is_a_dsc:
        validate_changes_format(parse_format(changes["format"]), field)

    # Only the Files field of a .changes has section and priority columns.
    includes_section = (not is_a_dsc) and field == "files"

    files: dict[str, dict[str, str]] = {}
    for entry in changes[field].split("\n"):
        # The first empty line ends the list.
        if not entry:
            break
        columns = entry.split()
        # "-" stands in for an unknown section / priority.
        section = priority = "-"
        try:
            if includes_section:
                digest, size, section, priority, name = columns
            else:
                digest, size, name = columns
        except ValueError:
            raise ParseChangesError(entry)

        section, component = extract_component_from_section(section)

        files[name] = {
            "size": size,
            "section": section,
            "priority": priority,
            "component": component,
            hashname: digest,
        }

    return files

388 

389 

390################################################################################ 

391 

392 

def send_mail(message: str, whitelists: Optional[list[str]] = None) -> None:
    """sendmail wrapper, takes a message string

    The message is optionally filtered against recipient whitelists,
    optionally signed, optionally archived below ``Dir::Mail`` and finally
    piped into ``Dinstall::SendmailCommand``.

    :param message: the complete mail (headers and body) as text
    :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
                       everything, otherwise an address is whitelisted if it is
                       included in any of the lists.
                       In addition a global whitelist can be specified in
                       Dinstall::MailWhiteList.
                       The list passed in is not modified.
    :raises SendmailFailedError: when the sendmail command exits non-zero
    """

    msg = daklib.mail.parse_mail(message)

    # The incoming message might be UTF-8, but outgoing mail should
    # use a legacy-compatible encoding.  Set the content to the
    # text to make sure this is the case.
    # Note that this does not work with multipart messages.
    msg.set_content(msg.get_payload(), cte="quoted-printable")

    # Check whether we're supposed to be sending mail
    call_sendmail = True
    if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
        call_sendmail = False

    if whitelists is None or None in whitelists:
        whitelists = []
    else:
        # Work on a copy: the global whitelist is appended below and must not
        # leak into the caller's list (it would pile up on repeated calls).
        whitelists = list(whitelists)
    if Cnf.get("Dinstall::MailWhiteList", ""):
        whitelists.append(Cnf["Dinstall::MailWhiteList"])
    if whitelists:
        # Each non-comment line is either a regular expression (flagged by
        # the re_re_mark prefix) or a literal address.
        whitelist = []
        for path in whitelists:
            with open(path, "r") as whitelist_in:
                for line in whitelist_in:
                    if re_whitespace_comment.match(line):
                        continue
                    if re_re_mark.match(line):
                        whitelist.append(
                            re.compile(re_re_mark.sub("", line.strip(), 1))
                        )
                    else:
                        whitelist.append(re.compile(re.escape(line.strip())))

        # Fields to check.
        fields = ["To", "Bcc", "Cc"]
        for field in fields:
            # Check each field
            value = msg.get(field, None)
            if value is None:
                continue
            match = []
            for item in value.split(","):
                _rfc822_maint, _rfc2047_maint, _name, mail = fix_maintainer(
                    item.strip()
                )
                if not any(wr.match(mail) for wr in whitelist):
                    print("Skipping {0} since it's not whitelisted".format(item))
                    continue
                match.append(item)

            # Doesn't have any mail in whitelist so remove the header
            if not match:
                del msg[field]
            else:
                msg.replace_header(field, ", ".join(match))

        # Change message fields in order if we don't have a To header
        if "To" not in msg:
            # After reversing, the order is Cc, Bcc, To: the first header
            # still present is promoted to "To" (fields[-1]).
            fields.reverse()
            for field in fields:
                if field in msg:
                    msg[fields[-1]] = msg[field]
                    del msg[field]
                    break
            else:
                # return, as we removed all recipients.
                call_sendmail = False

    # sign mail
    if mailkey := Cnf.get("Dinstall::Mail-Signature-Key", ""):
        kwargs = {
            "keyids": [mailkey],
            "pubring": Cnf.get("Dinstall::SigningPubKeyring") or None,
            "secring": Cnf.get("Dinstall::SigningKeyring") or None,
            "homedir": Cnf.get("Dinstall::SigningHomedir") or None,
            "passphrase_file": Cnf.get("Dinstall::SigningPassphraseFile") or None,
        }
        msg = daklib.mail.sign_mail(msg, **kwargs)

    msg_bytes = msg.as_bytes(policy=email.policy.default)

    # Keep a copy of the mail on disk, even when nothing is sent below.
    maildir = Cnf.get("Dir::Mail")
    if maildir:
        path = os.path.join(maildir, datetime.datetime.now().isoformat())
        path = find_next_free(path)
        with open(path, "wb") as fh:
            fh.write(msg_bytes)

    # Invoke sendmail
    if not call_sendmail:
        return
    try:
        subprocess.run(
            Cnf["Dinstall::SendmailCommand"].split(),
            input=msg_bytes,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        raise SendmailFailedError(e.output.decode().rstrip()) from e

505 

506 

507################################################################################ 

508 

509 

def poolify(source: str) -> str:
    """convert `source` name into directory path used in pool structure

    The leading directory is the first character of the name, or the first
    four characters for names starting with ``lib``.
    """
    prefix_length = 4 if source.startswith("lib") else 1
    return f"{source[:prefix_length]}/{source}/"

516 

517 

518################################################################################ 

519 

520 

def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
    """Move a file by copying it to `dest` and removing `src`.

    Missing destination directories are created with mode 02775.

    :param src: path of the file to move
    :param dest: target file name, or an existing directory to move into
    :param overwrite: allow replacing an existing (writable) destination;
                      otherwise an existing destination is fatal
    :param perms: permissions set on the moved file
    """
    if os.path.isdir(dest):
        dest_dir = dest
    else:
        dest_dir = os.path.dirname(dest)
    # An empty dest_dir means "current directory": nothing to create.
    if dest_dir and not os.path.lexists(dest_dir):
        # Clear the umask so the directories really get mode 02775, and make
        # sure it is put back even when the creation fails.
        umask = os.umask(0)
        try:
            os.makedirs(dest_dir, 0o2775)
        finally:
            os.umask(umask)
    if os.path.isdir(dest):
        dest = os.path.join(dest, os.path.basename(src))
    # Don't overwrite unless forced to
    if os.path.lexists(dest):
        if not overwrite:
            fubar("Can't move %s to %s - file already exists." % (src, dest))
        elif not os.access(dest, os.W_OK):
            fubar("Can't move %s to %s - can't write to existing file." % (src, dest))
    shutil.copy2(src, dest)
    os.chmod(dest, perms)
    os.unlink(src)

545 

546 

547################################################################################ 

548 

549 

def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
    """Fill in a template file.

    Every occurrence of each key of `subst_map` in the file's text is
    replaced by the string form of its value, one key after the other in
    the mapping's iteration order.

    :param subst_map: placeholder text -> replacement
    :param filename: path of the template file
    :return: the template text with all substitutions applied
    """
    with open(filename) as templatefile:
        text = templatefile.read()
    for placeholder, replacement in subst_map.items():
        text = text.replace(placeholder, str(replacement))
    return text

557 

558 

559################################################################################ 

560 

561 

def fubar(msg: str, exit_code: int = 1) -> NoReturn:
    """Report a fatal error on stderr (prefixed with ``E:``) and exit.

    :param msg: the error message
    :param exit_code: exit status handed to :func:`sys.exit`
    """
    print(f"E: {msg}", file=sys.stderr)
    sys.exit(exit_code)

566 

567 

def warn(msg: str) -> None:
    """Report a warning on stderr (prefixed with ``W:``); execution continues."""
    print(f"W: {msg}", file=sys.stderr)

571 

572 

573################################################################################ 

574 

575 

def whoami() -> str:
    """get user name

    Takes the first comma-separated part of the current user's GECOS field
    and makes a laughable attempt at rfc822 conformancy (read: removing
    stray periods).
    """
    gecos = pwd.getpwuid(os.getuid()).pw_gecos
    full_name = gecos.split(",")[0]
    return full_name.replace(".", "")

583 

584 

def getusername() -> str:
    """get login name of the user owning the current process"""
    return pwd.getpwuid(os.getuid()).pw_name

588 

589 

590################################################################################ 

591 

592 

def size_type(c: Union[int, float]) -> str:
    """Format a byte count as a whole number with a `` B``, `` KB`` or `` MB`` suffix.

    The value is divided by 1024 and moved to the next unit only while it
    exceeds 10240; the fractional part is dropped.

    :param c: size in bytes
    """
    unit = " B"
    for larger_unit in (" KB", " MB"):
        if not c > 10240:
            break
        c = c / 1024
        unit = larger_unit
    return "%d%s" % (c, unit)

602 

603 

604################################################################################ 

605 

606 

def find_next_free(dest: str, too_many: int = 100) -> str:
    """Find a file name that is not in use yet.

    `dest` itself is tried first, then ``dest.0``, ``dest.1``, ... up to
    `too_many` numbered variants.

    Nothing is created, so another process may still grab the returned name
    before the caller does.

    :param dest: the preferred path
    :param too_many: how many numbered variants to try before giving up
    :return: the first candidate path that does not exist
    :raises NoFreeFilenameError: when every candidate already exists
    """
    extra = 0
    orig_dest = dest
    while os.path.lexists(dest) and extra < too_many:
        dest = orig_dest + "." + repr(extra)
        extra += 1
    # Decide on whether the last candidate is really taken, not on the
    # counter: the counter also reaches `too_many` when the final variant
    # tried turned out to be free.
    if os.path.lexists(dest):
        raise NoFreeFilenameError
    return dest

616 

617 

618################################################################################ 

619 

620 

def result_join(original: Iterable[Optional[str]], sep: str = "\t") -> str:
    """Join the items with `sep`, writing :const:`None` entries as empty strings."""
    cells = ["" if item is None else item for item in original]
    return sep.join(cells)

623 

624 

625################################################################################ 

626 

627 

def prefix_multi_line_string(
    lines: str, prefix: str, include_blank_lines: bool = False
) -> str:
    """prepend `prefix` to each line in `lines`

    Every line is stripped of surrounding whitespace first.  Lines that end
    up empty are dropped unless `include_blank_lines` is set, in which case
    they consist of the prefix alone.
    """
    prefixed = []
    for raw_line in lines.split("\n"):
        stripped = raw_line.strip()
        if stripped or include_blank_lines:
            prefixed.append(prefix + stripped)
    return "\n".join(prefixed)

637 

638 

639################################################################################ 

640 

641 

def join_with_commas_and(list: Sequence[str]) -> str:
    """Render the items as an English enumeration.

    No items give ``"nothing"``, one item is returned as is, otherwise the
    items are joined with ``", "`` and the last one with ``" and "``.
    """
    count = len(list)
    if count == 0:
        return "nothing"
    if count == 1:
        return list[0]
    leading, final = list[:-1], list[-1]
    return ", ".join(leading) + " and " + final

648 

649 

650################################################################################ 

651 

652 

def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
    """Pretty-print a group of alternative dependencies.

    :param deps: (package, constraint, version) triples; an empty constraint
                 means the package is printed without a version restriction
    :return: the alternatives joined by ``" |"``
    """
    rendered = []
    for pkg, constraint, version in deps:
        if constraint:
            rendered.append(f"{pkg} ({constraint} {version})")
        else:
            rendered.append(pkg)
    return " |".join(rendered)

659 

660 

661################################################################################ 

662 

663 

def get_conf():
    """Return the module-level configuration object ``Cnf``."""
    return Cnf

666 

667 

668################################################################################ 

669 

670 

def parse_args(Options) -> tuple[str, str, str, bool]:
    """Handle -a, -c and -s arguments; returns them as SQL constraints

    Names that are not found in the database only trigger a warning; if an
    option is given but none of its names is valid the program exits.

    :param Options: mapping with the keys ``Suite``, ``Component`` and
                    ``Architecture``
    :return: tuple (con_suites, con_architectures, con_components,
             check_source): three SQL fragments of the form ``AND x.id IN
             (...)`` (empty when unrestricted) and a flag that is true when
             no architecture was given or ``source`` was among those given
    """
    # XXX: This should go away and everything which calls it be converted
    #      to use SQLA properly.  For now, we'll just fix it not to use
    #      the old Pg interface though
    session = DBConn().session()

    # Process suite
    con_suites = ""
    if Options["Suite"]:
        suite_ids = []
        for suitename in split_args(Options["Suite"]):
            suite = get_suite(suitename, session=session)
            if suite and suite.suite_id is not None:
                suite_ids.append(suite.suite_id)
            else:
                warn(
                    "suite '%s' not recognised."
                    % (suite and suite.suite_name or suitename)
                )
        if not suite_ids:
            fubar("No valid suite given.")
        con_suites = "AND su.id IN (%s)" % ", ".join(str(i) for i in suite_ids)

    # Process component
    con_components = ""
    if Options["Component"]:
        component_ids = []
        for componentname in split_args(Options["Component"]):
            component = get_component(componentname, session=session)
            if component is not None:
                component_ids.append(component.component_id)
            else:
                warn("component '%s' not recognised." % (componentname))
        if not component_ids:
            fubar("No valid component given.")
        con_components = "AND c.id IN (%s)" % ", ".join(
            str(i) for i in component_ids
        )

    # Process architecture
    con_architectures = ""
    check_source = False
    if Options["Architecture"]:
        arch_ids = []
        for archname in split_args(Options["Architecture"]):
            # "source" is not looked up; it only switches the flag on.
            if archname == "source":
                check_source = True
                continue
            arch = get_architecture(archname, session=session)
            if arch is not None:
                arch_ids.append(arch.arch_id)
            else:
                warn("architecture '%s' not recognised." % (archname))
        if arch_ids:
            con_architectures = "AND a.id IN (%s)" % ", ".join(
                str(i) for i in arch_ids
            )
        elif not check_source:
            fubar("No valid architecture given.")
    else:
        check_source = True

    return (con_suites, con_architectures, con_components, check_source)

741 

742 

743################################################################################ 

744 

745 

@functools.total_ordering
class ArchKey:
    """
    Sort key wrapper for architecture names.

    Architectures compare by name, except that 'source' sorts before
    every other architecture.
    """

    # NOTE: __eq__ is defined without __hash__, so instances are unhashable.
    __slots__ = ["arch", "issource"]

    def __init__(self, arch, *args):
        # Extra positional arguments are accepted and ignored.
        self.arch = arch
        self.issource = arch == "source"

    def __lt__(self, other: "ArchKey") -> bool:
        if self.issource or other.issource:
            # 'source' is smaller than anything but another 'source'.
            return self.issource and not other.issource
        return self.arch < other.arch

    def __eq__(self, other: object) -> bool:
        if isinstance(other, ArchKey):
            return self.arch == other.arch
        return NotImplemented

771 

772 

773################################################################################ 

774 

775 

def split_args(s: str, dwim: bool = True) -> list[str]:
    """
    Split command line arguments which can be separated by either commas
    or whitespace.

    A string without any comma is split on whitespace; otherwise it is
    split on commas only.  If dwim is set, a string ending in a comma is
    treated as fatal, since this usually means someone did
    'dak ls -a i386, m68k foo' or something and the inevitable confusion
    resulting from 'm68k' being treated as an argument is undesirable.
    """
    if "," not in s:
        return s.split()
    if dwim and s.endswith(","):
        fubar("split_args: found trailing comma, spurious space maybe?")
    return s.split(",")

791 

792 

793################################################################################ 

794 

795 

def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
    """Build one ``--keyring=PATH`` argument per keyring.

    :param keyrings: keyring paths; when :const:`None` the paths returned by
                     :func:`get_active_keyring_paths` are used
    """
    paths = get_active_keyring_paths() if keyrings is None else keyrings
    return [f"--keyring={path}" for path in paths]

801 

802 

803################################################################################ 

804 

805 

def _gpg_get_addresses_from_listing(output: bytes) -> list[str]:
    """Extract email addresses from a colon-separated gpg key listing.

    Only ``uid`` and ``pub`` records are looked at; the user id is taken
    from the tenth field.  Records that are too short, flagged invalid,
    disabled or revoked, or whose user id cannot be parsed, are skipped.

    :param output: the listing as bytes, one record per line
    :return: the addresses found, ``@debian.org`` ones first
    """
    addresses: list[str] = []

    for line in output.split(b"\n"):
        parts = line.split(b":")
        if parts[0] not in (b"uid", b"pub"):
            continue
        if len(parts) < 10:
            # Truncated record without a user id field (possibly without
            # even a validity field): nothing to extract.
            continue
        if parts[1] in (b"i", b"d", b"r"):
            # Skip uid that is invalid, disabled or revoked
            continue
        uid_bytes = parts[9]
        try:
            uid = uid_bytes.decode(encoding="utf-8")
        except UnicodeDecodeError:
            # If the uid is not valid UTF-8, we assume it is an old uid
            # still encoding in Latin-1.
            uid = uid_bytes.decode(encoding="latin1")
        m = re_parse_maintainer.match(uid)
        if not m:
            continue
        address = m.group(2)
        if address.endswith("@debian.org"):
            # prefer @debian.org addresses
            # TODO: maybe not hardcode the domain
            addresses.insert(0, address)
        else:
            addresses.append(address)

    return addresses

838 

839 

def gpg_get_key_addresses(fingerprint: str) -> list[str]:
    """retrieve email addresses from gpg key uids for a given fingerprint

    Results (including the empty result of a failed ``gpg`` run) are
    remembered in ``key_uid_email_cache``, so ``gpg`` is run at most once
    per fingerprint.

    :param fingerprint: fingerprint of the key to look up
    :return: the addresses found, or an empty list when ``gpg`` fails
    """
    cached = key_uid_email_cache.get(fingerprint)
    if cached is not None:
        return cached

    try:
        cmd = [
            "gpg",
            "--no-default-keyring",
            *gpg_keyring_args(),
            "--with-colons",
            "--list-keys",
            "--",
            fingerprint,
        ]
        listing = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        found: list[str] = []
    else:
        found = _gpg_get_addresses_from_listing(listing)

    key_uid_email_cache[fingerprint] = found
    return found

858 

859 

860################################################################################ 

861 

862 

def open_ldap_connection():
    """open connection to the configured LDAP server

    The server is taken from ``Import-LDAP-Fingerprints::LDAPServer``.  If
    ``Import-LDAP-Fingerprints::CACertFile`` is set, certificate checking
    against that file is enforced and TLS is started.  The connection is
    then bound with an empty DN and password.

    :return: the bound connection object
    """
    import ldap  # type: ignore

    server = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
    ca_cert_file = Cnf.get("Import-LDAP-Fingerprints::CACertFile")

    connection = ldap.initialize(server)

    if ca_cert_file:
        tls_options = (
            (ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD),
            (ldap.OPT_X_TLS_CACERTFILE, ca_cert_file),
            (ldap.OPT_X_TLS_NEWCTX, True),
        )
        for option, value in tls_options:
            connection.set_option(option, value)
        connection.start_tls_s()

    connection.simple_bind_s("", "")

    return connection

881 

882 

883################################################################################ 

884 

885 

def get_logins_from_ldap(fingerprint: str = "*") -> dict[str, str]:
    """retrieve login from LDAP linked to a given fingerprint

    :param fingerprint: value inserted into the ``keyfingerprint`` search
                        filter; the default ``*`` matches every entry
    :return: mapping of key fingerprint to login (uid), taking the first
             value of each attribute per entry
    """
    import ldap

    connection = open_ldap_connection()
    base_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    entries = connection.search_s(
        base_dn,
        ldap.SCOPE_ONELEVEL,
        "(keyfingerprint=%s)" % fingerprint,
        ["uid", "keyfingerprint"],
    )
    # Each search result is a pair whose second item holds the attributes.
    return {
        attrs["keyFingerPrint"][0].decode(): attrs["uid"][0].decode()
        for _dn, attrs in ((entry[0], entry[1]) for entry in entries)
    }

904 

905 

906################################################################################ 

907 

908 

def get_users_from_ldap() -> dict[str, str]:
    """retrieve login and user names from LDAP

    The user name is built from the first values of the ``cn``, ``mn`` and
    ``sn`` attributes, skipping attributes that are missing, empty or start
    with ``-``.

    :return: mapping of user name to login (uid), both as text
    """
    import ldap

    conn = open_ldap_connection()
    LDAPDn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    Attrs = conn.search_s(
        LDAPDn, ldap.SCOPE_ONELEVEL, "(uid=*)", ["uid", "cn", "mn", "sn"]
    )
    users: dict[str, str] = {}
    for elem in Attrs:
        # Each search result is a pair whose second item holds the attributes.
        attrs = elem[1]
        name = []
        for k in ("cn", "mn", "sn"):
            try:
                value = attrs[k][0].decode()
            except KeyError:
                continue
            if value and value[0] != "-":
                name.append(value)
        # Decode the uid like the name parts are decoded, so the result
        # matches the declared dict[str, str] and get_logins_from_ldap().
        users[" ".join(name)] = attrs["uid"][0].decode()
    return users

931 

932 

933################################################################################ 

934 

935 

def clean_symlink(src: str, dest: str, root: str) -> str:
    """
    Turn the absolute symlink target 'src' into one relative to 'dest'.

    The first occurrence of 'root' is stripped from both paths; the result
    is 'src' prefixed with one "../" for every path component of the
    directory that contains 'dest'.

    Returns the relativized 'src'.
    """
    relative_src = src.replace(root, "", 1)
    dest_dir = os.path.dirname(dest.replace(root, "", 1))
    # Number of "/"-separated components (an empty string counts as one).
    depth = dest_dir.count("/") + 1
    return "../" * depth + relative_src

946 

947 

948################################################################################ 

949 

950 

def temp_dirname(
    parent: Optional[str] = None,
    prefix: str = "dak",
    suffix: str = "",
    mode: Optional[int] = None,
    group: Optional[str] = None,
) -> str:
    """
    Create a fresh, uniquely named temporary directory and return its path.

    :param parent: If non-null, the directory the new directory is created in.
    :param prefix: The directory name will start with this string
    :param suffix: The directory name will end with this string
    :param mode: If set, the directory is chmodded to these permissions
    :param group: If set, the directory is chgrped to the named group
    :return: path of the newly created directory

    """

    path = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=parent)

    if mode is not None:
        os.chmod(path, mode)

    if group is not None:
        # -1 leaves the owning user untouched; only the group changes.
        os.chown(path, -1, grp.getgrnam(group).gr_gid)

    return path

977 

978 

979################################################################################ 

980 

981 

def get_changes_files(from_dir: str) -> list[str]:
    """
    Takes a directory and lists all .changes files in it (as well as chdir'ing
    to the directory; this is due to broken behaviour on the part of p-u/p-a
    when you're not in the right place)

    :param from_dir: directory to change into and scan; may be relative to
                     the current working directory
    :return: list of the ``.changes`` filenames (without directory part)

    Any ``OSError`` while changing directory or listing it is reported
    through ``fubar``.
    """
    try:
        # Much of the rest of p-u/p-a depends on being in the right place
        os.chdir(from_dir)
        # List the *current* directory: after the chdir a relative from_dir
        # would no longer point at the directory we just entered.
        changes_files = [x for x in os.listdir(".") if x.endswith(".changes")]
    except OSError as e:
        fubar("Failed to read list from directory %s (%s)" % (from_dir, e))

    return changes_files

998 

999 

1000################################################################################ 

1001 

1002 

# Module-level configuration handle.  The functions in this module read their
# settings through it (Cnf[...], Cnf.get(), Cnf.find(), Cnf.value_list()).
Cnf = config.Config().Cnf

1004 

1005################################################################################ 

1006 

1007 

def parse_wnpp_bug_file(
    file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm",
) -> dict[str, list[str]]:
    """
    Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
    Well, actually it parses a local copy, but let's document the source
    somewhere ;)

    Each useful line has the form ``<source>: <entry>|<entry>|...``; the first
    run of digits in every entry is taken as its bug number.  Lines without
    ``": "`` are ignored, and entries without any digits are skipped.

    If the file cannot be opened a warning is printed and an empty dict is
    returned.

    :param file: path of the local copy of the wnpp list
    :return: a dict associating source package name with a list of open wnpp
             bugs (Yes, there might be more than one)
    """

    try:
        with open(file) as f:
            lines = f.readlines()
    except OSError:
        print(
            "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any."
            % file
        )
        lines = []

    wnpp: dict[str, list[str]] = {}
    for line in lines:
        source, separator, entries = line.partition(": ")
        if not separator:
            continue
        bugs = []
        for wnpp_bug in entries.split("|"):
            # re.search() returns None for an entry without digits; skip it
            # instead of crashing on ``None.group()``.
            match = re.search(r"\d+", wnpp_bug)
            if match:
                bugs.append(match.group())
        wnpp[source] = bugs
    return wnpp

1044 

1045 

1046################################################################################ 

1047 

1048 

def deb_extract_control(path: str) -> bytes:
    """Return the contents of the ``control`` member of a binary package.

    :param path: path to the .deb file
    :return: raw data of the ``control`` file from the package's control archive
    """
    package = apt_inst.DebFile(path)
    control_archive = package.control
    return control_archive.extractdata("control")

1052 

1053 

1054################################################################################ 

1055 

1056 

def mail_addresses_for_upload(
    maintainer: str, changed_by: str, fingerprint: str
) -> list[str]:
    """Work out who should be mailed about an upload.

    The recipient kinds are taken from ``Dinstall::UploadMailRecipients``
    (default: maintainer, changed_by and signer).  Supported entries are
    ``maintainer``, ``changed_by``, ``signer`` and ``mail:<address>``.

    :param maintainer: Maintainer field of the .changes file
    :param changed_by: Changed-By field of the .changes file
    :param fingerprint: fingerprint of the key used to sign the upload
    :return: list of RFC 2047-encoded mail addresses to contact regarding
             this upload
    :raises Exception: if the configuration contains an unsupported entry
    """
    configured = Cnf.value_list("Dinstall::UploadMailRecipients")
    recipients = configured if configured else ["maintainer", "changed_by", "signer"]

    # The signer is handled last so that it can be dropped when one of the
    # other recipients already covers it.
    if "signer" in recipients:
        recipients.remove("signer")
        recipients.append("signer")

    from_changes = {"maintainer": maintainer, "changed_by": changed_by}

    addresses = set()  # full "Name <email>" strings
    emails = set()  # bare email part, used for de-duplication
    for recipient in recipients:
        if recipient.startswith("mail:"):
            # Address given literally in the configuration.
            address = recipient[5:]
        elif recipient in from_changes:
            address = from_changes[recipient]
        elif recipient == "signer":
            key_addresses = gpg_get_key_addresses(fingerprint)
            # Skip the signer if any address of the key is already being
            # mailed, or if the key has no address at all.
            already_mailed = any(entry in emails for entry in key_addresses)
            if already_mailed or not key_addresses:
                address = None
            else:
                address = key_addresses[0]
        else:
            raise Exception(
                "Unsupported entry in {0}: {1}".format(
                    "Dinstall::UploadMailRecipients", recipient
                )
            )

        if address is None:
            continue
        mail = fix_maintainer(address)[3]
        if mail not in emails:
            addresses.add(address)
            emails.add(mail)

    return [fix_maintainer(entry)[1] for entry in addresses]

1114 

1115 

1116################################################################################ 

1117 

1118 

def call_editor_for_file(path: str) -> None:
    """Open ``path`` in the user's editor and wait for the editor to exit.

    The editor is taken from ``$VISUAL``, then ``$EDITOR``, falling back to
    ``sensible-editor``.  Unset *and empty* variables are skipped, and the
    value is split shell-style so settings such as ``emacs -nw`` work.

    :param path: file to edit
    :raises subprocess.CalledProcessError: if the editor exits non-zero
    """
    import shlex

    command: list[str] = []
    for candidate in (
        os.environ.get("VISUAL"),
        os.environ.get("EDITOR"),
        "sensible-editor",
    ):
        # An empty/blank variable yields no command; try the next candidate
        # rather than executing "" (or the file itself).
        command = shlex.split(candidate) if candidate else []
        if command:
            break

    subprocess.check_call([*command, path])

1122 

1123 

1124################################################################################ 

1125 

1126 

def call_editor(text: str = "", suffix: str = ".txt") -> str:
    """run editor and return the result as a string

    The text is written to a named temporary file, the editor is run on it
    via ``call_editor_for_file``, and the file is read back afterwards.  The
    temporary file is removed when the function returns.

    :param text: initial text
    :param suffix: extension for temporary file
    :return: string with the edited text
    """
    with tempfile.NamedTemporaryFile(mode="w+t", suffix=suffix) as fh:
        print(text, end="", file=fh)
        fh.flush()
        call_editor_for_file(fh.name)
        # Re-open by name instead of rewinding ``fh``: editors that save by
        # writing a new file and renaming it over the old one leave ``fh``
        # pointing at the stale, unedited inode.
        with open(fh.name, mode="rt") as edited:
            return edited.read()

1140 

1141 

1142################################################################################ 

1143 

1144 

def check_reverse_depends(
    removals: Iterable[str],
    suite: str,
    arches: Optional[Iterable[Architecture]] = None,
    session=None,
    cruft: bool = False,
    quiet: bool = False,
    include_arch_all: bool = True,
) -> bool:
    """Check whether removing packages from a suite breaks (build-)dependencies.

    First the ``Depends`` of the binaries in the suite are checked, one
    architecture at a time; then the ``Build-Depends`` (plus
    ``Build-Depends-Indep`` when ``include_arch_all`` is set) of the newest
    sources in the suite.  A dependency is only considered broken when every
    alternative of an ORed group is being removed.  Virtual packages that are
    provided exclusively by to-be-removed packages count as removed, too.

    Unless ``quiet`` is set, the broken dependencies are printed.

    :param removals: names of the packages to be removed
    :param suite: name of the suite to check
    :param arches: architectures to restrict the binary check to; if empty or
                   ``None`` the architectures of the suite are used.
                   NOTE(review): annotated as ``Architecture`` objects, but the
                   values are used like architecture names -- confirm.
    :param session: database session used for the queries
    :param cruft: print the report in the indented ``- broken ...`` layout
                  instead of the ``# Broken ...`` layout
    :param quiet: do not print the broken dependencies
    :param include_arch_all: also look at arch ``all`` binaries and at
                             ``Build-Depends-Indep``
    :return: True if at least one broken dependency was found
    """
    dbsuite = get_suite(suite, session)
    overridesuite = dbsuite
    if dbsuite.overridesuite is not None:
        overridesuite = get_suite(dbsuite.overridesuite, session)
    dep_problem = False
    p2c = {}
    all_broken = defaultdict(lambda: defaultdict(set))
    if arches:
        all_arches = set(arches)
    else:
        all_arches = set(x.arch_string for x in get_suite_architectures(suite))
    all_arches -= set(["source", "all"])
    # Materialise the removals once.  Every membership test below goes through
    # this set, so virtual packages added further down are honoured and a
    # one-shot iterable passed as ``removals`` is not consumed twice.
    removal_set = set(removals)
    metakey_d = get_or_set_metadatakey("Depends", session)
    metakey_p = get_or_set_metadatakey("Provides", session)
    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_d_id": metakey_d.key_id,
        "metakey_p_id": metakey_p.key_id,
    }
    if include_arch_all:
        rdep_architectures = all_arches | set(["all"])
    else:
        rdep_architectures = all_arches
    for architecture in rdep_architectures:
        deps = {}
        sources = {}
        virtual_packages = {}
        try:
            params["arch_id"] = get_architecture(architecture, session).arch_id
        except AttributeError:
            # No usable architecture object for this name; nothing to check.
            continue

        statement = sql.text(
            """
            SELECT b.package, s.source, c.name as component,
                (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
                (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
                FROM binaries b
                JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
                JOIN source s ON b.source = s.id
                JOIN files_archive_map af ON b.file = af.file_id
                JOIN component c ON af.component_id = c.id
                WHERE b.architecture = :arch_id"""
        )
        query = (
            session.query(
                sql.column("package"),
                sql.column("source"),
                sql.column("component"),
                sql.column("depends"),
                sql.column("provides"),
            )
            .from_statement(statement)
            .params(params)
        )
        for package, source, component, depends, provides in query:
            sources[package] = source
            p2c[package] = component
            if depends is not None:
                deps[package] = depends
            # Maintain a counter for each virtual package.  If a
            # Provides: exists, set the counter to 0 and count all
            # provides by a package not in the list for removal.
            # If the counter stays 0 at the end, we know that only
            # the to-be-removed packages provided this virtual
            # package.
            if provides is not None:
                for virtual_pkg in provides.split(","):
                    virtual_pkg = virtual_pkg.strip()
                    if virtual_pkg == package:
                        continue
                    if virtual_pkg not in virtual_packages:
                        virtual_packages[virtual_pkg] = 0
                    if package not in removal_set:
                        virtual_packages[virtual_pkg] += 1

        # If a virtual package is only provided by the to-be-removed
        # packages, treat the virtual package as to-be-removed too.
        removal_set.update(
            virtual_pkg
            for virtual_pkg in virtual_packages
            if not virtual_packages[virtual_pkg]
        )

        # Check binary dependencies (Depends)
        for package in deps:
            if package in removal_set:
                continue
            try:
                parsed_dep = apt_pkg.parse_depends(deps[package])
            except ValueError as e:
                print("Error for package %s: %s" % (package, e))
                parsed_dep = []
            for dep in parsed_dep:
                # Check for partial breakage.  If a package has a ORed
                # dependency, there is only a dependency problem if all
                # packages in the ORed depends will be removed.
                unsat = 0
                for dep_package, _, _ in dep:
                    if dep_package in removal_set:
                        unsat += 1
                if unsat == len(dep):
                    component = p2c[package]
                    source = sources[package]
                    if component != "main":
                        source = "%s/%s" % (source, component)
                    all_broken[source][package].add(architecture)
                    dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Depends:")
        else:
            print("# Broken Depends:")
        for source, bindict in sorted(all_broken.items()):
            lines = []
            for binary, broken_arches in sorted(bindict.items()):
                # Broken everywhere (or arch:all): no need to list the arches.
                if broken_arches == all_arches or "all" in broken_arches:
                    lines.append(binary)
                else:
                    lines.append(
                        "%s [%s]" % (binary, " ".join(sorted(broken_arches)))
                    )
            if cruft:
                print("    %s: %s" % (source, lines[0]))
            else:
                print("%s: %s" % (source, lines[0]))
            for line in lines[1:]:
                if cruft:
                    print("    " + " " * (len(source) + 2) + line)
                else:
                    print(" " * (len(source) + 2) + line)
        if not cruft:
            print()

    # Check source dependencies (Build-Depends and Build-Depends-Indep)
    all_broken = defaultdict(set)
    metakey_bd = get_or_set_metadatakey("Build-Depends", session)
    metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
    if include_arch_all:
        metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
    else:
        metakey_ids = (metakey_bd.key_id,)

    params = {
        "suite_id": dbsuite.suite_id,
        "metakey_ids": metakey_ids,
    }
    statement = sql.text(
        """
        SELECT s.source, string_agg(sm.value, ', ') as build_dep
           FROM source s
           JOIN source_metadata sm ON s.id = sm.src_id
           WHERE s.id in
               (SELECT src FROM newest_src_association
                   WHERE suite = :suite_id)
               AND sm.key_id in :metakey_ids
           GROUP BY s.id, s.source"""
    )
    query = (
        session.query(sql.column("source"), sql.column("build_dep"))
        .from_statement(statement)
        .params(params)
    )
    for source, build_dep in query:
        if source in removal_set:
            continue
        parsed_dep = []
        if build_dep is not None:
            # Remove [arch] information since we want to see breakage on all arches
            build_dep = re_build_dep_arch.sub("", build_dep)
            try:
                parsed_dep = apt_pkg.parse_src_depends(build_dep)
            except ValueError as e:
                print("Error for source %s: %s" % (source, e))
        for dep in parsed_dep:
            unsat = 0
            for dep_package, _, _ in dep:
                if dep_package in removal_set:
                    unsat += 1
            if unsat == len(dep):
                # Look up the component of the source via its "dsc" override
                # in the override suite.
                (component,) = (
                    session.query(Component.component_name)
                    .join(Component.overrides)
                    .filter(Override.suite == overridesuite)
                    .filter(
                        Override.package
                        == re.sub("/(contrib|non-free-firmware|non-free)$", "", source)
                    )
                    .join(Override.overridetype)
                    .filter(OverrideType.overridetype == "dsc")
                    .first()
                )
                key = source
                if component != "main":
                    key = "%s/%s" % (source, component)
                all_broken[key].add(pp_deps(dep))
                dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Build-Depends:")
        else:
            print("# Broken Build-Depends:")
        for source, bdeps in sorted(all_broken.items()):
            bdeps = sorted(bdeps)
            if cruft:
                print("    %s: %s" % (source, bdeps[0]))
            else:
                print("%s: %s" % (source, bdeps[0]))
            for bdep in bdeps[1:]:
                if cruft:
                    print("    " + " " * (len(source) + 2) + bdep)
                else:
                    print(" " * (len(source) + 2) + bdep)
        if not cruft:
            print()

    return dep_problem

1372 

1373 

1374################################################################################ 

1375 

1376 

def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
    """Extract the source packages named in a Built-Using field.

    Every entry must be a single (non-alternative) dependency with an exact
    ``=`` version relation; anything else trips an assertion.

    :param control: control file to take Built-Using field from
    :return: list of (source_name, source_version) pairs; empty if the field
             is absent
    """
    field = control.get("Built-Using", None)
    if field is None:
        return []

    pairs = []
    for alternatives in apt_pkg.parse_depends(field):
        assert (
            len(alternatives) == 1
        ), "Alternatives are not allowed in Built-Using field"
        name, version, relation = alternatives[0]
        assert relation == "=", "Built-Using must contain strict dependencies"
        pairs.append((name, version))

    return pairs

1395 

1396 

1397################################################################################ 

1398 

1399 

def is_in_debug_section(control: Mapping[str, str]) -> bool:
    """Tell whether a binary package is an automatically built debug package.

    The part of ``Section`` after the first ``/`` (or the whole value when
    there is no ``/``) must be ``debug`` and ``Auto-Built-Package`` must be
    ``debug-symbols``.

    :param control: control file of binary package
    :return: True if the binary package is a debug package
    """
    head, slash, tail = control["Section"].partition("/")
    # Without a "/" partition() leaves the whole value in ``head``.
    section = tail if slash else head
    auto_built = control.get("Auto-Built-Package")
    if section != "debug":
        return False
    return auto_built == "debug-symbols"

1409 

1410 

1411################################################################################ 

1412 

1413 

def find_possibly_compressed_file(filename: str) -> str:
    """
    Locate a control file that may only exist in compressed form.

    The plain name is tried first, then the ``.xz``, ``.gz`` and ``.bz2``
    variants, in that order.

    :param filename: path to a control file (Sources, Packages, etc) to
                     look for
    :return: path to the (possibly compressed) control file
    :raises OSError: with ``errno.ENOENT`` if none of the variants exists
    """
    candidates = (filename + extension for extension in ("", ".xz", ".gz", ".bz2"))
    found = next((path for path in candidates if os.path.exists(path)), None)
    if found is None:
        raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
    return found

1430 

1431 

1432################################################################################ 

1433 

1434 

def parse_boolean_from_user(value: str) -> bool:
    """Interpret a user-supplied yes/no style word (case-insensitively).

    :param value: one of yes/true/enable/enabled or no/false/disable/disabled
    :return: the corresponding boolean
    :raises ValueError: if the word is not recognised
    """
    word = value.lower()
    meanings = {
        "yes": True,
        "true": True,
        "enable": True,
        "enabled": True,
        "no": False,
        "false": False,
        "disable": False,
        "disabled": False,
    }
    answer = meanings.get(word)
    if answer is None:
        raise ValueError("Not sure whether %s should be a True or a False" % word)
    return answer

1442 

1443 

def suite_suffix(suite_name: str) -> str:
    """Return the configured suite suffix that applies to ``suite_name``.

    ``Dinstall::SuiteSuffix`` is returned if it is non-empty and either
    ``Dinstall::SuiteSuffixSuites`` is not configured at all or it lists
    ``suite_name``; otherwise the empty string is returned.
    """
    configured = Cnf.find("Dinstall::SuiteSuffix", "")
    if configured == "":
        return ""

    # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
    # (applies when Dinstall::SuiteSuffixSuites is not configured)
    restricted = "Dinstall::SuiteSuffixSuites" in Cnf
    if restricted and suite_name not in Cnf.value_list("Dinstall::SuiteSuffixSuites"):
        return ""
    return configured

1455 

1456 

1457################################################################################ 

1458 

1459 

def process_buildinfos(
    directory: str,
    buildinfo_files: "Iterable[daklib.upload.HashedFile]",
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
) -> None:
    """Archive buildinfo files below Dir::BuildinfoArchive

    Does nothing when ``Dir::BuildinfoArchive`` is not configured.  Otherwise
    each file is copied (mode 0644) into a ``YYYY/MM/DD`` subdirectory for the
    current date, under a name chosen by ``find_next_free``.

    :param directory: directory where .changes is stored
    :param buildinfo_files: buildinfo files to archive (their ``filename``
                            attribute is used)
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """

    if "Dir::BuildinfoArchive" not in Cnf:
        return

    archive_root = Cnf["Dir::BuildinfoArchive"]
    dated_subdir = datetime.datetime.now().strftime("%Y/%m/%d")
    target_dir = os.path.join(archive_root, dated_subdir)

    for buildinfo in buildinfo_files:
        name = buildinfo.filename
        source_path = os.path.join(directory, name)
        target_path = find_next_free(os.path.join(target_dir, name))

        logger.log(["Archiving", name])
        fs_transaction.copy(source_path, target_path, mode=0o644)

1488 

1489 

1490################################################################################ 

1491 

1492 

def move_to_morgue(
    morguesubdir: str,
    filenames: Iterable[str],
    fs_transaction: "daklib.fstransactions.FilesystemTransaction",
    logger: "daklib.daklog.Logger",
):
    """Move files into a dated directory inside the morgue

    The morgue is ``Dir::Morgue``, defaulting to ``morgue`` below
    ``Dir::Base``.  Files end up in ``<morgue>/<morguesubdir>/<year>/<month>/<day>``;
    if a file of the same name is already there, ``find_next_free`` picks
    another name.

    :param morguesubdir: subdirectory of morgue where the files need to go
    :param filenames: names of files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """

    default_morgue = os.path.join(Cnf.get("Dir::Base"), "morgue")
    morgue_root = Cnf.get("Dir::Morgue", default_morgue)

    # Target directory: morgue_root/morguesubdir/year/month/day
    today = datetime.datetime.now()
    dated_dir = os.path.join(
        morgue_root,
        morguesubdir,
        str(today.year),
        f"{today.month:02d}",
        f"{today.day:02d}",
    )

    for filename in filenames:
        target = os.path.join(dated_dir, os.path.basename(filename))
        # Name already taken (lexists also catches dangling symlinks): pick a
        # free one instead.
        if os.path.lexists(target):
            target = find_next_free(target)
        logger.log(["move to morgue", filename, target])
        fs_transaction.move(filename, target)