1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# Parts based on code that is 

4# Copyright (C) 2001-2006, James Troup <james@nocrew.org> 

5# Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org> 

6# 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License along 

18# with this program; if not, write to the Free Software Foundation, Inc., 

19# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

20 

21"""module provided pre-acceptance tests 

22 

23Please read the documentation for the :class:`Check` class for the interface. 

24""" 

25 

26import datetime 

27import os 

28import re 

29import subprocess 

30import tempfile 

31import textwrap 

32import time 

33from collections.abc import Iterable 

34from typing import TYPE_CHECKING 

35 

36import apt_inst 

37import apt_pkg 

38import yaml 

39from apt_pkg import version_compare 

40 

41import daklib.dbconn as dbconn 

42import daklib.gpg 

43import daklib.lintian as lintian 

44import daklib.upload 

45import daklib.utils as utils 

46from daklib.config import Config 

47from daklib.dbconn import ( 

48 ACL, 

49 ACLPerSource, 

50 Architecture, 

51 DBBinary, 

52 DBSource, 

53 SignatureHistory, 

54 SrcFormat, 

55 Suite, 

56 get_source_in_suite, 

57) 

58from daklib.regexes import ( 

59 re_field_package, 

60 re_field_source, 

61 re_field_version, 

62 re_field_version_upstream, 

63 re_file_binary, 

64 re_file_changes, 

65 re_file_dsc, 

66 re_file_orig, 

67 re_file_source, 

68 re_isanum, 

69) 

70from daklib.textutils import ParseMaintError, fix_maintainer 

71 

72if TYPE_CHECKING: 72 ↛ 73line 72 didn't jump to line 73, because the condition on line 72 was never true

73 import daklib.archive 

74 

75 

def check_fields_for_valid_utf8(filename, control):
    """Reject a control file that has a field value which is not valid UTF-8.

    :param filename: file name used as prefix of the rejection message
    :param control: control data offering ``keys()`` and item access

    :raises Reject: reading a field value raised :exc:`UnicodeDecodeError`
    """
    for name in control.keys():
        try:
            # Reading the value is what makes `TagSection` try to decode it.
            # The field name deserves the same treatment, but that requires
            # https://bugs.debian.org/995118 to be fixed.
            # TODO: make sure the field name `name` is valid UTF-8 too
            control[name]
        except UnicodeDecodeError:
            raise Reject(f"{filename}: The {name} field is not valid UTF-8")

89 

90 

class Reject(Exception):
    """Raised by a check to signal that the upload has to be rejected."""

95 

96 

class RejectExternalFilesMismatch(Reject):
    """Raised when a file disagrees with the external files database.

    The first four positional arguments fill the message, in this order:
    file name, name of the mismatching attribute, current value and
    external value.
    """

    def __str__(self):
        template = (
            "'%s' has mismatching %s from the external files db "
            "('%s' [current] vs '%s' [external])"
        )
        return template % self.args[:4]

105 

106 

class RejectACL(Reject):
    """Raised when an ACL check fails.

    :param acl: the ACL that was violated; its ``name`` appears in the message
    :param reason: human-readable explanation
    """

    def __init__(self, acl, reason):
        self.acl, self.reason = acl, reason

    def __str__(self):
        return f"ACL {self.acl.name}: {self.reason}"

116 

117 

class Check:
    """Common interface of all pre-acceptance checks.

    :class:`daklib.archive.ArchiveUpload` calls the checks. A check that
    fails should raise :exc:`daklib.checks.Reject` carrying a human-readable
    description of why the upload should be rejected.

    The methods defined here only raise :exc:`NotImplementedError`;
    subclasses provide the actual checks.
    """

    def check(self, upload: "daklib.archive.ArchiveUpload"):
        """Run the checks for the upload as a whole.

        :param upload: upload to check

        :raises Reject: upload should be rejected
        """
        raise NotImplementedError()

    def per_suite_check(self, upload: "daklib.archive.ArchiveUpload", suite: Suite):
        """Run the checks that depend on a target suite.

        :param upload: upload to check
        :param suite: suite to check

        :raises Reject: upload should be rejected
        """
        raise NotImplementedError()

    @property
    def forcable(self) -> bool:
        """Whether a failure of this check may be overridden.

        :const:`True` if it is acceptable to force ignoring a failing test,
        :const:`False` otherwise. The base class answers :const:`False`.
        """
        return False

153 

154 

class SignatureAndHashesCheck(Check):
    """Validate signatures and file hashes of the .changes and the .dsc.

    The .changes (and the .dsc, when the upload includes one) must carry a
    valid signature, the listed files must pass their hash check and the
    signing key must belong to a known user.
    """

    def check_replay(self, upload) -> bool:
        """Reject a .changes whose signature is already recorded.

        :param upload: upload being processed; the lookup uses its session

        :raises Reject: the signature has been seen before
        :return: :const:`True` when the signature is not yet known
        """
        session = upload.session
        previous = SignatureHistory.from_signed_file(upload.changes).query(session)
        if previous is None:
            return True
        raise Reject(
            f"Signature for changes file was already seen at {previous.seen}.\n"
            "Please refresh the signature of the changes file if you want to upload it again."
        )

    def check(self, upload):
        """Verify signatures, replay protection and hashes for the upload.

        :param upload: upload to check

        :raises Reject: a signature is invalid or replayed, the .dsc cannot
            be loaded, a hash check fails or the signing key is unknown
        """
        untrusted_ok_keys = Config().value_list(
            "Dinstall::AllowSourceUntrustedSigKeys"
        )

        changes = upload.changes
        if not changes.valid_signature:
            raise Reject("Signature for .changes not valid.")
        self.check_replay(upload)
        self._check_hashes(upload, changes.filename, changes.files.values())

        try:
            source = changes.source
        except Exception as e:
            raise Reject(f"Invalid dsc file: {e}")

        if source is not None:
            # Keys listed in the configuration may upload a .dsc whose
            # signature is invalid or made by somebody else.
            dsc_signature_required = (
                changes.primary_fingerprint not in untrusted_ok_keys
            )
            if dsc_signature_required:
                if not source.valid_signature:
                    raise Reject("Signature for .dsc not valid.")
                if source.primary_fingerprint != changes.primary_fingerprint:
                    raise Reject(".changes and .dsc not signed by the same key.")
            self._check_hashes(upload, source.filename, source.files.values())

        signer = upload.fingerprint
        if signer is None or signer.uid is None:
            raise Reject(".changes signed by unknown key.")

    def _check_hashes(
        self,
        upload: "daklib.archive.ArchiveUpload",
        filename: str,
        files: Iterable[daklib.upload.HashedFile],
    ):
        """Verify that the files on disk match their expected hashes.

        :param upload: upload we are processing
        :param filename: name of the file the expected hash values come from
        :param files: files whose hashes are checked

        :raises Reject: a file is missing or fails its check
        """
        try:
            for hashed_file in files:
                hashed_file.check(upload.directory)
        except daklib.upload.FileDoesNotExist as e:
            raise Reject(
                f"{filename}: {e!s}\n"
                "Perhaps you need to include the file in your upload?\n\n"
                "If the orig tarball is missing, the -sa flag for dpkg-buildpackage will be your friend."
            )
        except daklib.upload.UploadException as e:
            raise Reject(f"{filename}: {e!s}")

227 

228 

class WeakSignatureCheck(Check):
    """Refuse a .changes or .dsc that was signed using a weak algorithm."""

    def check(self, upload):
        """Reject the upload if a signature is flagged as weak.

        :param upload: upload to check

        :raises Reject: the .changes or the source package has a weak signature
        :return: :const:`True` when no weak signature was found
        """
        changes = upload.changes
        if changes.weak_signature:
            raise Reject(
                "The .changes was signed using a weak algorithm (such as SHA-1)"
            )

        source = changes.source
        if source is not None and source.weak_signature:
            raise Reject(
                "The source package was signed using a weak algorithm (such as SHA-1)"
            )

        return True

247 

248 

def check_signature_timestamp(prefix: str, signed_file: daklib.gpg.SignedFile) -> bool:
    """Reject signatures that are too old or too far in the future.

    A signature may be at most 365 days old and at most 7 days ahead of
    the current time.

    :param prefix: text put in front of the rejection message
    :param signed_file: object whose ``signature_timestamp`` is checked

    :raises Reject: the signature timestamp is outside the accepted window
    :return: :const:`True` when the timestamp is acceptable
    """
    # Naive "now" in UTC, equivalent to the deprecated datetime.utcnow().
    # The subtraction below needs a naive timestamp as well.
    # NOTE(review): presumably signature_timestamp is expressed in UTC - confirm.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    timestamp = signed_file.signature_timestamp
    age = now - timestamp

    age_max = datetime.timedelta(days=365)
    age_min = datetime.timedelta(days=-7)

    if age > age_max:
        raise Reject(
            "{0}: Signature from {1} is too old (maximum age is {2} days)".format(
                prefix, timestamp, age_max.days
            )
        )
    if age < age_min:
        raise Reject(
            "{0}: Signature from {1} is too far in the future (tolerance is {2} days)".format(
                prefix, timestamp, abs(age_min.days)
            )
        )

    # Honour the declared return type, like the other checks in this module.
    return True

269 

270 

class SignatureTimestampCheck(Check):
    """Check that the signature of the .changes has an acceptable timestamp."""

    def check(self, upload):
        """Delegate to :func:`check_signature_timestamp` for the .changes.

        :param upload: upload to check

        :raises Reject: the signature timestamp is not acceptable
        """
        changes = upload.changes
        return check_signature_timestamp(changes.filename, changes.signature)

278 

279 

class ChangesCheck(Check):
    """Validate the syntax of the .changes file."""

    def check(self, upload):
        """Run all syntax checks on the .changes file of the upload.

        Covers mandatory fields, UTF-8 validity, the Source and Version
        fields and their agreement with the file name, binary package
        names, consistency of the source flag, the Maintainer and
        Changed-By fields, byhand files, the file list and the closed bugs.

        :param upload: upload to check

        :raises Reject: the .changes file fails one of the checks
        :return: :const:`True` when all checks pass
        """
        changes = upload.changes
        control = changes.changes
        fn = changes.filename

        mandatory_fields = (
            "Distribution",
            "Source",
            "Architecture",
            "Version",
            "Maintainer",
            "Files",
            "Changes",
        )
        for name in mandatory_fields:
            if name not in control:
                raise Reject(f"{fn}: misses mandatory field {name}")

        if len(changes.binaries) > 0:
            for name in ("Binary", "Description"):
                if name not in control:
                    raise Reject(f"{fn}: binary upload requires {name} field")

        check_fields_for_valid_utf8(fn, control)

        source_match = re_field_source.match(control["Source"])
        if not source_match:
            raise Reject(f"{fn}: Invalid Source field")
        version_match = re_field_version.match(control["Version"])
        if not version_match:
            raise Reject(f"{fn}: Invalid Version field")
        version_without_epoch = version_match.group("without_epoch")

        # The file name has to agree with the Source and Version fields.
        filename_match = re_file_changes.match(fn)
        if not filename_match:
            raise Reject(f"{fn}: Does not match re_file_changes")
        if filename_match.group("package") != source_match.group("package"):
            raise Reject(f"{fn}: Filename does not match Source field")
        if filename_match.group("version") != version_without_epoch:
            raise Reject(f"{fn}: Filename does not match Version field")

        for binary_name in changes.binary_names:
            if not re_field_package.match(binary_name):
                raise Reject(f"{fn}: Invalid binary package name {binary_name}")

        if changes.sourceful and changes.source is None:
            raise Reject("Changes has architecture source, but no source found.")
        if changes.source is not None and not changes.sourceful:
            raise Reject("Upload includes source, but changes does not say so.")

        try:
            fix_maintainer(changes.changes["Maintainer"])
        except ParseMaintError as e:
            raise Reject(f"{changes.filename}: Failed to parse Maintainer field: {e}")

        try:
            changed_by = changes.changes.get("Changed-By")
            if changed_by is not None:
                fix_maintainer(changed_by)
        except ParseMaintError as e:
            raise Reject(f"{changes.filename}: Failed to parse Changed-By field: {e}")

        try:
            # Accessed only for the exception it raises on an invalid .changes.
            changes.byhand_files
        except daklib.upload.InvalidChangesException as e:
            raise Reject(f"{e}")

        if len(changes.files) == 0:
            raise Reject("Changes includes no files.")

        for bugnum in changes.closed_bugs:
            if not re_isanum.match(bugnum):
                raise Reject(
                    f'{changes.filename}: "{bugnum}" in Closes field is not a number'
                )

        return True

367 

368 

class ExternalHashesCheck(Check):
    """Compare hashes from .changes and .dsc with an external database."""

    def check_single(self, session, f):
        """Compare one file with its row in the ``external_files`` table.

        The row is selected by a ``LIKE`` match on ``%/<filename>``. When
        no row is found, or the stored size is false-ish, nothing is checked.

        :param session: database session used for the query
        :param f: file providing ``filename``, ``size``, ``md5sum``,
            ``sha1sum`` and ``sha256sum``

        :raises RejectExternalFilesMismatch: size or a hash differs
        """
        q = session.execute(
            "SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE :pattern",
            {"pattern": "%/{}".format(f.filename)},
        )
        row = q.fetchone() or (None, None, None, None)
        ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum = row

        if not ext_size:
            return

        # Compared in this order; the first difference is reported.
        expectations = (
            ("size", ext_size),
            ("md5sum", ext_md5sum),
            ("sha1sum", ext_sha1sum),
            ("sha256sum", ext_sha256sum),
        )
        for attribute, external in expectations:
            if external != getattr(f, attribute):
                raise RejectExternalFilesMismatch(
                    f.filename, attribute, getattr(f, attribute), external
                )

    def check(self, upload):
        """Check all files of the .changes and, if present, of the source.

        Does nothing unless ``use_extfiles`` is set in the configuration.

        :param upload: upload to check

        :raises RejectExternalFilesMismatch: a file differs from the
            external database
        """
        if not Config().use_extfiles:
            return

        session = upload.session
        changes = upload.changes

        for hashed_file in changes.files.values():
            self.check_single(session, hashed_file)

        source = changes.source
        if source is None:
            return
        for hashed_file in source.files.values():
            self.check_single(session, hashed_file)

420 

421 

class BinaryCheck(Check):
    """Check binary packages for syntax errors."""

    def check(self, upload):
        """Check every binary package and its relation to the .changes.

        Each binary is validated by :meth:`check_binary`. Afterwards every
        non-source architecture listed in the .changes must be covered by
        a binary, and every binary must be listed in the Binary field. A
        package in the debug section is exempt if its name is a listed
        package name plus ``-dbgsym``.

        :param upload: upload to check

        :raises Reject: a binary package or the Architecture/Binary fields
            are inconsistent
        :return: :const:`True` when all checks pass
        """
        debug_deb_name_postfix = "-dbgsym"
        # XXX: Handle dynamic debug section name here

        # Filled by check_binary() with the architecture of each binary.
        self._architectures = set()

        for binary in upload.changes.binaries:
            self.check_binary(upload, binary)

        for arch in upload.changes.architectures:
            if arch == "source":
                continue
            if arch not in self._architectures:
                raise Reject(
                    "{}: Architecture field includes {}, but no binary packages for {} are included in the upload".format(
                        upload.changes.filename, arch, arch
                    )
                )

        binaries = {
            binary.control["Package"]: binary for binary in upload.changes.binaries
        }

        for name, binary in binaries.items():
            if name in upload.changes.binary_names:
                # Package is listed in Binary field. Everything is good.
                pass
            elif daklib.utils.is_in_debug_section(binary.control):
                # If we have a binary package in the debug section, we
                # can allow it to not be present in the Binary field
                # in the .changes file, so long as its name (without
                # -dbgsym) is present in the Binary list.
                if not name.endswith(debug_deb_name_postfix):
                    raise Reject(
                        "Package {0} is in the debug section, but "
                        "does not end in {1}.".format(name, debug_deb_name_postfix)
                    )

                # Right, so, it's named properly, let's check that
                # the corresponding package is in the Binary list
                origin_package_name = name[: -len(debug_deb_name_postfix)]
                if origin_package_name not in upload.changes.binary_names:
                    raise Reject(
                        "Debug package {debug}'s corresponding binary package "
                        "{origin} is not present in the Binary field.".format(
                            debug=name, origin=origin_package_name
                        )
                    )
            else:
                # Someone was a nasty little hacker and put a package
                # into the .changes that isn't in debian/control. Bad,
                # bad person.
                raise Reject(
                    "Package {0} is not mentioned in Binary field in changes".format(
                        name
                    )
                )

        return True

    def check_binary(self, upload, binary):
        """Check the control data and file name of one binary package.

        Records the package's architecture in ``self._architectures``,
        which :meth:`check` must have initialised beforehand.

        :param upload: upload the binary belongs to
        :param binary: binary package providing ``control`` and ``hashed_file``

        :raises Reject: a field is missing or invalid, the file name does
            not agree with the fields, or a dependency field is malformed
        """
        fn = binary.hashed_file.filename
        control = binary.control

        for field in ("Package", "Architecture", "Version", "Description", "Section"):
            if field not in control:
                raise Reject("{0}: Missing mandatory field {1}.".format(fn, field))

        check_fields_for_valid_utf8(fn, control)

        # check fields

        package = control["Package"]
        if not re_field_package.match(package):
            raise Reject("{0}: Invalid Package field".format(fn))

        version = control["Version"]
        version_match = re_field_version.match(version)
        if not version_match:
            raise Reject("{0}: Invalid Version field".format(fn))
        version_without_epoch = version_match.group("without_epoch")

        architecture = control["Architecture"]
        if architecture not in upload.changes.architectures:
            raise Reject(
                "{0}: Architecture not in Architecture field in changes file".format(fn)
            )
        if architecture == "source":
            raise Reject(
                '{0}: Architecture "source" invalid for binary packages'.format(fn)
            )
        self._architectures.add(architecture)

        source = control.get("Source")
        if source is not None and not re_field_source.match(source):
            raise Reject("{0}: Invalid Source field".format(fn))

        section = control.get("Section", "")
        if section == "" or section == "unknown" or section.endswith("/unknown"):
            raise Reject(
                '{0}: The "Section" field must be present and use a real section name.'.format(
                    fn
                )
            )

        # check filename

        match = re_file_binary.match(fn)
        if package != match.group("package"):
            raise Reject("{0}: filename does not match Package field".format(fn))
        if version_without_epoch != match.group("version"):
            raise Reject("{0}: filename does not match Version field".format(fn))
        if architecture != match.group("architecture"):
            raise Reject("{0}: filename does not match Architecture field".format(fn))

        # check dependency field syntax

        def check_dependency_field(
            field,
            control,
            dependency_parser=apt_pkg.parse_depends,
            allow_alternatives=True,
            allow_relations=("", "<", "<=", "=", ">=", ">"),
        ):
            """Check one dependency-style field, if it is present.

            :param field: name of the field to check
            :param control: control data the field is read from
            :param dependency_parser: callable turning the field value into
                groups of ``(package, version, relation)`` tuples
            :param allow_alternatives: whether a group may contain more
                than one entry
            :param allow_relations: relation operators accepted in the field

            :raises Reject: the field is empty, unparsable, or uses
                alternatives or relations that are not allowed
            """
            value = control.get(field)
            if value is None:
                return
            if value.strip() == "":
                raise Reject("{0}: empty {1} field".format(fn, field))
            try:
                depends = dependency_parser(value)
            except Exception as e:
                # `Exception` rather than a bare except, so that
                # KeyboardInterrupt/SystemExit are not turned into rejects.
                raise Reject(
                    "{0}: APT could not parse {1} field".format(fn, field)
                ) from e
            for group in depends:
                if not allow_alternatives and len(group) != 1:
                    raise Reject(
                        "{0}: {1}: alternatives are not allowed".format(fn, field)
                    )
                for dep_pkg, dep_ver, dep_rel in group:
                    if dep_rel not in allow_relations:
                        raise Reject(
                            "{}: {}: depends on {}, but only relations {} are allowed for this field".format(
                                fn,
                                field,
                                # str.join() takes a single iterable.
                                " ".join((dep_pkg, dep_rel, dep_ver)),
                                allow_relations,
                            )
                        )

        for field in (
            "Breaks",
            "Conflicts",
            "Depends",
            "Enhances",
            "Pre-Depends",
            "Recommends",
            "Replaces",
            "Suggests",
        ):
            check_dependency_field(field, control)

        check_dependency_field(
            "Provides", control, allow_alternatives=False, allow_relations=("", "=")
        )
        check_dependency_field(
            "Built-Using",
            control,
            dependency_parser=apt_pkg.parse_src_depends,
            allow_alternatives=False,
            allow_relations=("=",),
        )

595 

596 

# Names of the only archive members a .deb file may contain.
_DEB_ALLOWED_MEMBERS = (
    {"debian-binary"}
    | {f"control.tar.{ext}" for ext in ("gz", "xz")}
    | {f"data.tar.{ext}" for ext in ("gz", "bz2", "xz")}
)

602 

603 

class BinaryMembersCheck(Check):
    """check members of .deb file"""

    def check(self, upload):
        """Check the archive members of every binary package in the upload.

        :param upload: upload to check

        :raises Reject: a .deb contains a member that is not allowed
        :return: :const:`True` when all binaries pass
        """
        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            self._check_binary(filename, path)
        return True

    def _check_binary(self, filename: str, path: str) -> None:
        """Reject a .deb containing members outside the allowed set.

        :param filename: name used in the rejection message
        :param path: location of the .deb file to open

        :raises Reject: the file has members not in ``_DEB_ALLOWED_MEMBERS``
        """
        deb = apt_inst.DebFile(path)
        members = {member.name for member in deb.getmembers()}
        if blocked_members := members - _DEB_ALLOWED_MEMBERS:
            # Sorted so that the message does not depend on set ordering.
            raise Reject(
                f"{filename}: Contains blocked members {', '.join(sorted(blocked_members))}"
            )

621 

622 

class BinaryTimestampCheck(Check):
    """check timestamps of files in binary packages

    Files in the near future cause ugly warnings and extreme time travel
    can cause errors on extraction.
    """

    def check(self, upload):
        """Reject binaries containing files with implausible timestamps.

        A member is too far in the future when its mtime exceeds the
        current time plus ``Dinstall::FutureTimeTravelGrace`` (default
        ``24 * 3600``). It is too far in the past when its mtime lies
        before the start of ``Dinstall::PastCutoffYear`` (default ``1975``).

        :param upload: upload to check

        :raises Reject: a binary package contains such a file
        """
        cnf = Config()
        future_cutoff = time.time() + cnf.find_i(
            "Dinstall::FutureTimeTravelGrace", 24 * 3600
        )
        past_cutoff = time.mktime(
            time.strptime(cnf.find("Dinstall::PastCutoffYear", "1975"), "%Y")
        )

        class TarTime:
            """Collect archive members whose mtime is outside the cutoffs."""

            def __init__(self):
                # Both map member name -> mtime.
                self.future_files: dict[str, int] = {}
                self.past_files: dict[str, int] = {}

            def callback(self, member, data) -> None:
                if member.mtime > future_cutoff:
                    self.future_files[member.name] = member.mtime
                elif member.mtime < past_cutoff:
                    self.past_files[member.name] = member.mtime

        def format_reason(filename, direction, files) -> str:
            """Build the rejection message, listing one file per line."""
            header = (
                "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(
                    filename, len(files), direction
                )
            )
            # Newline-separated so that several files do not run together.
            entries = [
                " {0} ({1})".format(fn, time.ctime(ts)) for fn, ts in files.items()
            ]
            return header + "\n".join(entries)

        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            deb = apt_inst.DebFile(path)
            tar = TarTime()
            for archive in (deb.control, deb.data):
                archive.go(tar.callback)
            if tar.future_files:
                raise Reject(format_reason(filename, "future", tar.future_files))
            if tar.past_files:
                raise Reject(format_reason(filename, "past", tar.past_files))

671 

672 

class SourceCheck(Check):
    """Check source package for syntax errors."""

    def check_filename(self, control, filename, regex: re.Pattern) -> None:
        """Check that a source file name agrees with the Source and Version fields.

        :param control: control data of the .dsc
        :param filename: file name to check
        :param regex: pattern applied to file names that are not an
            upstream tarball; must define ``package`` and ``version`` groups

        :raises Reject: the name matches neither pattern or disagrees with
            the Source or Version field
        """
        # In case we have an .orig.tar.*, we have to strip the Debian revision
        # from the version number. So handle this special case first.
        is_orig = True
        match = re_file_orig.match(filename)
        if not match:
            is_orig = False
            match = regex.match(filename)

        if not match:
            raise Reject(
                "{0}: does not match regular expression for source filenames".format(
                    filename
                )
            )
        if match.group("package") != control["Source"]:
            raise Reject("{0}: filename does not match Source field".format(filename))

        version = control["Version"]
        if is_orig:
            upstream_match = re_field_version_upstream.match(version)
            if not upstream_match:
                raise Reject(
                    "{0}: Source package includes upstream tarball, but {1} has no Debian revision.".format(
                        filename, version
                    )
                )
            version = upstream_match.group("upstream")
        version_match = re_field_version.match(version)
        if not version_match:
            # Reject instead of failing with AttributeError on `None.group`.
            raise Reject("{0}: Invalid Version field".format(filename))
        version_without_epoch = version_match.group("without_epoch")
        if match.group("version") != version_without_epoch:
            raise Reject("{0}: filename does not match Version field".format(filename))

    def check(self, upload):
        """Check the .dsc of the upload, if it includes a source package.

        Covers agreement between the Architecture field and the presence of
        a source package, UTF-8 validity, the Source and Version fields,
        the file names, the build dependency fields and the file list.

        :param upload: upload to check

        :raises Reject: the source package fails one of the checks
        :return: :const:`True` when all checks pass
        """
        if upload.changes.source is None:
            if upload.changes.sourceful:
                raise Reject(
                    "{}: Architecture field includes source, but no source package is included in the upload".format(
                        upload.changes.filename
                    )
                )
            return True

        if not upload.changes.sourceful:
            raise Reject(
                "{}: Architecture field does not include source, but a source package is included in the upload".format(
                    upload.changes.filename
                )
            )

        changes = upload.changes.changes
        source = upload.changes.source
        control = source.dsc
        dsc_fn = source._dsc_file.filename

        check_fields_for_valid_utf8(dsc_fn, control)

        # check fields
        if not re_field_package.match(control["Source"]):
            raise Reject("{0}: Invalid Source field".format(dsc_fn))
        if control["Source"] != changes["Source"]:
            raise Reject(
                "{0}: Source field does not match Source field in changes".format(
                    dsc_fn
                )
            )
        if control["Version"] != changes["Version"]:
            raise Reject(
                "{0}: Version field does not match Version field in changes".format(
                    dsc_fn
                )
            )

        # check filenames
        self.check_filename(control, dsc_fn, re_file_dsc)
        for f in source.files.values():
            self.check_filename(control, f.filename, re_file_source)

        # check dependency field syntax
        for field in (
            "Build-Conflicts",
            "Build-Conflicts-Indep",
            "Build-Depends",
            "Build-Depends-Arch",
            "Build-Depends-Indep",
        ):
            value = control.get(field)
            if value is not None:
                if value.strip() == "":
                    raise Reject("{0}: empty {1} field".format(dsc_fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception as e:
                    raise Reject(
                        "{0}: APT could not parse {1} field: {2}".format(
                            dsc_fn, field, e
                        )
                    )

        rejects = utils.check_dsc_files(dsc_fn, control, list(source.files.keys()))
        if len(rejects) > 0:
            raise Reject("\n".join(rejects))

        return True

780 

781 

class SingleDistributionCheck(Check):
    """Ensure that the .changes names exactly one target distribution."""

    def check(self, upload):
        """Reject uploads whose distribution list does not have one entry.

        :param upload: upload to check

        :raises Reject: the .changes lists zero or several distributions
        """
        distributions = upload.changes.distributions
        if len(distributions) == 1:
            return
        raise Reject("Only uploads to a single distribution are allowed.")

788 

789 

class ACLCheck(Check):
    """Check the uploader is allowed to upload the packages in .changes"""

    def _does_hijack(self, session, upload, suite):
        """Look for an uploaded binary that `suite` already has from another source.

        Returns a 3-tuple ``(hijacks, binary_name, other_source)``.  When a
        binary listed in the upload exists in `suite` but was built from a
        source package other than the upload's ``Source`` field, the tuple is
        ``(True, <binary package>, <that source>)``; otherwise it is
        ``(False, None, None)``.
        """
        # Try to catch hijacks.
        # This doesn't work correctly. Uploads to experimental can still
        # "hijack" binaries from unstable. Also one can hijack packages
        # via buildds (but people who try this should not be DMs).
        for binary_name in upload.changes.binary_names:
            binaries = (
                session.query(DBBinary)
                .join(DBBinary.source)
                .filter(DBBinary.suites.contains(suite))
                .filter(DBBinary.package == binary_name)
            )
            for binary in binaries:
                if binary.source.source != upload.changes.changes["Source"]:
                    return True, binary.package, binary.source.source
        return False, None, None

    def _check_acl(self, session, upload, acl):
        """Evaluate a single ACL against the upload.

        Returns a 2-tuple ``(result, reason)``:

        - ``(None, None)``: the ACL does not apply to the signing fingerprint
          (fingerprint or keyring restriction of the ACL did not match),
        - ``(False, reason)``: the ACL applies and denies the upload,
        - ``(True, None)``: the ACL applies and allows the upload.
        """
        source_name = upload.changes.source_name
        fingerprint = upload.authorized_by_fingerprint

        # ACLs restricted to certain fingerprints/keyrings are skipped for
        # everybody else.
        if acl.match_fingerprint and fingerprint not in acl.fingerprints:
            return None, None
        if acl.match_keyring is not None and fingerprint.keyring != acl.match_keyring:
            return None, None

        if not acl.allow_new:
            if upload.new:
                return False, "NEW uploads are not allowed"
            for f in upload.changes.files.values():
                if f.section == "byhand" or f.section.startswith("raw-"):
                    return False, "BYHAND uploads are not allowed"
        if not acl.allow_source and upload.changes.source is not None:
            return False, "sourceful uploads are not allowed"
        binaries = upload.changes.binaries
        if len(binaries) != 0:
            if not acl.allow_binary:
                return False, "binary uploads are not allowed"
            if upload.changes.source is None and not acl.allow_binary_only:
                return False, "binary-only uploads are not allowed"
            if not acl.allow_binary_all:
                uploaded_arches = set(upload.changes.architectures)
                uploaded_arches.discard("source")
                allowed_arches = {a.arch_string for a in acl.architectures}
                forbidden_arches = uploaded_arches - allowed_arches
                if forbidden_arches:
                    # Sort so the rejection text is reproducible; set
                    # iteration order is arbitrary.
                    return (
                        False,
                        "uploads for architecture(s) {0} are not allowed".format(
                            ", ".join(sorted(forbidden_arches))
                        ),
                    )
        if not acl.allow_hijack:
            for suite in upload.final_suites:
                does_hijack, hijacked_binary, hijacked_from = self._does_hijack(
                    session, upload, suite
                )
                if does_hijack:
                    return (
                        False,
                        "hijacks are not allowed (binary={0}, other-source={1})".format(
                            hijacked_binary, hijacked_from
                        ),
                    )

        # The per-source entry acts as a whitelist entry for allow_per_source
        # ACLs and as a blacklist entry for deny_per_source ACLs.
        acl_per_source = (
            session.query(ACLPerSource)
            .filter_by(acl=acl, fingerprint=fingerprint, source=source_name)
            .first()
        )
        if acl.allow_per_source:
            if acl_per_source is None:
                return False, "not allowed to upload source package '{0}'".format(
                    source_name
                )
        if acl.deny_per_source and acl_per_source is not None:
            return (
                False,
                acl_per_source.reason
                or "forbidden to upload source package '{0}'".format(source_name),
            )

        return True, None

    def check(self, upload):
        """Check the upload against the uploader's ACL and all global ACLs.

        Returns True when the upload is allowed.

        Raises:
            Reject: if the fingerprint has no keyring, the keyring is not
                active, or neither fingerprint nor keyring has an ACL.
            RejectACL: if the uploader's ACL does not explicitly allow the
                upload, or a global ACL explicitly denies it.
        """
        session = upload.session
        fingerprint = upload.authorized_by_fingerprint
        keyring = fingerprint.keyring

        if keyring is None:
            raise Reject(
                "No keyring for fingerprint {0}".format(fingerprint.fingerprint)
            )
        if not keyring.active:
            raise Reject("Keyring {0} is not active".format(keyring.name))

        # A fingerprint-specific ACL takes precedence over the keyring's ACL.
        acl = fingerprint.acl or keyring.acl
        if acl is None:
            raise Reject("No ACL for fingerprint {0}".format(fingerprint.fingerprint))
        result, reason = self._check_acl(session, upload, acl)
        # The uploader's own ACL must say yes: "does not apply" (None) is
        # treated like a denial here.
        if not result:
            raise RejectACL(acl, reason)

        # Global ACLs only veto: "does not apply" (None) is acceptable.
        for acl in session.query(ACL).filter_by(is_global=True):
            result, reason = self._check_acl(session, upload, acl)
            if result is False:
                raise RejectACL(acl, reason)

        return True

    def per_suite_check(self, upload, suite):
        """Check the upload against the ACLs attached to `suite`.

        A suite without ACLs accepts everything.  Otherwise no ACL may deny
        the upload and at least one ACL must explicitly allow it.

        Returns True when the upload is allowed.

        Raises:
            Reject: if an ACL denies the upload or no ACL accepts it.
        """
        acls = suite.acls
        if len(acls) != 0:
            accept = False
            for acl in acls:
                result, reason = self._check_acl(upload.session, upload, acl)
                if result is False:
                    raise Reject(reason)
                accept = accept or result
            if not accept:
                raise Reject(
                    "Not accepted by any per-suite acl (suite={0})".format(
                        suite.suite_name
                    )
                )
        return True

919 

920 

class TransitionCheck(Check):
    """check for a transition"""

    def check(self, upload):
        """Reject sourceful uploads of packages blocked by a release transition.

        Returns True when the upload is not sourceful, when no transitions
        are available, or when no still-active transition lists the uploaded
        source package.

        Raises:
            Reject: if the uploaded source is part of a transition whose
                source has not yet reached the expected version in testing.
        """
        if not upload.changes.sourceful:
            return True

        transitions = self.get_transitions()
        if transitions is None:
            return True

        session = upload.session
        fields = upload.changes.changes
        source = re_field_source.match(fields["Source"]).group("package")

        for name in transitions:
            details = transitions[name]
            transition_source = details["source"]
            expected = details["new"]

            # Will be None if nothing is in testing.
            current = get_source_in_suite(transition_source, "testing", session)

            # The transition is over once testing has the expected version
            # (or a newer one); nothing to block in that case.
            if (
                current is not None
                and apt_pkg.version_compare(current.version, expected) >= 0
            ):
                continue

            # Only sources listed for this transition are affected.
            if source not in details["packages"]:
                continue

            header = "{0}: part of the {1} transition.\n\n".format(source, name)

            if current is None:
                currentlymsg = "not present in testing"
            else:
                currentlymsg = "at version {0}".format(current.version)

            description = "Transition description: {0}\n\n".format(details["reason"])

            explanation = """Your package
is part of a testing transition designed to get {0} migrated (it is
currently {1}, we need version {2}). This transition is managed by the
Release Team, and {3} is the Release-Team member responsible for it.
Please mail debian-release@lists.debian.org or contact {3} directly if you
need further assistance. You might want to upload to experimental until this
transition is done.""".format(
                transition_source, currentlymsg, expected, details["rm"]
            )

            raise Reject(header + description + "\n".join(textwrap.wrap(explanation)))

        return True

    def get_transitions(self):
        """Load the transitions file named by Dinstall::ReleaseTransitions.

        Returns the parsed YAML document, or None when the option is unset,
        the file does not exist, or the file is not valid YAML (a warning is
        emitted in the last case).
        """
        path = Config().get("Dinstall::ReleaseTransitions", "")
        if path == "" or not os.path.exists(path):
            return None

        with open(path, "r") as fd:
            contents = fd.read()

        try:
            return yaml.safe_load(contents)
        except yaml.YAMLError as msg:
            utils.warn(
                "Not checking transitions, the transitions file is broken: {0}".format(
                    msg
                )
            )
            return None

1003 

1004 

class NoSourceOnlyCheck(Check):
    """Check for source-only upload

    Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is
    set. Otherwise they are rejected.

    Source-only uploads are only accepted for source packages having a
    Package-List field that also lists architectures per package. This
    check can be disabled via
    Dinstall::AllowSourceOnlyUploadsWithoutPackageList.

    Source-only uploads to NEW are only allowed if
    Dinstall::AllowSourceOnlyNew is set, the upload's primary fingerprint is
    listed in Dinstall::AllowSourceOnlyNewKeys, or its source name is listed
    in Dinstall::AllowSourceOnlyNewSources.

    Uploads not including architecture-independent packages are only
    allowed if Dinstall::AllowNoArchIndepUploads is set.

    """

    def is_source_only_upload(self, upload) -> bool:
        """Return True if the upload has a source package and no binaries."""
        changes = upload.changes
        return changes.source is not None and len(changes.binaries) == 0

    def check(self, upload):
        """Apply the source-only upload policy described on the class.

        Uploads that are not source-only pass unconditionally.

        Returns True when the upload is acceptable.

        Raises:
            Reject: if the upload violates one of the configured
                source-only restrictions.
        """
        if not self.is_source_only_upload(upload):
            return True

        cnf = Config()
        allow_source_only_uploads = cnf.find_b("Dinstall::AllowSourceOnlyUploads")
        allow_source_only_uploads_without_package_list = cnf.find_b(
            "Dinstall::AllowSourceOnlyUploadsWithoutPackageList"
        )
        allow_source_only_new = cnf.find_b("Dinstall::AllowSourceOnlyNew")
        allow_source_only_new_keys = cnf.value_list(
            "Dinstall::AllowSourceOnlyNewKeys"
        )
        allow_source_only_new_sources = cnf.value_list(
            "Dinstall::AllowSourceOnlyNewSources"
        )
        # Unlike the other switches, this one defaults to True when unset.
        allow_no_arch_indep_uploads = cnf.find_b(
            "Dinstall::AllowNoArchIndepUploads", True
        )
        changes = upload.changes

        if not allow_source_only_uploads:
            raise Reject("Source-only uploads are not allowed.")
        if (
            not allow_source_only_uploads_without_package_list
            and changes.source.package_list.fallback
        ):
            raise Reject(
                "Source-only uploads are only allowed if a Package-List field that also list architectures is included in the source package. dpkg (>= 1.17.7) includes this information."
            )
        # Specific keys or source packages may be exempted from the
        # "no source-only uploads to NEW" rule.
        if (
            not allow_source_only_new
            and upload.new
            and changes.primary_fingerprint not in allow_source_only_new_keys
            and changes.source_name not in allow_source_only_new_sources
        ):
            raise Reject("Source-only uploads to NEW are not allowed.")

        if (
            "all" not in changes.architectures
            and changes.source.package_list.has_arch_indep_packages()
        ):
            if not allow_no_arch_indep_uploads:
                raise Reject("Uploads must include architecture-independent packages.")

        return True

1075 

1076 

class NewOverrideCheck(Check):
    """Override NEW requirement"""

    def check(self, upload):
        """Clear the upload's NEW flag for keys listed in Dinstall::NewOverrideKeys.

        Uploads that are not NEW are left untouched.  This check never
        rejects; it always returns True.
        """
        if upload.new:
            override_keys = Config().value_list("Dinstall::NewOverrideKeys")
            signer = upload.changes.primary_fingerprint
            if signer in override_keys:
                upload.new = False

        return True

1091 

1092 

class ArchAllBinNMUCheck(Check):
    """Check for arch:all binNMUs"""

    def check(self, upload):
        """Reject uploads that include arch "all" and declare Binary-Only: yes.

        Returns True for every other upload.

        Raises:
            Reject: if "all" is among the architectures and the Binary-Only
                field of the .changes is "yes".
        """
        changes = upload.changes

        if "all" not in changes.architectures:
            return True
        if changes.changes.get("Binary-Only") != "yes":
            return True

        raise Reject("arch:all binNMUs are not allowed.")

1106 

1107 

class LintianCheck(Check):
    """Check package using lintian"""

    def check(self, upload):
        """Run lintian on the .changes and reject on configured tags.

        The check is skipped (returns True) for uploads without source, for
        uploads targeting neither unstable nor experimental, and when
        Dinstall::LintianTags is not configured.

        Returns True when lintian reports none of the configured tags.

        Raises:
            Exception: if the lintian tags file is not valid YAML.
            Reject: if reject messages are generated from lintian's output.
        """
        changes = upload.changes

        # Only check sourceful uploads.
        if changes.source is None:
            return True

        # Only check uploads to unstable or experimental.
        targets = changes.distributions
        if "unstable" not in targets and "experimental" not in targets:
            return True

        cnf = Config()
        if "Dinstall::LintianTags" not in cnf:
            return True
        tagfile = cnf["Dinstall::LintianTags"]

        with open(tagfile, "r") as sourcefile:
            sourcecontent = sourcefile.read()
        try:
            lintiantags = yaml.safe_load(sourcecontent)["lintian"]
        except yaml.YAMLError as msg:
            raise Exception(
                "Could not read lintian tags file {0}, YAML error: {1}".format(
                    tagfile, msg
                )
            )

        with tempfile.NamedTemporaryFile(mode="w+t") as temptagfile:
            # World-readable so lintian can read it when run as another user.
            os.fchmod(temptagfile.fileno(), 0o644)
            for tag_group in lintiantags.values():
                for tag in tag_group:
                    print(tag, file=temptagfile)
            temptagfile.flush()

            changespath = os.path.join(upload.directory, changes.filename)

            # Optionally drop privileges via sudo before invoking lintian.
            user = cnf.get("Dinstall::UnprivUser") or None
            sudo_prefix = [] if user is None else ["sudo", "-H", "-u", user]
            cmd = sudo_prefix + [
                "/usr/bin/lintian",
                "--show-overrides",
                "--tags-from-file",
                temptagfile.name,
                changespath,
            ]

            # The temporary tag file must still exist while lintian runs.
            process = subprocess.run(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
            )

        output = process.stdout
        result = process.returncode

        if result == 2:
            utils.warn(
                "lintian failed for %s [return code: %s]." % (changespath, result)
            )
            utils.warn(utils.prefix_multi_line_string(output, " [possible output:] "))

        parsed_tags = lintian.parse_lintian_output(output)
        rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags))
        if len(rejects) != 0:
            raise Reject("\n".join(rejects))

        return True

1180 

1181 

class SourceFormatCheck(Check):
    """Check source format is allowed in the target suite"""

    def per_suite_check(self, upload, suite):
        """Verify the upload's source format is enabled for `suite`.

        Uploads without a source package pass unconditionally.

        Returns True when the check passes.

        Raises:
            Reject: if the ``Format`` of the .dsc is not among the source
                formats associated with `suite`.
        """
        source = upload.changes.source
        session = upload.session
        if source is None:
            return True

        source_format = source.dsc["Format"]
        query = (
            session.query(SrcFormat)
            .filter_by(format_name=source_format)
            .filter(SrcFormat.suites.contains(suite))
        )
        if query.first() is None:
            raise Reject(
                "source format {0} is not allowed in suite {1}".format(
                    source_format, suite.suite_name
                )
            )

        # Return True on success like the other per-suite checks do, instead
        # of falling off the end with None.
        return True

1203 

1204 

class SuiteCheck(Check):
    """Check the suite accepts the kinds of packages (source/binary) uploaded"""

    def per_suite_check(self, upload, suite):
        """Reject source or binary content the suite does not accept.

        Returns True when the upload is compatible with the suite settings.

        Raises:
            Reject: if the upload contains a source package but the suite
                does not accept source uploads, or contains binaries but the
                suite does not accept binary uploads.
        """
        if not suite.accept_source_uploads:
            if upload.changes.source is not None:
                raise Reject(
                    'The suite "{0}" does not accept source uploads.'.format(
                        suite.suite_name
                    )
                )

        if not suite.accept_binary_uploads:
            if len(upload.changes.binaries) != 0:
                raise Reject(
                    'The suite "{0}" does not accept binary uploads.'.format(
                        suite.suite_name
                    )
                )

        return True

1220 

1221 

class SuiteArchitectureCheck(Check):
    """Check every uploaded architecture is enabled in the target suite"""

    def per_suite_check(self, upload, suite):
        """Reject the upload if it names an architecture `suite` does not have.

        Returns True when all architectures of the upload are associated
        with `suite`.

        Raises:
            Reject: for the first architecture not found in `suite`.
        """
        session = upload.session

        for arch in upload.changes.architectures:
            matching = (
                session.query(Architecture)
                .filter_by(arch_string=arch)
                .filter(Architecture.suites.contains(suite))
                .first()
            )
            if matching is None:
                raise Reject(
                    "Architecture {0} is not allowed in suite {1}".format(
                        arch, suite.suite_name
                    )
                )

        return True

1239 

1240 

class VersionCheck(Check):
    """Check version constraints"""

    def _highest_source_version(self, session, source_name, suite):
        """Return the highest version of `source_name` in `suite`, or None."""
        newest = (
            session.query(DBSource)
            .filter_by(source=source_name)
            .filter(DBSource.suites.contains(suite))
            .order_by(DBSource.version.desc())
            .first()
        )
        return None if newest is None else newest.version

    def _highest_binary_version(self, session, binary_name, suite, architecture):
        """Return the highest version of `binary_name` in `suite`, or None.

        Both arch "all" builds and builds for `architecture` are considered.
        """
        newest = (
            session.query(DBBinary)
            .filter_by(package=binary_name)
            .filter(DBBinary.suites.contains(suite))
            .join(DBBinary.architecture)
            .filter(Architecture.arch_string.in_(["all", architecture]))
            .order_by(DBBinary.version.desc())
            .first()
        )
        return None if newest is None else newest.version

    def _version_checks(self, upload, suite, other_suite, op, op_name):
        """Compare uploaded versions against the highest ones in `other_suite`.

        `op` receives the result of ``version_compare(uploaded, existing)``
        and must return a true value when the constraint holds; `op_name`
        ("higher"/"lower") is only used in the rejection message.  Packages
        not present in `other_suite` are not constrained.

        Raises:
            Reject: for the first source or binary violating the constraint.
        """
        session = upload.session
        uploaded_source = upload.changes.source

        if uploaded_source is not None:
            source_name = uploaded_source.dsc["Source"]
            source_version = uploaded_source.dsc["Version"]
            existing = self._highest_source_version(session, source_name, other_suite)
            if existing is not None and not op(
                version_compare(source_version, existing)
            ):
                raise Reject(
                    "Version check failed:\n"
                    "Your upload included the source package {0}, version {1},\n"
                    "however {3} already has version {2}.\n"
                    "Uploads to {5} must have a {4} version than present in {3}.".format(
                        source_name,
                        source_version,
                        existing,
                        other_suite.suite_name,
                        op_name,
                        suite.suite_name,
                    )
                )

        for binary in upload.changes.binaries:
            binary_name = binary.control["Package"]
            binary_version = binary.control["Version"]
            architecture = binary.control["Architecture"]
            existing = self._highest_binary_version(
                session, binary_name, other_suite, architecture
            )
            if existing is None:
                continue
            if op(version_compare(binary_version, existing)):
                continue
            raise Reject(
                "Version check failed:\n"
                "Your upload included the binary package {0}, version {1}, for {2},\n"
                "however {4} already has version {3}.\n"
                "Uploads to {6} must have a {5} version than present in {4}.".format(
                    binary_name,
                    binary_version,
                    architecture,
                    existing,
                    other_suite.suite_name,
                    op_name,
                    suite.suite_name,
                )
            )

    def per_suite_check(self, upload, suite):
        """Enforce the version constraints configured for `suite`.

        The upload must be newer than what is in every suite referenced by a
        "MustBeNewerThan" or "Enhances" version check, and newer than what is
        already in `suite` itself; it must be older than what is in every
        suite referenced by a "MustBeOlderThan" version check.

        Returns True when all constraints hold; `_version_checks` raises
        Reject otherwise.
        """
        session = upload.session

        def is_higher(result):
            return result > 0

        def is_lower(result):
            return result < 0

        newer_rules = (
            session.query(dbconn.VersionCheck)
            .filter_by(suite=suite)
            .filter(dbconn.VersionCheck.check.in_(["MustBeNewerThan", "Enhances"]))
        )
        newer_references = [rule.reference for rule in newer_rules]
        # Must be newer than old versions in `suite`
        newer_references.append(suite)

        for reference in newer_references:
            self._version_checks(upload, suite, reference, is_higher, "higher")

        older_rules = session.query(dbconn.VersionCheck).filter_by(
            suite=suite, check="MustBeOlderThan"
        )
        older_references = [rule.reference for rule in older_rules]

        for reference in older_references:
            self._version_checks(upload, suite, reference, is_lower, "lower")

        return True

    @property
    def forcable(self) -> bool:
        """Always True for this check.

        NOTE(review): presumably marks the check as one whose rejection may
        be overridden -- confirm against the Check base class.
        """
        return True