1# vim:set et ts=4 sw=4: 

2 

3"""Utility functions 

4 

5@contact: Debian FTP Master <ftpmaster@debian.org> 

6@copyright: 2000, 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org> 

7@license: GNU General Public License version 2 or later 

8""" 

9 

10# This program is free software; you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation; either version 2 of the License, or 

13# (at your option) any later version. 

14 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19 

20# You should have received a copy of the GNU General Public License 

21# along with this program; if not, write to the Free Software 

22# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

23 

24import datetime 

25import os 

26import pwd 

27import grp 

28import shutil 

29import sqlalchemy.sql as sql 

30import sys 

31import tempfile 

32import apt_inst 

33import apt_pkg 

34import re 

35import email.policy 

36import subprocess 

37import errno 

38import functools 

39from collections.abc import Iterable, Mapping, Sequence 

40from typing import Literal, NoReturn, Optional, TYPE_CHECKING, Union 

41 

42import daklib.config as config 

43import daklib.mail 

44from daklib.dbconn import Architecture, DBConn, get_architecture, get_component, get_suite, \ 

45 get_active_keyring_paths, \ 

46 get_suite_architectures, get_or_set_metadatakey, \ 

47 Component, Override, OverrideType 

48from .dak_exceptions import * 

49from .gpg import SignedFile 

50from .textutils import fix_maintainer 

51from .regexes import re_single_line_field, \ 

52 re_multi_line_field, re_srchasver, \ 

53 re_re_mark, re_whitespace_comment, re_issource, \ 

54 re_build_dep_arch, re_parse_maintainer 

55 

56from .formats import parse_format, validate_changes_format 

57from .srcformats import get_format_from_string 

58from collections import defaultdict 

59 

60if TYPE_CHECKING: 60 ↛ 61line 60 didn't jump to line 61, because the condition on line 60 was never true

61 import daklib.daklog 

62 import daklib.fstransactions 

63 import daklib.upload 

64 

65################################################################################ 

66 

#: Cache for email addresses from gpg key uids, keyed by the string that
#: :func:`gpg_get_key_addresses` is called with.
key_uid_email_cache: dict[str, list[str]] = {}

68 

69################################################################################ 

70 

71 

def input_or_exit(prompt: Optional[str] = None) -> str:
    """Read one line from standard input, exiting the program on end-of-file.

    :param prompt: text shown before reading; nothing is shown when it is
                   :const:`None`
    :return: the line that was read, as returned by :func:`input`
    :raises SystemExit: when :func:`input` reports end-of-file (e.g. ^D)
    """
    try:
        # input(None) would print the literal text "None" as the prompt,
        # so only forward the prompt when one was actually given.
        if prompt is None:
            return input()
        return input(prompt)
    except EOFError:
        sys.exit("\nUser interrupt (^D).")

77 

78################################################################################ 

79 

80 

def extract_component_from_section(section: str) -> tuple[str, str]:
    """Derive the component from a "component/section" style section name.

    The text before the first ``/`` is taken as the component.  When the
    section contains no ``/``, the component is "main".  The section itself
    is handed back unchanged.

    :return: tuple (section, component)
    """
    component, slash, _rest = section.partition('/')
    if not slash:
        component = "main"
    return section, component

91 

92################################################################################ 

93 

94 

def parse_deb822(armored_contents: bytes, signing_rules: Literal[-1, 0, 1] = 0, keyrings=None) -> dict[str, str]:
    """Parse deb822-style control data (.changes / .dsc) into a dictionary.

    The input is run through :class:`SignedFile` and its ``contents`` are
    decoded as UTF-8 before parsing.

    :param armored_contents: raw contents of the file to parse
    :param signing_rules: when ``1``, an empty line is only accepted as the
                          very last line of the contents
    :param keyrings: keyrings handed to :class:`SignedFile`.  When
                     :const:`None`, an empty list is used and no signature
                     is required.
    :return: mapping of lower-cased field names to their values, plus
             ``filecontents`` (the decoded input) and, when the ``source``
             field matches `re_srchasver`, ``source-version``
    :raises ParseChangesError: for empty contents, for a continuation line
                               that precedes any field, or when lines could
                               not be parsed
    :raises InvalidDscError: for an empty line that is not the last line
                             when ``signing_rules == 1``
    """
    require_signature = keyrings is not None
    if keyrings is None:
        keyrings = []

    signed_file = SignedFile(armored_contents, keyrings=keyrings, require_signature=require_signature)
    contents = signed_file.contents.decode('utf-8')

    # Split the lines in the input, keeping the linebreaks.
    lines = contents.splitlines(True)
    if not lines:
        raise ParseChangesError("[Empty changes file]")

    changes: dict[str, str] = {}
    unparsed: list[str] = []
    field: Optional[str] = None  # name of the field currently being filled
    first = -1                   # -1: no field yet, 1: field just started, 0: in continuation
    num_of_lines = len(lines)

    for index, raw_line in enumerate(lines, start=1):
        # Drop the single trailing character only when it really is a line
        # terminator; a final line without one must be kept intact.
        if raw_line != raw_line.splitlines()[0]:
            line = raw_line[:-1]
        else:
            line = raw_line

        if line == "" and signing_rules == 1:
            if index != num_of_lines:
                raise InvalidDscError(index)
            break

        if slf := re_single_line_field.match(line):
            field = slf.group(1).lower()
            changes[field] = slf.group(2)
            first = 1
            continue

        if line == " .":
            if field is None:
                # A paragraph separator cannot continue a field that
                # does not exist.
                raise ParseChangesError("'%s'\n [Multi-line field continuing on from nothing?]" % (line))
            changes[field] += '\n'
            continue

        if mlf := re_multi_line_field.match(line):
            if field is None:
                raise ParseChangesError("'%s'\n [Multi-line field continuing on from nothing?]" % (line))
            if first == 1 and changes[field] != "":
                changes[field] += '\n'
            first = 0
            changes[field] += mlf.group(1) + '\n'
            continue

        unparsed.append(line)

    changes["filecontents"] = armored_contents.decode()

    if "source" in changes:
        # Strip the source version in brackets from the source field,
        # put it in the "source-version" field instead.
        if srcver := re_srchasver.search(changes["source"]):
            changes["source"] = srcver.group(1)
            changes["source-version"] = srcver.group(2)

    error = "".join(unparsed)
    if error:
        raise ParseChangesError(error)

    return changes

162 

163################################################################################ 

164 

165 

def parse_changes(filename: str, signing_rules: Literal[-1, 0, 1] = 0, dsc_file: bool = False, keyrings=None) -> dict[str, str]:
    """
    Read a .changes or source control (.dsc) file and return its fields as a
    dictionary, as produced by :func:`parse_deb822`.

    `signing_rules` and `keyrings` are forwarded unchanged to
    :func:`parse_deb822`.  The original description of `signing_rules` is:

     - ``-1``: no signature is required.
     - ``0`` (the default): a signature is required.
     - ``1``: turns on the same strict format checking as dpkg-source:

        - The PGP header consists of "-----BEGIN PGP SIGNED MESSAGE-----"
          followed by any PGP header data and must end with a blank line.

        - The data section must end with a blank line and must be followed by
          "-----BEGIN PGP SIGNATURE-----".

    Unless `dsc_file` is set, the presence of the fields mandatory for a
    .changes file is verified.

    :param filename: path of the file to parse
    :param dsc_file: `filename` is a Debian source control (.dsc) file
    :raises ParseChangesError: if a mandatory .changes field is missing
    """
    with open(filename, 'rb') as changes_in:
        raw = changes_in.read()
    changes = parse_deb822(raw, signing_rules, keyrings=keyrings)

    if dsc_file:
        return changes

    # Finally ensure that everything needed for .changes is there
    must_keywords = ('Format', 'Date', 'Source', 'Architecture', 'Version',
                     'Distribution', 'Maintainer', 'Changes', 'Files')
    missingfields = [
        keyword for keyword in must_keywords
        if keyword.lower() not in changes
    ]
    if missingfields:
        raise ParseChangesError("Missing mandatory field(s) in changes file (policy 5.5): %s" % (missingfields,))

    return changes

208 

209 

210################################################################################ 

211 

212 

def check_dsc_files(dsc_filename: str, dsc: Mapping[str, str], dsc_files: Mapping[str, Mapping[str, str]]) -> list[str]:
    """
    Check that the source files named by a .dsc fit the format it announces.

    :param dsc_filename: path of .dsc file
    :param dsc: the content of the .dsc parsed by :func:`parse_changes`
    :param dsc_files: the file list returned by :func:`build_file_list`
    :return: all errors detected
    """
    problems: list[str] = []

    # Counts how many files of each kind the .dsc lists.
    has: defaultdict[str, int] = defaultdict(int)

    # Order matters: the first pattern that matches a file wins.
    ftype_lookup = (
        (r'orig\.tar\.(gz|bz2|xz)\.asc', ('orig_tar_sig',)),
        (r'orig\.tar\.gz', ('orig_tar_gz', 'orig_tar')),
        (r'diff\.gz', ('debian_diff',)),
        (r'tar\.gz', ('native_tar_gz', 'native_tar')),
        (r'debian\.tar\.(gz|bz2|xz)', ('debian_tar',)),
        (r'orig\.tar\.(gz|bz2|xz)', ('orig_tar',)),
        (r'tar\.(gz|bz2|xz)', ('native_tar',)),
        (r'orig-.+\.tar\.(gz|bz2|xz)\.asc', ('more_orig_tar_sig',)),
        (r'orig-.+\.tar\.(gz|bz2|xz)', ('more_orig_tar',)),
    )

    for f in dsc_files:
        source_match = re_issource.match(f)
        if not source_match:
            problems.append("%s: %s in Files field not recognised as source."
                            % (dsc_filename, f))
            continue

        # Record the file under every key of the first matching pattern.
        for regex, keys in ftype_lookup:
            if re.match(regex, source_match.group(3)):
                for key in keys:
                    has[key] += 1
                break
        else:
            # File does not match anything in lookup table; reject and
            # stop looking at further files.
            problems.append("%s: unexpected source file '%s'" % (dsc_filename, f))
            break

    # Check for multiple files
    problems.extend(
        "%s: lists multiple %s" % (dsc_filename, file_type)
        for file_type in ('orig_tar', 'orig_tar_sig', 'native_tar', 'debian_tar', 'debian_diff')
        if has[file_type] > 1
    )

    # Source format specific tests
    try:
        format = get_format_from_string(dsc['format'])
        problems.extend([
            '%s: %s' % (dsc_filename, x) for x in format.reject_msgs(has)
        ])
    except UnknownFormatError:
        # Not an error here for now
        pass

    return problems

279 

280################################################################################ 

281 

282# Dropped support for 1.4 and ``buggy dchanges 3.4'' (?!) compared to di.pl 

283 

284 

def build_file_list(changes: Mapping[str, str], is_a_dsc: bool = False, field="files", hashname="md5sum") -> dict[str, dict[str, str]]:
    """Turn a Files-style field of a parsed .changes/.dsc into a dictionary.

    :param changes: parsed control data (lower-cased field names as keys)
    :param is_a_dsc: the data comes from a .dsc; skips validation of the
                     .changes ``format`` field
    :param field: name of the field holding the file list
    :param hashname: key under which the first column of each line is stored
    :return: mapping of file name to a dict with ``size``, ``section``,
             ``priority``, ``component`` and `hashname` entries
    :raises NoFilesFieldError: if `field` is not present in `changes`
    :raises ParseChangesError: if a line has the wrong number of columns
    """
    # Make sure we have a Files: field to parse...
    if field not in changes:
        raise NoFilesFieldError

    # Validate .changes Format: field
    if not is_a_dsc:
        validate_changes_format(parse_format(changes['format']), field)

    # Only the "files" field of a .changes carries section and priority.
    includes_section = field == "files" and not is_a_dsc

    files: dict[str, dict[str, str]] = {}
    for entry in changes[field].split('\n'):
        # The first empty line ends the list.
        if not entry:
            break

        columns = entry.split()
        section = priority = ""
        try:
            if includes_section:
                (md5, size, section, priority, name) = columns
            else:
                (md5, size, name) = columns
        except ValueError:
            raise ParseChangesError(entry)

        section = section or "-"
        priority = priority or "-"
        (section, component) = extract_component_from_section(section)

        record = dict(size=size, section=section,
                      priority=priority, component=component)
        record[hashname] = md5
        files[name] = record

    return files

324 

325################################################################################ 

326 

327 

def send_mail(message: str, whitelists: Optional[list[str]] = None) -> None:
    """sendmail wrapper, takes a message string

    The message is parsed, optionally filtered against recipient whitelists,
    optionally signed (``Dinstall::Mail-Signature-Key``), optionally stored
    in ``Dir::Mail`` and finally piped to ``Dinstall::SendmailCommand``.

    :param message: the complete mail (headers and body) as text
    :param whitelists: path to whitelists. :const:`None` or an empty list whitelists
                       everything, otherwise an address is whitelisted if it is
                       included in any of the lists.
                       In addition a global whitelist can be specified in
                       Dinstall::MailWhiteList.
                       The list passed in is not modified.
    :raises SendmailFailedError: if the sendmail command exits unsuccessfully
    """

    msg = daklib.mail.parse_mail(message)

    # The incoming message might be UTF-8, but outgoing mail should
    # use a legacy-compatible encoding.  Set the content to the
    # text to make sure this is the case.
    # Note that this does not work with multipart messages.
    msg.set_content(msg.get_payload(), cte="quoted-printable")

    # Check whether we're supposed to be sending mail
    call_sendmail = True
    if "Dinstall::Options::No-Mail" in Cnf and Cnf["Dinstall::Options::No-Mail"]:
        call_sendmail = False

    # Work on a private copy: the global whitelist is appended below and
    # must not leak into the caller's list.
    if whitelists is None or None in whitelists:
        whitelists = []
    else:
        whitelists = list(whitelists)
    if Cnf.get('Dinstall::MailWhiteList', ''):
        whitelists.append(Cnf['Dinstall::MailWhiteList'])

    if len(whitelists) != 0:
        # Each non-comment line is either a regular expression (marked as
        # such, see re_re_mark) or a literal address.
        whitelist = []
        for path in whitelists:
            with open(path, 'r') as whitelist_in:
                for line in whitelist_in:
                    if re_whitespace_comment.match(line):
                        continue
                    if re_re_mark.match(line):
                        whitelist.append(re.compile(re_re_mark.sub("", line.strip(), 1)))
                    else:
                        whitelist.append(re.compile(re.escape(line.strip())))

        # Fields to check.
        fields = ["To", "Bcc", "Cc"]
        for field in fields:
            # Check each field
            value = msg.get(field, None)
            if value is None:
                continue

            match = []
            for item in value.split(","):
                (_rfc822_maint, _rfc2047_maint, _name, mail) = fix_maintainer(item.strip())
                if not any(wr.match(mail) for wr in whitelist):
                    print("Skipping {0} since it's not whitelisted".format(item))
                    continue
                match.append(item)

            # Doesn't have any mail in whitelist so remove the header
            if len(match) == 0:
                del msg[field]
            else:
                msg.replace_header(field, ', '.join(match))

        # Change message fields in order if we don't have a To header
        if "To" not in msg:
            fields.reverse()
            for field in fields:
                if field in msg:
                    # fields[-1] is "To" after the reversal: promote the
                    # first remaining recipient header to To.
                    msg[fields[-1]] = msg[field]
                    del msg[field]
                    break
            else:
                # return, as we removed all recipients.
                call_sendmail = False

    # sign mail
    if mailkey := Cnf.get('Dinstall::Mail-Signature-Key', ''):
        kwargs = {
            'keyids': [mailkey],
            'pubring': Cnf.get('Dinstall::SigningPubKeyring') or None,
            'secring': Cnf.get('Dinstall::SigningKeyring') or None,
            'homedir': Cnf.get('Dinstall::SigningHomedir') or None,
            'passphrase_file': Cnf.get('Dinstall::SigningPassphraseFile') or None,
        }
        msg = daklib.mail.sign_mail(msg, **kwargs)

    msg_bytes = msg.as_bytes(policy=email.policy.default)

    # Keep a copy of the mail on disk if a mail directory is configured;
    # this happens even when sendmail is not going to be called.
    maildir = Cnf.get('Dir::Mail')
    if maildir:
        path = os.path.join(maildir, datetime.datetime.now().isoformat())
        path = find_next_free(path)
        with open(path, 'wb') as fh:
            fh.write(msg_bytes)

    # Invoke sendmail
    if not call_sendmail:
        return
    try:
        subprocess.run(Cnf["Dinstall::SendmailCommand"].split(),
                       input=msg_bytes,
                       check=True,
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        raise SendmailFailedError(e.output.decode().rstrip())

433 

434################################################################################ 

435 

436 

def poolify(source: str) -> str:
    """Return the pool sub-directory for `source`, with a trailing slash.

    Names starting with "lib" are filed under their first four characters,
    every other name under its first character.
    """
    prefix_length = 4 if source[:3] == "lib" else 1
    return f"{source[:prefix_length]}/{source}/"

443 

444################################################################################ 

445 

446 

def move(src: str, dest: str, overwrite: bool = False, perms: int = 0o664) -> None:
    """Move `src` to `dest` by copying it and then removing the original.

    Missing parent directories of `dest` are created with mode 2775.  If
    `dest` is a directory, the file keeps its base name inside it.

    :param src: path of the file to move
    :param dest: target file name or target directory
    :param overwrite: allow replacing an existing (writable) `dest`;
                      otherwise an existing target is a fatal error
    :param perms: permissions set on the copied file
    """
    if os.path.exists(dest) and os.path.isdir(dest):
        dest_dir = dest
    else:
        dest_dir = os.path.dirname(dest)

    # An empty dest_dir means "current directory": nothing to create
    # (os.makedirs("") would fail).
    if dest_dir and not os.path.lexists(dest_dir):
        umask = os.umask(0)
        try:
            os.makedirs(dest_dir, 0o2775)
        finally:
            # Always restore the process umask, even if creation failed.
            os.umask(umask)

    # Re-check: creating the parent may have turned e.g. "dir/" into an
    # existing directory.
    if os.path.exists(dest) and os.path.isdir(dest):
        dest += '/' + os.path.basename(src)

    # Don't overwrite unless forced to
    if os.path.lexists(dest):
        if not overwrite:
            fubar("Can't move %s to %s - file already exists." % (src, dest))
        elif not os.access(dest, os.W_OK):
            fubar("Can't move %s to %s - can't write to existing file." % (src, dest))

    shutil.copy2(src, dest)
    os.chmod(dest, perms)
    os.unlink(src)

469 

470 

471################################################################################ 

472 

473 

def TemplateSubst(subst_map: Mapping[str, str], filename: str) -> str:
    """Fill in a template file.

    Every occurrence of each key of `subst_map` in the file's text is
    replaced by ``str()`` of the corresponding value, one key after the
    other in mapping order.

    :param subst_map: placeholder text -> replacement
    :param filename: path of the template file
    :return: the text with all substitutions applied
    """
    with open(filename) as templatefile:
        text = templatefile.read()
    for placeholder, replacement in subst_map.items():
        text = text.replace(placeholder, str(replacement))
    return text

481 

482################################################################################ 

483 

484 

def fubar(msg: str, exit_code: int = 1) -> NoReturn:
    """Report a fatal error on stderr (prefixed with "E:") and exit.

    :param msg: text of the error
    :param exit_code: status passed to :func:`sys.exit`
    """
    print("E: " + str(msg), file=sys.stderr)
    sys.exit(exit_code)

489 

490 

def warn(msg: str) -> None:
    """Write a warning, prefixed with "W:", to stderr."""
    print("W: " + str(msg), file=sys.stderr)

494 

495################################################################################ 

496 

497 

def whoami() -> str:
    """get user name

    Takes the GECOS field of the current user's passwd entry, keeps the
    part before the first comma and removes all periods from it (a
    laughable attempt at rfc822 conformancy).
    """
    gecos = pwd.getpwuid(os.getuid()).pw_gecos
    full_name = gecos.split(',')[0]
    return full_name.replace('.', '')

505 

506 

def getusername() -> str:
    """Return the login name from the current user's passwd entry."""
    return pwd.getpwuid(os.getuid()).pw_name

510 

511################################################################################ 

512 

513 

def size_type(c: Union[int, float]) -> str:
    """Format a byte count with a " B", " KB" or " MB" suffix.

    The value is divided by 1024 and moved to the next unit whenever it is
    greater than 10240; the result is truncated to an integer.
    """
    unit = " B"
    for next_unit in (" KB", " MB"):
        if not c > 10240:
            break
        c = c / 1024
        unit = next_unit
    return "%d%s" % (c, unit)

523 

524################################################################################ 

525 

526 

def find_next_free(dest: str, too_many: int = 100) -> str:
    """Find a path that does not exist yet, based on `dest`.

    Tries `dest` itself first, then ``dest.0``, ``dest.1``, ... up to
    ``dest.<too_many - 1>``.

    :param dest: preferred path
    :param too_many: number of numbered alternatives to generate
    :return: the first candidate that does not exist
    :raises NoFreeFilenameError: if every candidate already exists
    """
    candidate = dest
    for attempt in range(too_many):
        if not os.path.lexists(candidate):
            return candidate
        candidate = dest + '.' + repr(attempt)

    # The last generated candidate has not been looked at yet; only give
    # up if it is taken as well.
    if os.path.lexists(candidate):
        raise NoFreeFilenameError
    return candidate

536 

537################################################################################ 

538 

539 

def result_join(original: Iterable[Optional[str]], sep: str = '\t') -> str:
    """Join `original` with `sep`, writing :const:`None` entries as empty strings."""
    fields = ["" if value is None else value for value in original]
    return sep.join(fields)

545 

546 

547################################################################################ 

548 

549 

def prefix_multi_line_string(lines: str, prefix: str, include_blank_lines: bool = False) -> str:
    """Prepend `prefix` to each line in `lines`.

    Every line is stripped of surrounding whitespace first.  Lines that are
    empty after stripping are dropped unless `include_blank_lines` is set.
    """
    prefixed = []
    for raw_line in lines.split("\n"):
        stripped = raw_line.strip()
        if stripped or include_blank_lines:
            prefixed.append(prefix + stripped)
    return "\n".join(prefixed)

557 

558################################################################################ 

559 

560 

def join_with_commas_and(list: Sequence[str]) -> str:
    """Join items as an English enumeration: "a, b and c".

    An empty sequence yields "nothing", a single item is returned as is.
    """
    count = len(list)
    if count == 0:
        return "nothing"
    if count == 1:
        return list[0]
    leading = ", ".join(list[:-1])
    return f"{leading} and {list[-1]}"

567 

568################################################################################ 

569 

570 

def pp_deps(deps: Iterable[tuple[str, str, str]]) -> str:
    """Render alternative dependencies as text.

    Each ``(package, constraint, version)`` entry becomes
    ``package (constraint version)``, or just ``package`` when the
    constraint is empty; the entries are joined with " |".
    """
    rendered = []
    for pkg, constraint, version in deps:
        if constraint:
            rendered.append(f"{pkg} ({constraint} {version})")
        else:
            rendered.append(pkg)
    return " |".join(rendered)

577 

578################################################################################ 

579 

580 

def get_conf():
    """Return the module-level configuration object ``Cnf``."""
    return Cnf

583 

584################################################################################ 

585 

586 

def parse_args(Options) -> tuple[str, str, str, bool]:
    """Handle -a, -c and -s arguments; returns them as SQL constraints

    Names that are not found produce a warning.  If an option was given but
    none of its names could be resolved, the program exits via
    :func:`fubar`.

    :param Options: mapping with "Suite", "Component" and "Architecture"
                    entries
    :return: tuple (con_suites, con_architectures, con_components,
             check_source); the constraints are "AND ... IN (...)" fragments
             or empty strings.  check_source is true when "source" was
             among the architectures or no architecture was given.
    """
    # XXX: This should go away and everything which calls it be converted
    #      to use SQLA properly.  For now, we'll just fix it not to use
    #      the old Pg interface though
    session = DBConn().session()

    # Process suite
    con_suites = ""
    if Options["Suite"]:
        suite_ids = []
        for suitename in split_args(Options["Suite"]):
            suite = get_suite(suitename, session=session)
            if suite and suite.suite_id is not None:
                suite_ids.append(suite.suite_id)
            else:
                warn("suite '%s' not recognised." % (suite and suite.suite_name or suitename))
        if not suite_ids:
            fubar("No valid suite given.")
        con_suites = "AND su.id IN (%s)" % ", ".join(str(i) for i in suite_ids)

    # Process component
    con_components = ""
    if Options["Component"]:
        component_ids = []
        for componentname in split_args(Options["Component"]):
            component = get_component(componentname, session=session)
            if component is None:
                warn("component '%s' not recognised." % (componentname))
            else:
                component_ids.append(component.component_id)
        if not component_ids:
            fubar("No valid component given.")
        con_components = "AND c.id IN (%s)" % ", ".join(str(i) for i in component_ids)

    # Process architecture
    con_architectures = ""
    check_source = False
    if Options["Architecture"]:
        arch_ids = []
        for archname in split_args(Options["Architecture"]):
            if archname == "source":
                # "source" is not looked up; it only switches the flag.
                check_source = True
                continue
            arch = get_architecture(archname, session=session)
            if arch is None:
                warn("architecture '%s' not recognised." % (archname))
            else:
                arch_ids.append(arch.arch_id)
        if arch_ids:
            con_architectures = "AND a.id IN (%s)" % ", ".join(str(i) for i in arch_ids)
        elif not check_source:
            fubar("No valid architecture given.")
    else:
        check_source = True

    return (con_suites, con_architectures, con_components, check_source)

648 

649################################################################################ 

650 

651 

@functools.total_ordering
class ArchKey:
    """
    Sort key for architecture names.

    Keys compare like their names, except that 'source' orders before
    every other architecture.  Usable directly as ``key=ArchKey``.
    """

    __slots__ = ['arch', 'issource']

    def __init__(self, arch, *args):
        # Extra positional arguments are accepted and ignored.
        self.arch = arch
        self.issource = arch == 'source'

    def __lt__(self, other: 'ArchKey') -> bool:
        if self.issource or other.issource:
            # 'source' is smaller than anything that is not 'source',
            # and nothing is smaller than 'source'.
            return self.issource and not other.issource
        return self.arch < other.arch

    def __eq__(self, other: object) -> bool:
        if isinstance(other, ArchKey):
            return self.arch == other.arch
        return NotImplemented

677 

678 

679################################################################################ 

680 

681 

def split_args(s: str, dwim: bool = True) -> list[str]:
    """
    Split command line arguments that are separated either by commas or by
    whitespace.

    A string without any comma is split on whitespace; otherwise it is
    split on commas only.  With `dwim` set, a trailing comma is treated as
    a fatal error, since it usually means someone did 'dak ls -a i386, m68k
    foo' or something and the inevitable confusion resulting from 'm68k'
    being treated as an argument is undesirable.
    """
    if "," not in s:
        return s.split()
    if dwim and s.endswith(","):
        fubar("split_args: found trailing comma, spurious space maybe?")
    return s.split(",")

697 

698################################################################################ 

699 

700 

def gpg_keyring_args(keyrings: Optional[Iterable[str]] = None) -> list[str]:
    """Build one ``--keyring=PATH`` argument per keyring.

    :param keyrings: keyring paths; when :const:`None`, the result of
                     :func:`get_active_keyring_paths` is used
    """
    paths = get_active_keyring_paths() if keyrings is None else keyrings
    return [f"--keyring={path}" for path in paths]

706 

707################################################################################ 

708 

709 

710def _gpg_get_addresses_from_listing(output: bytes) -> list[str]: 

711 addresses: list[str] = [] 

712 

713 for line in output.split(b'\n'): 

714 parts = line.split(b':') 

715 if parts[0] not in (b"uid", b"pub"): 

716 continue 

717 if parts[1] in (b"i", b"d", b"r"): 717 ↛ 719line 717 didn't jump to line 719, because the condition on line 717 was never true

718 # Skip uid that is invalid, disabled or revoked 

719 continue 

720 try: 

721 uid_bytes = parts[9] 

722 except IndexError: 

723 continue 

724 try: 

725 uid = uid_bytes.decode(encoding='utf-8') 

726 except UnicodeDecodeError: 

727 # If the uid is not valid UTF-8, we assume it is an old uid 

728 # still encoding in Latin-1. 

729 uid = uid_bytes.decode(encoding='latin1') 

730 m = re_parse_maintainer.match(uid) 

731 if not m: 

732 continue 

733 address = m.group(2) 

734 if address.endswith('@debian.org'): 734 ↛ 737line 734 didn't jump to line 737, because the condition on line 734 was never true

735 # prefer @debian.org addresses 

736 # TODO: maybe not hardcode the domain 

737 addresses.insert(0, address) 

738 else: 

739 addresses.append(address) 

740 

741 return addresses 

742 

743 

def gpg_get_key_addresses(fingerprint: str) -> list[str]:
    """Retrieve email addresses from gpg key uids for a given fingerprint.

    Results, including the empty result of a failed gpg call, are stored in
    ``key_uid_email_cache`` and returned from there on later calls.

    :param fingerprint: fingerprint handed to ``gpg --list-keys``
    :return: addresses as produced by :func:`_gpg_get_addresses_from_listing`
    """
    cached = key_uid_email_cache.get(fingerprint)
    if cached is not None:
        return cached

    cmd = [
        "gpg", "--no-default-keyring",
        *gpg_keyring_args(),
        "--with-colons", "--list-keys", "--", fingerprint,
    ]
    try:
        listing = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        found: list[str] = []
    else:
        found = _gpg_get_addresses_from_listing(listing)

    key_uid_email_cache[fingerprint] = found
    return found

762 

763################################################################################ 

764 

765 

def open_ldap_connection():
    """Open an anonymously bound connection to the configured LDAP server.

    The server is taken from ``Import-LDAP-Fingerprints::LDAPServer``.  If
    ``Import-LDAP-Fingerprints::CACertFile`` is set, certificate checking is
    made mandatory against that file and StartTLS is issued before binding.

    :return: the connection object created by ``ldap.initialize``
    """
    import ldap  # type: ignore

    # NOTE(review): the DN is looked up but not used in this function;
    # the lookup is kept so a missing setting still surfaces here.
    _ldap_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    server = Cnf["Import-LDAP-Fingerprints::LDAPServer"]
    ca_cert_file = Cnf.get('Import-LDAP-Fingerprints::CACertFile')

    connection = ldap.initialize(server)

    if ca_cert_file:
        connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
        connection.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file)
        connection.set_option(ldap.OPT_X_TLS_NEWCTX, True)
        connection.start_tls_s()

    # Empty DN and password.
    connection.simple_bind_s("", "")

    return connection

785 

786################################################################################ 

787 

788 

def get_logins_from_ldap(fingerprint: str = '*') -> dict[str, str]:
    """Retrieve logins from LDAP linked to a given fingerprint.

    :param fingerprint: value for the ``keyfingerprint`` search filter; the
                        default ``*`` is inserted into the filter as is
    :return: mapping of the first ``keyFingerPrint`` value of each entry to
             its first ``uid`` value, both decoded to text
    """
    import ldap

    connection = open_ldap_connection()
    base_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    entries = connection.search_s(base_dn, ldap.SCOPE_ONELEVEL,
                                  '(keyfingerprint=%s)' % fingerprint,
                                  ['uid', 'keyfingerprint'])
    # Each entry is indexed as (dn, attributes); only the attributes matter.
    return {
        entry[1]['keyFingerPrint'][0].decode(): entry[1]['uid'][0].decode()
        for entry in entries
    }

803 

804################################################################################ 

805 

806 

def get_users_from_ldap() -> dict[str, str]:
    """Retrieve login and user names from LDAP.

    :return: mapping of a user's name (the ``cn``, ``mn`` and ``sn``
             values that are present, non-empty and do not start with
             ``-``, joined by spaces) to the first ``uid`` value
    """
    import ldap

    connection = open_ldap_connection()
    base_dn = Cnf["Import-LDAP-Fingerprints::LDAPDn"]
    entries = connection.search_s(base_dn, ldap.SCOPE_ONELEVEL,
                                  '(uid=*)', ['uid', 'cn', 'mn', 'sn'])

    users: dict[str, str] = {}
    for entry in entries:
        attrs = entry[1]
        name_parts = []
        for attr_name in ('cn', 'mn', 'sn'):
            try:
                part = attrs[attr_name][0].decode()
            except KeyError:
                # attribute not set for this user
                continue
            if part and not part.startswith('-'):
                name_parts.append(part)
        # NOTE(review): unlike the name parts, the uid value is stored
        # without .decode() - confirm whether callers expect bytes here.
        users[' '.join(name_parts)] = attrs['uid'][0]
    return users

827 

828################################################################################ 

829 

830 

def clean_symlink(src: str, dest: str, root: str) -> str:
    """
    Relativize an absolute symlink from 'src' -> 'dest' relative to 'root'.

    The first occurrence of `root` is removed from both paths; the result
    is `src` prefixed with one ``../`` per path component of the directory
    part of `dest`.

    :return: the fixed 'src'
    """
    relative_src = src.replace(root, '', 1)
    relative_dest_dir = os.path.dirname(dest.replace(root, '', 1))
    depth = len(relative_dest_dir.split('/'))
    return '../' * depth + relative_src

841 

842################################################################################ 

843 

844 

def temp_dirname(parent: Optional[str] = None, prefix: str = "dak", suffix: str = "", mode: Optional[int] = None, group: Optional[str] = None) -> str:
    """
    Create a new, uniquely named directory with :func:`tempfile.mkdtemp`.

    :param parent: If non-null it will be the directory the directory is pre-created in.
    :param prefix: The directory name will be prefixed with this string
    :param suffix: The directory name will end with this string
    :param mode: If set the directory will get chmodded to those permissions
    :param group: If set the directory will get chgrped to the specified group.
    :return: path of the created directory
    """
    dirname = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=parent)

    if mode is not None:
        os.chmod(dirname, mode)

    if group is not None:
        # -1 leaves the owner untouched.
        os.chown(dirname, -1, grp.getgrnam(group).gr_gid)

    return dirname

865 

866 

867################################################################################ 

868 

869 

def get_changes_files(from_dir: str) -> list[str]:
    """
    Takes a directory and lists all .changes files in it (as well as chdir'ing
    to the directory; this is due to broken behaviour on the part of p-u/p-a
    when you're not in the right place)

    :param from_dir: directory to change into and to scan; may be absolute
                     or relative to the current working directory
    :return: list of filenames (without directory) ending in '.changes'
    """
    try:
        # Much of the rest of p-u/p-a depends on being in the right place
        os.chdir(from_dir)
        # We are inside from_dir now, so list the current directory: listing
        # from_dir again would resolve a relative path against the new
        # working directory and fail.
        changes_files = [x for x in os.listdir(os.curdir) if x.endswith('.changes')]
    except OSError as e:
        fubar("Failed to read list from directory %s (%s)" % (from_dir, e))

    return changes_files

886 

887################################################################################ 

888 

889 

# Module-level handle on the configuration; the functions in this module
# read their settings (e.g. Cnf["Dir::..."]) through this object.
Cnf = config.Config().Cnf

891 

892################################################################################ 

893 

894 

def parse_wnpp_bug_file(file: str = "/srv/ftp-master.debian.org/scripts/masterfiles/wnpp_rm") -> dict[str, list[str]]:
    """
    Parses the wnpp bug list available at https://qa.debian.org/data/bts/wnpp_rm
    Well, actually it parses a local copy, but let's document the source
    somewhere ;)

    Each usable line has the form ``<source>: <entry>|<entry>|...``; the
    first run of digits in every entry is taken as the bug number.  Lines
    without ``": "`` are ignored, and entries without any digits are skipped.
    If the file cannot be opened a warning is printed and an empty dict is
    returned.

    :param file: path of the local copy of the wnpp list
    :return: a dict associating source package name with a list of open wnpp
             bugs (Yes, there might be more than one)
    """
    try:
        with open(file) as f:
            lines = f.readlines()
    except OSError:
        print("Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any." % file)
        lines = []

    wnpp: dict[str, list[str]] = {}
    for line in lines:
        source, separator, entries = line.partition(": ")
        if not separator:
            continue
        bugs = []
        for wnpp_bug in entries.split("|"):
            # re.search() returns None when the entry holds no digits, so
            # test the match object before asking it for the matched text.
            match = re.search(r"\d+", wnpp_bug)
            if match:
                bugs.append(match.group())
        wnpp[source] = bugs
    return wnpp

926 

927################################################################################ 

928 

929 

def deb_extract_control(path: str) -> bytes:
    """Return the contents of the ``control`` member of a binary package.

    :param path: path to the .deb file
    :return: data of the ``control`` file taken from the package's control
             archive
    """
    package = apt_inst.DebFile(path)
    control_archive = package.control
    return control_archive.extractdata("control")

933 

934################################################################################ 

935 

936 

def mail_addresses_for_upload(maintainer: str, changed_by: str, fingerprint: str) -> list[str]:
    """Work out whom to mail about an upload.

    The recipient kinds come from ``Dinstall::UploadMailRecipients``; when
    that is empty, maintainer, changed_by and signer are all used.  Each
    mail address is included at most once.

    :param maintainer: Maintainer field of the .changes file
    :param changed_by: Changed-By field of the .changes file
    :param fingerprint: fingerprint of the key used to sign the upload
    :return: list of RFC 2047-encoded mail addresses to contact regarding
             this upload
    """
    config_key = 'Dinstall::UploadMailRecipients'
    configured = Cnf.value_list(config_key)
    if not configured:
        configured = ['maintainer', 'changed_by', 'signer']

    # The signer is handled last so that it can be dropped when one of the
    # key's addresses was already picked up through another recipient.
    if 'signer' in configured:
        configured.remove('signer')
        configured.append('signer')

    chosen = set()      # full addresses (name + email)
    seen_mails = set()  # bare emails, used for de-duplication
    for entry in configured:
        if entry.startswith('mail:'):
            # Address hardcoded in the configuration
            candidate = entry[5:]
        elif entry == 'maintainer':
            candidate = maintainer
        elif entry == 'changed_by':
            candidate = changed_by
        elif entry == 'signer':
            key_addresses = gpg_get_key_addresses(fingerprint)
            already_covered = any(a in seen_mails for a in key_addresses)
            if already_covered or not key_addresses:
                # Either nothing is known for the key, or the signer
                # already gets a copy via another address
                candidate = None
            else:
                candidate = key_addresses[0]
        else:
            raise Exception('Unsupported entry in {0}: {1}'.format(
                'Dinstall::UploadMailRecipients', entry))

        if candidate is None:
            continue
        bare_mail = fix_maintainer(candidate)[3]
        if bare_mail in seen_mails:
            continue
        chosen.add(candidate)
        seen_mails.add(bare_mail)

    return [fix_maintainer(address)[1] for address in chosen]

989 

990################################################################################ 

991 

992 

def call_editor_for_file(path: str) -> None:
    """Open *path* in the user's editor and wait for the editor to exit.

    The editor is taken from ``$VISUAL``, then ``$EDITOR``, falling back to
    ``sensible-editor``.

    :param path: file to edit
    :raises subprocess.CalledProcessError: if the editor exits non-zero
    """
    environment = os.environ
    if 'VISUAL' in environment:
        command = environment['VISUAL']
    elif 'EDITOR' in environment:
        command = environment['EDITOR']
    else:
        command = 'sensible-editor'
    subprocess.check_call([command, path])

996 

997################################################################################ 

998 

999 

def call_editor(text: str = "", suffix: str = ".txt") -> str:
    """Let the user edit a piece of text and return the outcome.

    The text is written to a named temporary file, which is handed to
    call_editor_for_file() and read back from the start afterwards.

    :param text: initial text
    :param suffix: extension for temporary file
    :return: string with the edited text
    """
    with tempfile.NamedTemporaryFile(mode='w+t', suffix=suffix) as scratch:
        print(text, end='', file=scratch)
        # Make sure the editor sees the initial text on disk
        scratch.flush()
        call_editor_for_file(scratch.name)
        scratch.seek(0)
        edited = scratch.read()
    return edited

1013 

1014################################################################################ 

1015 

1016 

def check_reverse_depends(removals: Iterable[str], suite: str, arches: Optional[Iterable[Architecture]] = None, session=None, cruft: bool = False, quiet: bool = False, include_arch_all: bool = True) -> bool:
    """Check whether removing packages would break dependencies in a suite.

    Two checks are made against the packages associated with *suite*:

    - binary ``Depends``: a dependency is broken when every alternative of
      an ORed group is going to be removed;
    - source ``Build-Depends`` (and ``Build-Depends-Indep`` when
      *include_arch_all* is set), checked the same way after stripping
      ``[arch]`` qualifiers.

    A virtual package that is provided only by to-be-removed packages is
    treated as to-be-removed as well.  Unless *quiet* is set, a report of
    the broken dependencies is printed.

    :param removals: names of the packages that are going to be removed
    :param suite: name of the suite to check
    :param arches: architectures to look at; when empty, the architectures
                   of the suite are used ("source" and "all" are always
                   dropped from this set)
    :param session: database session used for all queries
    :param cruft: print the report in the indented " - broken ..." layout
                  instead of the "# Broken ..." layout
    :param quiet: do not print anything about broken dependencies
    :param include_arch_all: also check arch "all" binaries and
                             Build-Depends-Indep
    :return: True if at least one broken dependency was found
    """
    dbsuite = get_suite(suite, session)
    overridesuite = dbsuite
    if dbsuite.overridesuite is not None:
        overridesuite = get_suite(dbsuite.overridesuite, session)
    dep_problem = False
    p2c = {}
    all_broken = defaultdict(lambda: defaultdict(set))
    if arches:
        all_arches = set(arches)
    else:
        all_arches = set(x.arch_string for x in get_suite_architectures(suite))
    all_arches -= {"source", "all"}
    # All membership tests below go through this set: it is extended with
    # virtual packages that lose all their providers, and it does not
    # depend on 'removals' being re-iterable.
    removal_set = set(removals)
    metakey_d = get_or_set_metadatakey("Depends", session)
    metakey_p = get_or_set_metadatakey("Provides", session)
    params = {
        'suite_id': dbsuite.suite_id,
        'metakey_d_id': metakey_d.key_id,
        'metakey_p_id': metakey_p.key_id,
    }
    if include_arch_all:
        rdep_architectures = all_arches | {'all'}
    else:
        rdep_architectures = all_arches
    for architecture in rdep_architectures:
        deps = {}
        sources = {}
        virtual_packages = {}
        try:
            params['arch_id'] = get_architecture(architecture, session).arch_id
        except AttributeError:
            # No architecture object was returned for this name; skip it
            continue

        statement = sql.text('''
            SELECT b.package, s.source, c.name as component,
                (SELECT bmd.value FROM binaries_metadata bmd WHERE bmd.bin_id = b.id AND bmd.key_id = :metakey_d_id) AS depends,
                (SELECT bmp.value FROM binaries_metadata bmp WHERE bmp.bin_id = b.id AND bmp.key_id = :metakey_p_id) AS provides
                FROM binaries b
                JOIN bin_associations ba ON b.id = ba.bin AND ba.suite = :suite_id
                JOIN source s ON b.source = s.id
                JOIN files_archive_map af ON b.file = af.file_id
                JOIN component c ON af.component_id = c.id
                WHERE b.architecture = :arch_id''')
        query = session.query(sql.column('package'), sql.column('source'),
                              sql.column('component'), sql.column('depends'),
                              sql.column('provides')). \
            from_statement(statement).params(params)
        for package, source, component, depends, provides in query:
            sources[package] = source
            p2c[package] = component
            if depends is not None:
                deps[package] = depends
            # Maintain a counter for each virtual package.  If a
            # Provides: exists, set the counter to 0 and count all
            # provides by a package not in the list for removal.
            # If the counter stays 0 at the end, we know that only
            # the to-be-removed packages provided this virtual
            # package.
            if provides is not None:
                for virtual_pkg in provides.split(","):
                    virtual_pkg = virtual_pkg.strip()
                    if virtual_pkg == package:
                        continue
                    if virtual_pkg not in virtual_packages:
                        virtual_packages[virtual_pkg] = 0
                    if package not in removal_set:
                        virtual_packages[virtual_pkg] += 1

        # If a virtual package is only provided by the to-be-removed
        # packages, treat the virtual package as to-be-removed too.
        removal_set.update(virtual_pkg for virtual_pkg in virtual_packages if not virtual_packages[virtual_pkg])

        # Check binary dependencies (Depends)
        for package in deps:
            if package in removal_set:
                continue
            try:
                parsed_dep = apt_pkg.parse_depends(deps[package])
            except ValueError as e:
                print("Error for package %s: %s" % (package, e))
                parsed_dep = []
            for dep in parsed_dep:
                # Check for partial breakage.  If a package has a ORed
                # dependency, there is only a dependency problem if all
                # packages in the ORed depends will be removed.
                unsat = 0
                for dep_package, _, _ in dep:
                    if dep_package in removal_set:
                        unsat += 1
                if unsat == len(dep):
                    component = p2c[package]
                    source = sources[package]
                    if component != "main":
                        source = "%s/%s" % (source, component)
                    all_broken[source][package].add(architecture)
                    dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Depends:")
        else:
            print("# Broken Depends:")
        for source, bindict in sorted(all_broken.items()):
            lines = []
            for binary, broken_arches in sorted(bindict.items()):
                # No architecture list is shown when the binary is broken
                # everywhere or is an arch "all" package
                if broken_arches == all_arches or 'all' in broken_arches:
                    lines.append(binary)
                else:
                    lines.append('%s [%s]' % (binary, ' '.join(sorted(broken_arches))))
            if cruft:
                print('    %s: %s' % (source, lines[0]))
            else:
                print('%s: %s' % (source, lines[0]))
            for line in lines[1:]:
                if cruft:
                    print('    ' + ' ' * (len(source) + 2) + line)
                else:
                    print(' ' * (len(source) + 2) + line)
        if not cruft:
            print()

    # Check source dependencies (Build-Depends and Build-Depends-Indep)
    all_broken = defaultdict(set)
    metakey_bd = get_or_set_metadatakey("Build-Depends", session)
    metakey_bdi = get_or_set_metadatakey("Build-Depends-Indep", session)
    if include_arch_all:
        metakey_ids = (metakey_bd.key_id, metakey_bdi.key_id)
    else:
        metakey_ids = (metakey_bd.key_id,)

    params = {
        'suite_id': dbsuite.suite_id,
        'metakey_ids': metakey_ids,
    }
    statement = sql.text('''
        SELECT s.source, string_agg(sm.value, ', ') as build_dep
           FROM source s
           JOIN source_metadata sm ON s.id = sm.src_id
           WHERE s.id in
               (SELECT src FROM newest_src_association
                   WHERE suite = :suite_id)
               AND sm.key_id in :metakey_ids
           GROUP BY s.id, s.source''')
    query = session.query(sql.column('source'), sql.column('build_dep')) \
        .from_statement(statement).params(params)
    for source, build_dep in query:
        if source in removal_set:
            continue
        parsed_dep = []
        if build_dep is not None:
            # Remove [arch] information since we want to see breakage on all arches
            build_dep = re_build_dep_arch.sub("", build_dep)
            try:
                parsed_dep = apt_pkg.parse_src_depends(build_dep)
            except ValueError as e:
                print("Error for source %s: %s" % (source, e))
        for dep in parsed_dep:
            unsat = 0
            for dep_package, _, _ in dep:
                if dep_package in removal_set:
                    unsat += 1
            if unsat == len(dep):
                # Look up the component through the 'dsc' override of the
                # source in the override suite
                component, = session.query(Component.component_name) \
                    .join(Component.overrides) \
                    .filter(Override.suite == overridesuite) \
                    .filter(Override.package == re.sub('/(contrib|non-free-firmware|non-free)$', '', source)) \
                    .join(Override.overridetype).filter(OverrideType.overridetype == 'dsc') \
                    .first()
                key = source
                if component != "main":
                    key = "%s/%s" % (source, component)
                all_broken[key].add(pp_deps(dep))
                dep_problem = True

    if all_broken and not quiet:
        if cruft:
            print("  - broken Build-Depends:")
        else:
            print("# Broken Build-Depends:")
        for source, bdeps in sorted(all_broken.items()):
            bdeps = sorted(bdeps)
            if cruft:
                print('    %s: %s' % (source, bdeps[0]))
            else:
                print('%s: %s' % (source, bdeps[0]))
            for bdep in bdeps[1:]:
                if cruft:
                    print('    ' + ' ' * (len(source) + 2) + bdep)
                else:
                    print(' ' * (len(source) + 2) + bdep)
        if not cruft:
            print()

    return dep_problem

1212 

1213################################################################################ 

1214 

1215 

def parse_built_using(control: Mapping[str, str]) -> list[tuple[str, str]]:
    """Extract the source packages named in a Built-Using field.

    Every entry must be a single (non-alternative) dependency with an
    ``=`` relation; anything else trips an assertion.

    :param control: control file to take Built-Using field from
    :return: list of (source_name, source_version) pairs; empty when the
             field is absent
    """
    field = control.get('Built-Using', None)
    if field is None:
        return []

    references = []
    for alternatives in apt_pkg.parse_depends(field):
        assert len(alternatives) == 1, 'Alternatives are not allowed in Built-Using field'
        name, version, relation = alternatives[0]
        assert relation == '=', 'Built-Using must contain strict dependencies'
        references.append((name, version))

    return references

1234 

1235################################################################################ 

1236 

1237 

def is_in_debug_section(control: Mapping[str, str]) -> bool:
    """Tell whether a binary package is an automatically built debug package.

    The part of ``Section`` after the first ``/`` (or the whole value when
    there is no ``/``) must be ``debug`` and ``Auto-Built-Package`` must be
    ``debug-symbols``.

    :param control: control file of binary package
    :return: True if the binary package is a debug package
    """
    full_section = control['Section']
    _, slash, remainder = full_section.partition('/')
    section = remainder if slash else full_section
    if section != "debug":
        return False
    return control.get("Auto-Built-Package") == "debug-symbols"

1247 

1248################################################################################ 

1249 

1250 

def find_possibly_compressed_file(filename: str) -> str:
    """
    Locate a control file that may only exist in compressed form.

    The plain name is tried first, then the name with '.xz', '.gz' and
    '.bz2' appended, in that order.

    :param filename: path to a control file (Sources, Packages, etc) to
                     look for
    :return: path to the (possibly compressed) control file
    :raises OSError: with errno ENOENT if none of the candidates exists
    """
    candidates = (filename + extension for extension in ('', '.xz', '.gz', '.bz2'))
    found = next((path for path in candidates if os.path.exists(path)), None)
    if found is None:
        raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
    return found

1267 

1268################################################################################ 

1269 

1270 

def parse_boolean_from_user(value: str) -> bool:
    """Interpret a user-supplied word as a boolean (case-insensitively).

    :param value: one of yes/true/enable/enabled or no/false/disable/disabled
    :return: the corresponding boolean
    :raises ValueError: if the word is not recognised
    """
    word = value.lower()
    meanings = {
        'yes': True, 'true': True, 'enable': True, 'enabled': True,
        'no': False, 'false': False, 'disable': False, 'disabled': False,
    }
    if word not in meanings:
        raise ValueError("Not sure whether %s should be a True or a False" % word)
    return meanings[word]

1278 

1279 

def suite_suffix(suite_name: str) -> str:
    """Return the suite suffix that applies to the given suite.

    ``Dinstall::SuiteSuffix`` applies to every suite unless
    ``Dinstall::SuiteSuffixSuites`` is configured, in which case it only
    applies to the suites listed there.

    :param suite_name: name of the suite
    :return: the configured suffix, or '' if none applies
    """
    configured = Cnf.find('Dinstall::SuiteSuffix', '')
    if configured == '':
        return ''
    if 'Dinstall::SuiteSuffixSuites' not in Cnf:
        # TODO: warn (once per run) that SuiteSuffix will be deprecated in the future
        return configured
    applies = suite_name in Cnf.value_list('Dinstall::SuiteSuffixSuites')
    return configured if applies else ''

1291 

1292################################################################################ 

1293 

1294 

def process_buildinfos(directory: str, buildinfo_files: 'Iterable[daklib.upload.HashedFile]', fs_transaction: 'daklib.fstransactions.FilesystemTransaction', logger: 'daklib.daklog.Logger') -> None:
    """Archive buildinfo files below Dir::BuildinfoArchive

    Nothing happens when Dir::BuildinfoArchive is not configured.  Otherwise
    each file is copied (mode 0644) into a ``YYYY/MM/DD`` subdirectory named
    after the current date, under a name obtained from find_next_free().

    :param directory: directory where .changes is stored
    :param buildinfo_files: names of buildinfo files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    if 'Dir::BuildinfoArchive' not in Cnf:
        return

    archive_root = Cnf['Dir::BuildinfoArchive']
    dated_subdir = datetime.datetime.now().strftime('%Y/%m/%d')
    target_dir = os.path.join(archive_root, dated_subdir)

    for buildinfo in buildinfo_files:
        source_path = os.path.join(directory, buildinfo.filename)
        candidate = os.path.join(target_dir, buildinfo.filename)
        destination = find_next_free(candidate)

        logger.log(["Archiving", buildinfo.filename])
        fs_transaction.copy(source_path, destination, mode=0o644)

1318 

1319################################################################################ 

1320 

1321 

def move_to_morgue(morguesubdir: str, filenames: Iterable[str], fs_transaction: 'daklib.fstransactions.FilesystemTransaction', logger: 'daklib.daklog.Logger'):
    """Move files into a dated directory of the morgue

    The destination is ``<morgue>/<morguesubdir>/<year>/<month>/<day>`` for
    the current date, where ``<morgue>`` is Dir::Morgue or, failing that,
    ``morgue`` below Dir::Base.  When a destination name is already taken,
    find_next_free() picks another one.

    :param morguesubdir: subdirectory of morgue where this file needs to go
    :param filenames: names of files
    :param fs_transaction: FilesystemTransaction instance
    :param logger: logger instance
    """
    fallback_morgue = os.path.join(Cnf.get("Dir::Base"), 'morgue')
    morguedir = Cnf.get("Dir::Morgue", fallback_morgue)

    # Build directory as morguedir/morguesubdir/year/month/day
    today = datetime.datetime.now()
    date_parts = (str(today.year), '%.2d' % today.month, '%.2d' % today.day)
    dest = os.path.join(morguedir, morguesubdir, *date_parts)

    for filename in filenames:
        target = '%s/%s' % (dest, os.path.basename(filename))
        # If the destination file exists; try to find another filename to use
        if os.path.lexists(target):
            target = find_next_free(target)
        logger.log(["move to morgue", filename, target])
        fs_transaction.move(filename, target)