Coverage for daklib/checks.py: 70%

673 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# Parts based on code that is 

4# Copyright (C) 2001-2006, James Troup <james@nocrew.org> 

5# Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org> 

6# 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License along 

18# with this program; if not, write to the Free Software Foundation, Inc., 

19# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

20 

21"""module providing pre-acceptance tests

22 

23Please read the documentation for the :class:`Check` class for the interface. 

24""" 

25 

26import datetime 

27import os 

28import re 

29import subprocess 

30import tempfile 

31import textwrap 

32import time 

33from collections.abc import Callable, Iterable 

34from typing import TYPE_CHECKING, Literal, cast, override 

35 

36import apt_inst 

37import apt_pkg 

38import yaml 

39from apt_pkg import version_compare 

40from sqlalchemy import sql 

41 

42import daklib.dbconn as dbconn 

43import daklib.gpg 

44import daklib.lintian as lintian 

45import daklib.upload 

46import daklib.utils as utils 

47from daklib.config import Config 

48from daklib.dbconn import ( 

49 ACL, 

50 ACLPerSource, 

51 Architecture, 

52 DBBinary, 

53 DBSource, 

54 SignatureHistory, 

55 SrcFormat, 

56 Suite, 

57 get_source_in_suite, 

58) 

59from daklib.regexes import ( 

60 re_field_package, 

61 re_field_source, 

62 re_field_version, 

63 re_field_version_upstream, 

64 re_file_binary, 

65 re_file_changes, 

66 re_file_dsc, 

67 re_file_orig, 

68 re_file_source, 

69 re_isanum, 

70) 

71from daklib.textutils import ParseMaintError, fix_maintainer 

72 

73if TYPE_CHECKING: 

74 from sqlalchemy.orm import Session 

75 

76 import daklib.archive 

77 

78 

def check_fields_for_valid_utf8(filename: str, control: apt_pkg.TagSection) -> None:
    """Reject control data that has a field value which is not valid UTF-8.

    :param filename: name used as prefix of the rejection message
    :param control: parsed control section whose field values are probed

    :raises Reject: looking up a field value raised :exc:`UnicodeDecodeError`
    """
    for name in control.keys():
        # Only the value is probed: looking it up makes `TagSection` try to
        # decode it.  Doing the same for the field name requires
        # https://bugs.debian.org/995118 to be fixed.
        # TODO: make sure the field name is valid UTF-8 too
        try:
            control[name]
        except UnicodeDecodeError:
            message = "{0}: The {1} field is not valid UTF-8".format(filename, name)
            raise Reject(message)

92 

93 

class Reject(Exception):
    """Raised by a failing check; carries the reason for the rejection."""

98 

99 

class RejectExternalFilesMismatch(Reject):
    """Rejection raised when the external hashes check fails.

    The message is rendered from the first four constructor arguments.
    """

    @override
    def __str__(self):
        template = "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])"
        details = self.args[:4]
        return template % details

109 

110 

class RejectACL(Reject):
    """exception raised by failing ACL checks"""

    def __init__(self, acl: ACL, reason: str):
        """Remember the ACL that refused the upload and the reason why.

        :param acl: ACL that did not allow the upload
        :param reason: human-readable explanation
        """
        # Initialize the exception base explicitly instead of relying on
        # `args` being filled in implicitly when the instance is created.
        super().__init__(acl, reason)
        self.acl = acl
        self.reason = reason

    @override
    def __str__(self):
        return "ACL {0}: {1}".format(self.acl.name, self.reason)

121 

122 

class Check:
    """Common interface of all checks.

    :class:`daklib.archive.ArchiveUpload` calls the checks.  A failing check
    should raise :exc:`daklib.checks.Reject` with a human-readable
    description of why the upload should be rejected.
    """

    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Run the check against an upload.

        The base implementation only raises :exc:`NotImplementedError`.

        :param upload: upload to check

        :raises Reject: upload should be rejected
        """
        raise NotImplementedError

    def per_suite_check(
        self, upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> bool:
        """Run the check against an upload for one particular suite.

        The base implementation only raises :exc:`NotImplementedError`.

        :param upload: upload to check
        :param suite: suite to check

        :raises Reject: upload should be rejected
        """
        raise NotImplementedError

    @property
    def forcable(self) -> bool:
        """Whether a failure of this check may be ignored by force.

        :const:`True` if it is acceptable to force ignoring a failing test,
        :const:`False` otherwise.  The base class answers :const:`False`.
        """
        may_be_forced = False
        return may_be_forced

160 

161 

class SignatureAndHashesCheck(Check):
    """Verify signatures and hashes of the .changes and, if included, the .dsc.

    The signature has to be valid and made by a known user.
    """

    def check_replay(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Reject a .changes whose signature is already in the history.

        :param upload: upload to check

        :raises Reject: a matching signature history entry exists
        """
        # The lookup runs on the upload's own session.
        # NOTE(review): the intent appears to be that the .changes is
        # remembered as seen in all cases, which may call for a private
        # session -- confirm against the code that records the signature.
        db = upload.session
        seen = SignatureHistory.from_signed_file(upload.changes.signature).query(db)
        if seen is None:
            return True
        raise Reject(
            "Signature for changes file was already seen at {0}.\nPlease refresh the signature of the changes file if you want to upload it again.".format(
                seen.seen
            )
        )

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Check signatures, replay protection and hashes of the upload.

        :param upload: upload to check

        :raises Reject: invalid signature, replayed .changes, hash mismatch,
            unreadable .dsc or unknown signing key
        """
        untrusted_dsc_ok = Config().value_list("Dinstall::AllowSourceUntrustedSigKeys")

        changes = upload.changes
        if not changes.valid_signature:
            raise Reject("Signature for .changes not valid.")
        self.check_replay(upload)
        self._check_hashes(upload, changes.filename, changes.files.values())

        # Accessing the source may fail while the .dsc is being processed;
        # any such failure is reported as a rejection.
        try:
            dsc = changes.source
        except Exception as e:
            raise Reject("Invalid dsc file: {0}".format(e))

        if dsc is not None:
            # Keys listed in the configuration are exempt from the .dsc
            # signature checks; the hashes are verified either way.
            dsc_signature_required = (
                changes.primary_fingerprint not in untrusted_dsc_ok
            )
            if dsc_signature_required:
                if not dsc.valid_signature:
                    raise Reject("Signature for .dsc not valid.")
                if dsc.primary_fingerprint != changes.primary_fingerprint:
                    raise Reject(".changes and .dsc not signed by the same key.")
            self._check_hashes(upload, dsc.filename, dsc.files.values())

        signer = upload.fingerprint
        if signer is None or signer.uid is None:
            raise Reject(".changes signed by unknown key.")

        return True

    def _check_hashes(
        self,
        upload: "daklib.archive.ArchiveUpload",
        filename: str,
        files: Iterable[daklib.upload.HashedFile],
    ) -> None:
        """Verify every listed file against the upload directory.

        :param upload: upload we are processing
        :param filename: name of the file the expected hash values are taken from
        :param files: files to check the hashes for

        :raises Reject: a file is missing or its check failed
        """
        missing_file_template = (
            "{0}: {1}\n"
            "Perhaps you need to include the file in your upload?\n\n"
            "If the orig tarball is missing, the -sa flag for dpkg-buildpackage will be your friend."
        )
        try:
            for hashed_file in files:
                hashed_file.check(upload.directory)
        except daklib.upload.FileDoesNotExist as e:
            raise Reject(missing_file_template.format(filename, str(e)))
        except daklib.upload.UploadException as e:
            raise Reject("{0}: {1}".format(filename, str(e)))

237 

238 

class WeakSignatureCheck(Check):
    """Refuse a .changes or .dsc that was signed using a weak algorithm."""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Reject the upload if either signature is flagged as weak.

        :param upload: upload to check

        :raises Reject: the .changes or the source package has a weak signature
        """
        changes = upload.changes
        if changes.weak_signature:
            raise Reject(
                "The .changes was signed using a weak algorithm (such as SHA-1)"
            )

        dsc = changes.source
        if dsc is None:
            return True
        if dsc.weak_signature:
            raise Reject(
                "The source package was signed using a weak algorithm (such as SHA-1)"
            )
        return True

258 

259 

def check_signature_timestamp(prefix: str, signed_file: daklib.gpg.SignedFile) -> bool:
    """Reject signatures that are too old or dated too far in the future.

    :param prefix: text put in front of the rejection message
    :param signed_file: signed file whose signature timestamp is inspected

    :raises Reject: the signature is more than 365 days old or more than
        7 days in the future
    """
    checked_at = datetime.datetime.now(datetime.UTC)
    signed_at = signed_file.signature_timestamp
    age = checked_at - signed_at

    oldest_allowed = datetime.timedelta(days=365)
    # A negative age means the signature claims to come from the future.
    newest_allowed = datetime.timedelta(days=-7)

    if age > oldest_allowed:
        too_old = "{0}: Signature from {1} is too old (maximum age is {2} days)"
        raise Reject(too_old.format(prefix, signed_at, oldest_allowed.days))
    if age < newest_allowed:
        too_new = "{0}: Signature from {1} is too far in the future (tolerance is {2} days)"
        raise Reject(too_new.format(prefix, signed_at, abs(newest_allowed.days)))
    return True

281 

282 

class SignatureTimestampCheck(Check):
    """Check the timestamp of the signature on the .changes."""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Delegate to :func:`check_signature_timestamp` for the .changes.

        :param upload: upload to check

        :raises Reject: the signature timestamp is outside the accepted range
        """
        changes = upload.changes
        return check_signature_timestamp(changes.filename, changes.signature)

291 

292 

class ChangesCheck(Check):
    """Check changes file for syntax errors."""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Validate fields, filename and file list of the .changes.

        The individual tests run in a fixed order and the first failing one
        determines the rejection message.

        :param upload: upload to check

        :raises Reject: the .changes is malformed or inconsistent
        """
        changes = upload.changes
        control = changes.changes
        fn = changes.filename

        # Fields every .changes needs.
        required = (
            "Distribution",
            "Source",
            "Architecture",
            "Version",
            "Maintainer",
            "Files",
            "Changes",
        )
        for name in required:
            if name not in control:
                raise Reject("{0}: misses mandatory field {1}".format(fn, name))

        # Fields only needed when binaries are part of the upload.
        if len(changes.binaries) > 0:
            for name in ("Binary", "Description"):
                if name not in control:
                    raise Reject(
                        "{0}: binary upload requires {1} field".format(fn, name)
                    )

        check_fields_for_valid_utf8(fn, control)

        source_field = re_field_source.match(control["Source"])
        if not source_field:
            raise Reject("{0}: Invalid Source field".format(fn))
        version_field = re_field_version.match(control["Version"])
        if not version_field:
            raise Reject("{0}: Invalid Version field".format(fn))
        version_without_epoch = version_field.group("without_epoch")

        # The filename has to agree with the Source and Version fields.
        name_parts = re_file_changes.match(fn)
        if not name_parts:
            raise Reject("{0}: Does not match re_file_changes".format(fn))
        if name_parts.group("package") != source_field.group("package"):
            raise Reject("{0}: Filename does not match Source field".format(fn))
        if name_parts.group("version") != version_without_epoch:
            raise Reject("{0}: Filename does not match Version field".format(fn))

        for binary_name in changes.binary_names:
            if not re_field_package.match(binary_name):
                raise Reject(
                    "{0}: Invalid binary package name {1}".format(fn, binary_name)
                )

        # "source" in Architecture and an included source go together.
        if changes.sourceful and changes.source is None:
            raise Reject("Changes has architecture source, but no source found.")
        if changes.source is not None and not changes.sourceful:
            raise Reject("Upload includes source, but changes does not say so.")

        try:
            fix_maintainer(control["Maintainer"])
        except ParseMaintError as e:
            raise Reject("{0}: Failed to parse Maintainer field: {1}".format(fn, e))

        # Changed-By is only parsed when the field is present.
        changed_by = control.get("Changed-By")
        if changed_by is not None:
            try:
                fix_maintainer(changed_by)
            except ParseMaintError as e:
                raise Reject(
                    "{0}: Failed to parse Changed-By field: {1}".format(fn, e)
                )

        # The attribute is read only for the exception it may raise.
        try:
            changes.byhand_files
        except daklib.upload.InvalidChangesException as e:
            raise Reject("{0}".format(e))

        if len(changes.files) == 0:
            raise Reject("Changes includes no files.")

        for bug in changes.closed_bugs:
            if re_isanum.match(bug):
                continue
            raise Reject('{0}: "{1}" in Closes field is not a number'.format(fn, bug))

        return True

381 

382 

class ExternalHashesCheck(Check):
    """Checks hashes in .changes and .dsc against an external database."""

    def check_single(self, session: "Session", f) -> None:
        """Compare one file with its entry in the ``external_files`` table.

        Only the first matching row is considered.  Nothing is checked when
        no row matches or when the recorded size is empty.

        :param session: database session used for the lookup
        :param f: file to compare; its ``filename``, ``size``, ``md5sum``,
            ``sha1sum`` and ``sha256sum`` attributes are read

        :raises RejectExternalFilesMismatch: size or a hash differs
        """
        # The filename goes into a LIKE pattern, where "%" and "_" are
        # wildcards.  Escape them (and the escape character itself) so that
        # only the literal name preceded by some directory matches.
        literal_name = (
            f.filename.replace("!", "!!").replace("%", "!%").replace("_", "!_")
        )
        q = session.execute(
            sql.text(
                "SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE :pattern ESCAPE '!'"
            ),
            {"pattern": "%/{}".format(literal_name)},
        )
        row = q.fetchone()
        if row is None:
            return
        ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum = row

        if not ext_size:
            return

        # Same order as the columns: the first difference found is reported.
        comparisons = (
            ("size", f.size, ext_size),
            ("md5sum", f.md5sum, ext_md5sum),
            ("sha1sum", f.sha1sum, ext_sha1sum),
            ("sha256sum", f.sha256sum, ext_sha256sum),
        )
        for label, current, external in comparisons:
            if external != current:
                raise RejectExternalFilesMismatch(f.filename, label, current, external)

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Check all files of the .changes and of the source, if any.

        Does nothing unless the configuration enables ``use_extfiles``.

        :param upload: upload to check

        :raises RejectExternalFilesMismatch: a file differs from its
            external record
        """
        cnf = Config()

        if not cnf.use_extfiles:  # type: ignore[attr-defined]
            return True

        session = upload.session
        changes = upload.changes

        for f in changes.files.values():
            self.check_single(session, f)
        source = changes.source
        if source is not None:
            for f in source.files.values():
                self.check_single(session, f)

        # Match the return type declared by Check.check.
        return True

437 

438 

class BinaryCheck(Check):
    """Check binary packages for syntax errors."""

    @override
    def check(self, upload) -> bool:
        """Check every binary package and its relation to the .changes.

        :param upload: upload to check

        :raises Reject: a binary package is malformed, an architecture listed
            in the .changes has no binary package, or a package is not
            covered by the Binary field
        """
        debug_deb_name_postfix = "-dbgsym"
        # XXX: Handle dynamic debug section name here

        # check_binary() records the architecture of every package here.
        self._architectures: set[str] = set()

        for binary in upload.changes.binaries:
            self.check_binary(upload, binary)

        for arch in upload.changes.architectures:
            if arch == "source":
                continue
            if arch not in self._architectures:
                raise Reject(
                    "{}: Architecture field includes {}, but no binary packages for {} are included in the upload".format(
                        upload.changes.filename, arch, arch
                    )
                )

        binaries = {
            binary.control["Package"]: binary for binary in upload.changes.binaries
        }

        for name, binary in binaries.items():
            if name in upload.changes.binary_names:
                # Package is listed in Binary field. Everything is good.
                continue

            if not daklib.utils.is_in_debug_section(binary.control):
                # Neither listed nor a debug package: refuse it.
                raise Reject(
                    "Package {0} is not mentioned in Binary field in changes".format(
                        name
                    )
                )

            # A package in the debug section may be missing from the Binary
            # field as long as its name without the -dbgsym postfix is
            # listed there.
            if not name.endswith(debug_deb_name_postfix):
                raise Reject(
                    "Package {0} is in the debug section, but "
                    "does not end in {1}.".format(name, debug_deb_name_postfix)
                )

            origin_package_name = name.removesuffix(debug_deb_name_postfix)
            if origin_package_name not in upload.changes.binary_names:
                raise Reject(
                    "Debug package {debug}'s corresponding binary package "
                    "{origin} is not present in the Binary field.".format(
                        debug=name, origin=origin_package_name
                    )
                )

        return True

    def check_binary(self, upload: "daklib.archive.ArchiveUpload", binary):
        """Check control fields, filename and relations of one binary package.

        The package's architecture is added to ``self._architectures``.

        :param upload: upload the binary package belongs to
        :param binary: binary package to check; its ``hashed_file.filename``
            and ``control`` are read

        :raises Reject: the binary package is malformed
        """
        fn = binary.hashed_file.filename
        control = binary.control

        for field in ("Package", "Architecture", "Version", "Description", "Section"):
            if field not in control:
                raise Reject("{0}: Missing mandatory field {1}.".format(fn, field))

        check_fields_for_valid_utf8(fn, control)

        # check fields

        package = control["Package"]
        if not re_field_package.match(package):
            raise Reject("{0}: Invalid Package field".format(fn))

        version = control["Version"]
        version_match = re_field_version.match(version)
        if not version_match:
            raise Reject("{0}: Invalid Version field".format(fn))
        version_without_epoch = version_match.group("without_epoch")

        architecture = control["Architecture"]
        if architecture not in upload.changes.architectures:
            raise Reject(
                "{0}: Architecture not in Architecture field in changes file".format(fn)
            )
        if architecture == "source":
            raise Reject(
                '{0}: Architecture "source" invalid for binary packages'.format(fn)
            )
        self._architectures.add(architecture)

        source = control.get("Source")
        if source is not None and not re_field_source.match(source):
            raise Reject("{0}: Invalid Source field".format(fn))

        section = control.get("Section", "")
        if section == "" or section == "unknown" or section.endswith("/unknown"):
            raise Reject(
                '{0}: The "Section" field must be present and use a real section name.'.format(
                    fn
                )
            )

        # check filename

        match = re_file_binary.match(fn)
        if match is None:
            raise Reject(f"{fn}: does not match re_file_binary")
        if package != match.group("package"):
            raise Reject("{0}: filename does not match Package field".format(fn))
        if version_without_epoch != match.group("version"):
            raise Reject("{0}: filename does not match Version field".format(fn))
        if architecture != match.group("architecture"):
            raise Reject("{0}: filename does not match Architecture field".format(fn))

        # check dependency field syntax

        def check_dependency_field(
            field,
            control,
            dependency_parser=apt_pkg.parse_depends,
            allow_alternatives=True,
            allow_relations=("", "<", "<=", "=", ">=", ">"),
        ):
            """Check one relationship field; an absent field is accepted."""
            value = control.get(field)
            if value is None:
                return
            if value.strip() == "":
                raise Reject("{0}: empty {1} field".format(fn, field))
            try:
                depends = dependency_parser(value)
            except Exception:
                # Only parser failures become a rejection; KeyboardInterrupt
                # and SystemExit are deliberately left to propagate.
                raise Reject("{0}: APT could not parse {1} field".format(fn, field))
            for group in depends:
                if not allow_alternatives and len(group) != 1:
                    raise Reject(
                        "{0}: {1}: alternatives are not allowed".format(fn, field)
                    )
                for dep_pkg, dep_ver, dep_rel in group:
                    if dep_rel not in allow_relations:
                        raise Reject(
                            "{}: {}: depends on {}, but only relations {} are allowed for this field".format(
                                fn,
                                field,
                                " ".join([dep_pkg, dep_rel, dep_ver]),
                                allow_relations,
                            )
                        )

        for field in (
            "Breaks",
            "Conflicts",
            "Depends",
            "Enhances",
            "Pre-Depends",
            "Recommends",
            "Replaces",
            "Suggests",
        ):
            check_dependency_field(field, control)

        check_dependency_field(
            "Provides", control, allow_alternatives=False, allow_relations=("", "=")
        )
        check_dependency_field(
            "Built-Using",
            control,
            dependency_parser=apt_pkg.parse_src_depends,
            allow_alternatives=False,
            allow_relations=("=",),
        )

615 

616 

# Names of the archive members a .deb file is allowed to contain.
_DEB_ALLOWED_MEMBERS = (
    {"debian-binary"}
    | {f"control.tar.{suffix}" for suffix in ("gz", "xz")}
    | {f"data.tar.{suffix}" for suffix in ("gz", "bz2", "xz")}
)

622 

623 

class BinaryMembersCheck(Check):
    """check members of .deb file"""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Check the members of every binary package in the upload.

        :param upload: upload to check

        :raises Reject: a .deb contains a member that is not allowed
        """
        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            self._check_binary(filename, path)
        return True

    def _check_binary(self, filename: str, path: str) -> None:
        """Reject a .deb that has members outside the allowed set.

        :param filename: name used in the rejection message
        :param path: location of the .deb file to open

        :raises Reject: at least one member is not in `_DEB_ALLOWED_MEMBERS`
        """
        deb = apt_inst.DebFile(path)
        members = {member.name for member in deb.getmembers()}  # type: ignore[attr-defined]
        if blocked_members := members - _DEB_ALLOWED_MEMBERS:
            # Sets have no stable order; sort so that the same upload always
            # produces the same message.
            raise Reject(
                f"{filename}: Contains blocked members {', '.join(sorted(blocked_members))}"
            )

642 

643 

class BinaryTimestampCheck(Check):
    """check timestamps of files in binary packages

    Files in the near future cause ugly warnings and extreme time travel
    can cause errors on extraction.
    """

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Reject binary packages containing files with implausible timestamps.

        The future limit is the current time plus
        ``Dinstall::FutureTimeTravelGrace`` seconds (default: one day); the
        past limit is the start of ``Dinstall::PastCutoffYear`` (default:
        1975).

        :param upload: upload to check

        :raises Reject: a member of a .deb is dated outside these limits
        """
        cnf = Config()
        future_cutoff = time.time() + cnf.find_i(
            "Dinstall::FutureTimeTravelGrace", 24 * 3600
        )
        past_cutoff = time.mktime(
            time.strptime(cnf.find("Dinstall::PastCutoffYear", "1975"), "%Y")
        )

        class TarTime:
            """Collects the members whose mtime is outside the limits."""

            def __init__(self):
                self.future_files: dict[str, int] = {}
                self.past_files: dict[str, int] = {}

            def callback(self, member, data) -> None:
                if member.mtime > future_cutoff:
                    self.future_files[member.name] = member.mtime
                elif member.mtime < past_cutoff:
                    self.past_files[member.name] = member.mtime

        def format_reason(filename, direction, files) -> str:
            """Render the rejection message, one offending file per line."""
            header = (
                "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(
                    filename, len(files), direction
                )
            )
            # Entries are separated by newlines; without a separator several
            # offending files would run together on a single line.
            entries = [
                " {0} ({1})".format(fn, time.ctime(ts)) for fn, ts in files.items()
            ]
            return header + "\n".join(entries)

        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            deb = apt_inst.DebFile(path)
            tar = TarTime()
            for archive in (deb.control, deb.data):
                archive.go(tar.callback)
            if tar.future_files:
                raise Reject(format_reason(filename, "future", tar.future_files))
            if tar.past_files:
                raise Reject(format_reason(filename, "past", tar.past_files))

        # Match the return type declared by Check.check.
        return True

693 

694 

class SourceCheck(Check):
    """Check source package for syntax errors."""

    def check_filename(self, control, filename, regex: re.Pattern) -> None:
        """Check that a filename agrees with the Source and Version fields.

        :param control: control data providing ``Source`` and ``Version``
        :param filename: name of the file to check
        :param regex: pattern used when the name is not an upstream tarball;
            it provides the ``package`` and ``version`` groups

        :raises Reject: the filename is malformed or disagrees with a field
        """
        # An .orig.tar.* carries the version without the Debian revision,
        # so that case is recognized before the generic pattern is tried.
        orig_match = re_file_orig.match(filename)
        is_orig = bool(orig_match)
        match = orig_match if is_orig else regex.match(filename)

        if not match:
            raise Reject(
                "{0}: does not match regular expression for source filenames".format(
                    filename
                )
            )
        if match.group("package") != control["Source"]:
            raise Reject("{0}: filename does not match Source field".format(filename))

        version = control["Version"]
        if is_orig:
            # Compare against the upstream part of the version only.
            upstream_match = re_field_version_upstream.match(version)
            if not upstream_match:
                raise Reject(
                    "{0}: Source package includes upstream tarball, but {1} has no Debian revision.".format(
                        filename, version
                    )
                )
            version = upstream_match.group("upstream")

        version_match = re_field_version.match(version)
        if version_match is None:
            raise Reject(f"{filename}: Version field does not match re_field_version")
        if match.group("version") != version_match.group("without_epoch"):
            raise Reject("{0}: filename does not match Version field".format(filename))

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload"):
        """Check the .dsc of a sourceful upload.

        :param upload: upload to check

        :raises Reject: source and Architecture field disagree, or the .dsc
            has invalid fields, filenames, build relations or file list
        """
        upload_changes = upload.changes

        if upload_changes.source is None:
            if upload_changes.sourceful:
                raise Reject(
                    "{}: Architecture field includes source, but no source package is included in the upload".format(
                        upload_changes.filename
                    )
                )
            # Nothing further to check without a source package.
            return True

        if not upload_changes.sourceful:
            raise Reject(
                "{}: Architecture field does not include source, but a source package is included in the upload".format(
                    upload_changes.filename
                )
            )

        changes = upload_changes.changes
        source = upload_changes.source
        control = cast(apt_pkg.TagSection, source.dsc)
        dsc_fn = source._dsc_file.filename

        check_fields_for_valid_utf8(dsc_fn, control)

        # check fields
        if not re_field_package.match(control["Source"]):
            raise Reject("{0}: Invalid Source field".format(dsc_fn))
        if control["Source"] != changes["Source"]:
            raise Reject(
                "{0}: Source field does not match Source field in changes".format(
                    dsc_fn
                )
            )
        if control["Version"] != changes["Version"]:
            raise Reject(
                "{0}: Version field does not match Version field in changes".format(
                    dsc_fn
                )
            )

        # check filenames
        self.check_filename(control, dsc_fn, re_file_dsc)
        for source_file in source.files.values():
            self.check_filename(control, source_file.filename, re_file_source)

        # check dependency field syntax
        build_relation_fields = (
            "Build-Conflicts",
            "Build-Conflicts-Indep",
            "Build-Depends",
            "Build-Depends-Arch",
            "Build-Depends-Indep",
        )
        for field in build_relation_fields:
            value = control.get(field)
            if value is None:
                continue
            if value.strip() == "":
                raise Reject("{0}: empty {1} field".format(dsc_fn, field))
            try:
                apt_pkg.parse_src_depends(value)
            except Exception as e:
                raise Reject(
                    "{0}: APT could not parse {1} field: {2}".format(dsc_fn, field, e)
                )

        rejects = utils.check_dsc_files(dsc_fn, control, list(source.files.keys()))
        if len(rejects) > 0:
            raise Reject("\n".join(rejects))

        return True

805 

806 

class SingleDistributionCheck(Check):
    """Check that the .changes targets only a single distribution."""

    @override
    def check(self, upload) -> bool:
        """Reject uploads that do not name exactly one distribution.

        :param upload: upload to check

        :raises Reject: the number of distributions is not one
        """
        if len(upload.changes.distributions) != 1:
            raise Reject("Only uploads to a single distribution are allowed.")

        # Match the return type declared by Check.check.
        return True

814 

815 

class ACLCheck(Check):
    """Check the uploader is allowed to upload the packages in .changes"""

    def _does_hijack(
        self, session: "Session", upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> tuple[Literal[True], str, str] | tuple[Literal[False], None, None]:
        """Look for binaries in `suite` that are built from a different source.

        :return: ``(True, binary package, other source)`` for the first
                 conflicting binary found, otherwise ``(False, None, None)``
        """
        # Try to catch hijacks.
        # This doesn't work correctly. Uploads to experimental can still
        # "hijack" binaries from unstable. Also one can hijack packages
        # via buildds (but people who try this should not be DMs).
        for binary_name in upload.changes.binary_names:
            binaries = (
                session.query(DBBinary)
                .join(DBBinary.source)
                .filter(DBBinary.suites.contains(suite))
                .filter(DBBinary.package == binary_name)
            )
            for binary in binaries:
                if binary.source.source != upload.changes.changes["Source"]:
                    return True, binary.package, binary.source.source
        return False, None, None

    def _check_acl(
        self, session: "Session", upload: "daklib.archive.ArchiveUpload", acl: ACL
    ) -> tuple[Literal[False] | None, str] | tuple[Literal[True], None]:
        """Evaluate a single ACL against the upload.

        :return: a ``(result, reason)`` pair where `result` is

                 * ``True`` if the ACL allows the upload (`reason` is ``None``),
                 * ``False`` if the ACL forbids the upload,
                 * ``None`` if the ACL does not match the uploader's
                   fingerprint or keyring.
        """
        source_name = upload.changes.source_name
        fingerprint = upload.authorized_by_fingerprint

        if acl.match_fingerprint and fingerprint not in acl.fingerprints:
            return None, "Fingerprint not in ACL"
        if acl.match_keyring is not None and fingerprint.keyring != acl.match_keyring:
            return None, "Fingerprint not in ACL's keyring"

        if not acl.allow_new:
            if upload.new:
                return False, "NEW uploads are not allowed"
            for f in upload.changes.files.values():
                assert f.section is not None
                if f.section == "byhand" or f.section.startswith("raw-"):
                    return False, "BYHAND uploads are not allowed"
        if not acl.allow_source and upload.changes.source is not None:
            return False, "sourceful uploads are not allowed"
        binaries = upload.changes.binaries
        if len(binaries) != 0:
            if not acl.allow_binary:
                return False, "binary uploads are not allowed"
            if upload.changes.source is None and not acl.allow_binary_only:
                return False, "binary-only uploads are not allowed"
            if not acl.allow_binary_all:
                uploaded_arches = set(upload.changes.architectures)
                uploaded_arches.discard("source")
                allowed_arches = set(a.arch_string for a in acl.architectures)
                forbidden_arches = uploaded_arches - allowed_arches
                if len(forbidden_arches) != 0:
                    return (
                        False,
                        # Sort so the message is stable: set iteration order
                        # is not deterministic between runs.
                        "uploads for architecture(s) {0} are not allowed".format(
                            ", ".join(sorted(forbidden_arches))
                        ),
                    )
        if not acl.allow_hijack:
            assert upload.final_suites is not None
            for suite in upload.final_suites:
                does_hijack, hijacked_binary, hijacked_from = self._does_hijack(
                    session, upload, suite
                )
                if does_hijack:
                    return (
                        False,
                        "hijacks are not allowed (binary={0}, other-source={1})".format(
                            hijacked_binary, hijacked_from
                        ),
                    )

        acl_per_source = (
            session.query(ACLPerSource)
            .filter_by(acl=acl, fingerprint=fingerprint, source=source_name)
            .first()
        )
        if acl.allow_per_source:
            if acl_per_source is None:
                return False, "not allowed to upload source package '{0}'".format(
                    source_name
                )
        if acl.deny_per_source and acl_per_source is not None:
            return (
                False,
                acl_per_source.reason
                or "forbidden to upload source package '{0}'".format(source_name),
            )

        return True, None

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Check the upload against the uploader's ACL and all global ACLs.

        The ACL of the fingerprint (or, failing that, of its keyring) must
        explicitly allow the upload. Global ACLs reject only when they
        explicitly deny it.

        :raises Reject: if the fingerprint has no keyring, the keyring is
                        inactive, or no ACL is configured
        :raises RejectACL: if an ACL does not permit the upload
        """
        session = upload.session
        fingerprint = upload.authorized_by_fingerprint
        keyring = fingerprint.keyring

        if keyring is None:
            raise Reject(
                "No keyring for fingerprint {0}".format(fingerprint.fingerprint)
            )
        if not keyring.active:
            raise Reject("Keyring {0} is not active".format(keyring.keyring_name))

        acl = fingerprint.acl or keyring.acl
        if acl is None:
            raise Reject("No ACL for fingerprint {0}".format(fingerprint.fingerprint))
        result, reason = self._check_acl(session, upload, acl)
        # Both ``False`` (denied) and ``None`` (ACL does not match) reject here.
        if not result:
            assert reason is not None
            raise RejectACL(acl, reason)

        for acl in session.query(ACL).filter_by(is_global=True):
            result, reason = self._check_acl(session, upload, acl)
            # A global ACL that merely does not match (``None``) is ignored.
            if result is False:
                assert reason is not None
                raise RejectACL(acl, reason)

        return True

    @override
    def per_suite_check(
        self, upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> bool:
        """Check the upload against the ACLs attached to `suite`.

        When the suite has ACLs, none of them may deny the upload and at
        least one of them must accept it.

        :raises Reject: if a suite ACL denies the upload or none accepts it
        """
        acls = suite.acls
        if len(acls) != 0:
            accept = False
            for acl in acls:
                result, reason = self._check_acl(upload.session, upload, acl)
                if result is False:
                    raise Reject(reason)
                accept = accept or bool(result)
            if not accept:
                raise Reject(
                    "Not accepted by any per-suite acl (suite={0})".format(
                        suite.suite_name
                    )
                )
        return True

957 

958 

class TransitionCheck(Check):
    """check for a transition"""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Reject sourceful uploads of packages blocked by a release transition.

        :raises Reject: if the Source field cannot be parsed, or the source
                        package belongs to a transition that is still ongoing
        """
        if not upload.changes.sourceful:
            return True

        transitions = self.get_transitions()
        if transitions is None:
            return True

        session = upload.session

        matched = re_field_source.match(upload.changes.changes["Source"])
        if matched is None:
            raise Reject(
                f"{upload.changes.filename}: Source field does not match re_field_source"
            )
        source = matched.group("package")

        for trans in transitions:
            details = transitions[trans]
            transition_source = details["source"]
            expected = details["new"]

            # Will be None if nothing is in testing.
            in_testing = get_source_in_suite(transition_source, "testing", session)

            # The transition is over once testing has the awaited version
            # (or a newer one); nothing to enforce in that case.
            if (
                in_testing is not None
                and apt_pkg.version_compare(in_testing.version, expected) >= 0
            ):
                continue

            # Only sources listed as part of the transition are affected.
            if source not in details["packages"]:
                continue

            # The source is affected, lets reject it.
            rejectmsg = "{0}: part of the {1} transition.\n\n".format(source, trans)

            if in_testing is None:
                currentlymsg = "not present in testing"
            else:
                currentlymsg = "at version {0}".format(in_testing.version)

            rejectmsg += "Transition description: {0}\n\n".format(details["reason"])

            explanation = """Your package
is part of a testing transition designed to get {0} migrated (it is
currently {1}, we need version {2}). This transition is managed by the
Release Team, and {3} is the Release-Team member responsible for it.
Please mail debian-release@lists.debian.org or contact {3} directly if you
need further assistance. You might want to upload to experimental until this
transition is done.""".format(
                transition_source, currentlymsg, expected, details["rm"]
            )
            rejectmsg += "\n".join(textwrap.wrap(explanation))

            raise Reject(rejectmsg)

        return True

    def get_transitions(self):
        """Load the transitions file named by Dinstall::ReleaseTransitions.

        :return: the parsed YAML content, or ``None`` if no file is
                 configured, the file does not exist, or it is not valid YAML
        """
        path = Config().get("Dinstall::ReleaseTransitions", "")
        if path == "":
            return None
        if not os.path.exists(path):
            return None

        with open(path, "r") as fd:
            contents = fd.read()

        try:
            return yaml.safe_load(contents)
        except yaml.YAMLError as msg:
            # A broken file only disables the check; it does not block uploads.
            utils.warn(
                "Not checking transitions, the transitions file is broken: {0}".format(
                    msg
                )
            )
            return None

1047 

1048 

class NoSourceOnlyCheck(Check):
    """Check for source-only upload

    Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is
    set. Otherwise they are rejected.

    Source-only uploads are only accepted for source packages having a
    Package-List field that also lists architectures per package. This
    check can be disabled via
    Dinstall::AllowSourceOnlyUploadsWithoutPackageList.

    Source-only uploads to NEW are only allowed if
    Dinstall::AllowSourceOnlyNew is set, if the primary fingerprint is listed
    in Dinstall::AllowSourceOnlyNewKeys, or if the source package is listed in
    Dinstall::AllowSourceOnlyNewSources.

    Uploads not including architecture-independent packages are only
    allowed if Dinstall::AllowNoArchIndepUploads is set.

    """

    def is_source_only_upload(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Return whether the upload has a source package and no binaries."""
        changes = upload.changes
        return changes.source is not None and len(changes.binaries) == 0

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Apply the configured source-only upload policy.

        Uploads that are not source-only pass unconditionally.

        :raises Reject: if the upload violates one of the Dinstall::Allow*
                        settings described in the class documentation
        """
        if not self.is_source_only_upload(upload):
            return True
        changes = upload.changes
        assert changes.source is not None

        cnf = Config()
        allow_source_only_uploads = cnf.find_b("Dinstall::AllowSourceOnlyUploads")
        allow_source_only_uploads_without_package_list = cnf.find_b(
            "Dinstall::AllowSourceOnlyUploadsWithoutPackageList"
        )
        allow_source_only_new = cnf.find_b("Dinstall::AllowSourceOnlyNew")
        allow_source_only_new_keys = cnf.value_list("Dinstall::AllowSourceOnlyNewKeys")
        allow_source_only_new_sources = cnf.value_list(
            "Dinstall::AllowSourceOnlyNewSources"
        )
        allow_no_arch_indep_uploads = cnf.find_b(
            "Dinstall::AllowNoArchIndepUploads", True
        )

        if not allow_source_only_uploads:
            raise Reject("Source-only uploads are not allowed.")
        if (
            not allow_source_only_uploads_without_package_list
            and changes.source.package_list.fallback
        ):
            raise Reject(
                "Source-only uploads are only allowed if a Package-List field that also list architectures is included in the source package. dpkg (>= 1.17.7) includes this information."
            )
        # Listed keys and listed source packages are exempt from the
        # "no source-only uploads to NEW" restriction.
        if (
            not allow_source_only_new
            and upload.new
            and changes.primary_fingerprint not in allow_source_only_new_keys
            and changes.source_name not in allow_source_only_new_sources
        ):
            raise Reject("Source-only uploads to NEW are not allowed.")

        if (
            "all" not in changes.architectures
            and changes.source.package_list.has_arch_indep_packages()
            and not allow_no_arch_indep_uploads
        ):
            raise Reject("Uploads must include architecture-independent packages.")

        return True

1121 

1122 

class NewOverrideCheck(Check):
    """Override NEW requirement"""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Clear the NEW flag for uploads signed by a configured override key.

        Uploads whose primary fingerprint is listed in
        Dinstall::NewOverrideKeys get ``upload.new`` reset to ``False``.
        This check never rejects.
        """
        if upload.new:
            override_keys = Config().value_list("Dinstall::NewOverrideKeys")
            if upload.changes.primary_fingerprint in override_keys:
                upload.new = False

        return True

1138 

1139 

class ArchAllBinNMUCheck(Check):
    """Check for arch:all binNMUs"""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Reject uploads marked ``Binary-Only: yes`` that include arch ``all``.

        :raises Reject: if the upload is an arch:all binNMU
        """
        parsed = upload.changes

        if "all" not in parsed.architectures:
            return True
        if parsed.changes.get("Binary-Only") != "yes":
            return True

        raise Reject("arch:all binNMUs are not allowed.")

1154 

1155 

class LintianCheck(Check):
    """Check package using lintian"""

    @override
    def check(self, upload: "daklib.archive.ArchiveUpload") -> bool:
        """Run lintian on the .changes file and reject on configured tags.

        The check is skipped for uploads without source, for uploads that
        target neither unstable nor experimental, and when
        Dinstall::LintianTags is not configured.

        :raises Exception: if the lintian tags file is not valid YAML
        :raises Reject: if lintian reports tags that produce reject messages
        """
        changes = upload.changes

        # Only check sourceful uploads.
        if changes.source is None:
            return True
        # Only check uploads to unstable or experimental.
        if (
            "unstable" not in changes.distributions
            and "experimental" not in changes.distributions
        ):
            return True

        cnf = Config()
        if "Dinstall::LintianTags" not in cnf:
            return True
        tagfile = cnf["Dinstall::LintianTags"]

        with open(tagfile, "r") as sourcefile:
            sourcecontent = sourcefile.read()
        try:
            lintiantags = yaml.safe_load(sourcecontent)["lintian"]
        except yaml.YAMLError as msg:
            # Chain the parser error so the traceback names the real cause.
            raise Exception(
                "Could not read lintian tags file {0}, YAML error: {1}".format(
                    tagfile, msg
                )
            ) from msg

        # The temporary tag list must stay open while lintian runs: it is
        # removed as soon as the ``with`` block is left.
        with tempfile.NamedTemporaryFile(mode="w+t") as temptagfile:
            # Make the file world-readable so lintian can still read it when
            # it is started as a different user via sudo below.
            os.fchmod(temptagfile.fileno(), 0o644)
            for tags in lintiantags.values():
                for tag in tags:
                    print(tag, file=temptagfile)
            temptagfile.flush()

            changespath = os.path.join(upload.directory, changes.filename)

            tempdir = cnf.get("Dir::TempPath") or os.environ.get("TMPDIR", "/tmp")
            cmd = []
            user = cnf.get("Dinstall::UnprivUser") or None
            if user is not None:
                cmd.extend(["sudo", "-H", "-u", user, "TMPDIR={0}".format(tempdir)])
            cmd.extend(
                [
                    "/usr/bin/lintian",
                    "--show-overrides",
                    "--tags-from-file",
                    temptagfile.name,
                    changespath,
                ]
            )
            process = subprocess.run(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
            )
            output = process.stdout
            result = process.returncode

        # Return code 2 is only warned about; the output is still parsed.
        if result == 2:
            utils.warn(
                "lintian failed for %s [return code: %s]." % (changespath, result)
            )
            utils.warn(utils.prefix_multi_line_string(output, " [possible output:] "))

        parsed_tags = lintian.parse_lintian_output(output)
        rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags))
        if len(rejects) != 0:
            raise Reject("\n".join(rejects))

        return True

1230 

1231 

class SourceFormatCheck(Check):
    """Check source format is allowed in the target suite"""

    @override
    def per_suite_check(
        self, upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> bool:
        """Reject source packages whose Format is not enabled for `suite`.

        Uploads without a source package pass unconditionally.

        :raises Reject: if no matching source format is linked to the suite
        """
        source = upload.changes.source
        if source is None:
            return True

        source_format = source.dsc["Format"]
        allowed_format = (
            upload.session.query(SrcFormat)
            .filter_by(format_name=source_format)
            .filter(SrcFormat.suites.contains(suite))
            .first()
        )
        if allowed_format is not None:
            return True

        raise Reject(
            "source format {0} is not allowed in suite {1}".format(
                source_format, suite.suite_name
            )
        )

1258 

1259 

class SuiteCheck(Check):
    """Check the suite accepts the kind of upload (source and/or binary)"""

    @override
    def per_suite_check(
        self, upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> bool:
        """Reject source or binary uploads the suite does not accept.

        :raises Reject: if the upload has a source package but the suite
                        does not accept source uploads, or has binaries but
                        the suite does not accept binary uploads
        """
        has_source = upload.changes.source is not None
        if has_source and not suite.accept_source_uploads:
            raise Reject(
                'The suite "{0}" does not accept source uploads.'.format(
                    suite.suite_name
                )
            )

        has_binaries = len(upload.changes.binaries) != 0
        if has_binaries and not suite.accept_binary_uploads:
            raise Reject(
                'The suite "{0}" does not accept binary uploads.'.format(
                    suite.suite_name
                )
            )

        return True

1278 

1279 

class SuiteArchitectureCheck(Check):
    """Check all uploaded architectures are enabled in the target suite"""

    @override
    def per_suite_check(
        self, upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> bool:
        """Reject the upload if one of its architectures is not in `suite`.

        :raises Reject: for the first architecture not linked to the suite
        """
        db = upload.session
        for arch_name in upload.changes.architectures:
            known = (
                db.query(Architecture)
                .filter_by(arch_string=arch_name)
                .filter(Architecture.suites.contains(suite))
                .first()
            )
            if known is not None:
                continue
            raise Reject(
                "Architecture {0} is not allowed in suite {1}".format(
                    arch_name, suite.suite_name
                )
            )

        return True

1300 

1301 

class VersionCheck(Check):
    """Check version constraints"""

    def _highest_source_version(
        self, session: "Session", source_name: str, suite: Suite
    ) -> str | None:
        """Return the highest version of `source_name` in `suite`, if any."""
        newest = (
            session.query(DBSource)
            .filter_by(source=source_name)
            .filter(DBSource.suites.contains(suite))
            .order_by(DBSource.version.desc())
            .first()
        )
        return None if newest is None else newest.version

    def _highest_binary_version(
        self, session: "Session", binary_name: str, suite: Suite, architecture: str
    ) -> str | None:
        """Return the highest version of `binary_name` in `suite`, if any.

        Binaries for both `architecture` and ``all`` are considered.
        """
        newest = (
            session.query(DBBinary)
            .filter_by(package=binary_name)
            .filter(DBBinary.suites.contains(suite))
            .join(DBBinary.architecture)
            .filter(Architecture.arch_string.in_(["all", architecture]))
            .order_by(DBBinary.version.desc())
            .first()
        )
        return None if newest is None else newest.version

    def _version_checks(
        self,
        upload: "daklib.archive.ArchiveUpload",
        suite: Suite,
        other_suite: Suite,
        op: Callable[[int], bool],
        op_name: str,
    ) -> None:
        """Compare the uploaded versions against those in `other_suite`.

        :param suite: suite the upload targets (only used in the message)
        :param other_suite: suite whose current versions are compared against
        :param op: predicate applied to ``version_compare(uploaded, existing)``;
                   must return true for the upload to pass
        :param op_name: word describing the required relation in the message
        :raises Reject: if the source or any binary fails the comparison
        """
        session = upload.session
        source = upload.changes.source

        if source is not None:
            source_name = source.dsc["Source"]
            source_version = source.dsc["Version"]
            existing = self._highest_source_version(session, source_name, other_suite)
            if existing is not None and not op(
                version_compare(source_version, existing)
            ):
                raise Reject(
                    "Version check failed:\n"
                    "Your upload included the source package {0}, version {1},\n"
                    "however {3} already has version {2}.\n"
                    "Uploads to {5} must have a {4} version than present in {3}.".format(
                        source_name,
                        source_version,
                        existing,
                        other_suite.suite_name,
                        op_name,
                        suite.suite_name,
                    )
                )

        for binary in upload.changes.binaries:
            control = binary.control
            binary_name = control["Package"]
            binary_version = control["Version"]
            architecture = control["Architecture"]
            existing = self._highest_binary_version(
                session, binary_name, other_suite, architecture
            )
            if existing is not None and not op(
                version_compare(binary_version, existing)
            ):
                raise Reject(
                    "Version check failed:\n"
                    "Your upload included the binary package {0}, version {1}, for {2},\n"
                    "however {4} already has version {3}.\n"
                    "Uploads to {6} must have a {5} version than present in {4}.".format(
                        binary_name,
                        binary_version,
                        architecture,
                        existing,
                        other_suite.suite_name,
                        op_name,
                        suite.suite_name,
                    )
                )

    @override
    def per_suite_check(
        self, upload: "daklib.archive.ArchiveUpload", suite: Suite
    ) -> bool:
        """Enforce the version constraints configured for `suite`.

        The upload must be newer than what is already in `suite` and in
        every suite referenced by a ``MustBeNewerThan`` or ``Enhances``
        rule, and older than what is in every suite referenced by a
        ``MustBeOlderThan`` rule.

        :raises Reject: if any constraint is violated
        """

        def is_higher(comparison: int) -> bool:
            return comparison > 0

        def is_lower(comparison: int) -> bool:
            return comparison < 0

        session = upload.session

        # ``dbconn.VersionCheck`` is the database model; the bare name
        # ``VersionCheck`` is this check class.
        newer_rules = (
            session.query(dbconn.VersionCheck)
            .filter_by(suite=suite)
            .filter(dbconn.VersionCheck.check.in_(["MustBeNewerThan", "Enhances"]))
        )
        # Must be newer than old versions in `suite` as well.
        newer_references = [rule.reference for rule in newer_rules] + [suite]
        for reference in newer_references:
            self._version_checks(upload, suite, reference, is_higher, "higher")

        older_rules = session.query(dbconn.VersionCheck).filter_by(
            suite=suite, check="MustBeOlderThan"
        )
        older_references = [rule.reference for rule in older_rules]
        for reference in older_references:
            self._version_checks(upload, suite, reference, is_lower, "lower")

        return True

    @property
    @override
    def forcable(self) -> bool:
        """Version check failures are always reported as forcable."""
        return True

1420 return True