1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# Parts based on code that is 

4# Copyright (C) 2001-2006, James Troup <james@nocrew.org> 

5# Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org> 

6# 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License along 

18# with this program; if not, write to the Free Software Foundation, Inc., 

19# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

20 

21"""module providing pre-acceptance tests

22 

23Please read the documentation for the :class:`Check` class for the interface. 

24""" 

25 

26import datetime 

27import os 

28import re 

29import subprocess 

30import tempfile 

31import textwrap 

32import time 

33from collections.abc import Iterable 

34from typing import TYPE_CHECKING 

35 

36import apt_inst 

37import apt_pkg 

38import yaml 

39from apt_pkg import version_compare 

40 

41import daklib.dbconn as dbconn 

42import daklib.lintian as lintian 

43import daklib.upload 

44import daklib.utils as utils 

45from daklib.config import Config 

46from daklib.dbconn import ( 

47 ACL, 

48 ACLPerSource, 

49 Architecture, 

50 DBBinary, 

51 DBSource, 

52 SignatureHistory, 

53 SrcFormat, 

54 Suite, 

55 get_source_in_suite, 

56) 

57from daklib.regexes import ( 

58 re_field_package, 

59 re_field_source, 

60 re_field_version, 

61 re_field_version_upstream, 

62 re_file_binary, 

63 re_file_changes, 

64 re_file_dsc, 

65 re_file_orig, 

66 re_file_source, 

67 re_isanum, 

68) 

69from daklib.textutils import ParseMaintError, fix_maintainer 

70 

71if TYPE_CHECKING:

72 import daklib.archive 

73 

74 

75def check_fields_for_valid_utf8(filename, control): 

76 """Check all fields of a control file for valid UTF-8""" 

77 for field in control.keys(): 

78 try: 

79 # Access the field value to make `TagSection` try to decode it. 

80 # We should also do the same for the field name, but this requires 

81 # https://bugs.debian.org/995118 to be fixed. 

82 # TODO: make sure the field name `field` is valid UTF-8 too 

83 control[field] 

84 except UnicodeDecodeError: 

85 raise Reject( 

86 "{0}: The {1} field is not valid UTF-8".format(filename, field) 

87 ) 
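# This helper is shared by the .changes, .dsc and binary control file checks
# below: apt_pkg's TagSection only decodes a field value when it is accessed,
# so e.g. a Description containing bytes that are not valid UTF-8 raises
# UnicodeDecodeError here and is turned into a Reject.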

88 

89 

90class Reject(Exception): 

91 """exception raised by failing checks""" 

92 

93 pass 

94 

95 

96class RejectExternalFilesMismatch(Reject): 

97 """exception raised by failing the external hashes check""" 

98 

99 def __str__(self): 

100 return ( 

101 "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" 

102 % self.args[:4] 

103 ) 

104 

105 

106class RejectACL(Reject): 

107 """exception raised by failing ACL checks"""

108 

109 def __init__(self, acl, reason): 

110 self.acl = acl 

111 self.reason = reason 

112 

113 def __str__(self): 

114 return "ACL {0}: {1}".format(self.acl.name, self.reason) 

115 

116 

117class Check: 

118 """base class for checks 

119 

120 checks are called by :class:`daklib.archive.ArchiveUpload`. Failing tests should 

121 raise a :exc:`daklib.checks.Reject` exception including a human-readable 

122 description of why the upload should be rejected.

123 """ 

124 

125 def check(self, upload: "daklib.archive.ArchiveUpload"): 

126 """do checks 

127 

128 :param upload: upload to check 

129 

130 :raises Reject: upload should be rejected 

131 """ 

132 raise NotImplementedError 

133 

134 def per_suite_check(self, upload: "daklib.archive.ArchiveUpload", suite: Suite): 

135 """do per-suite checks 

136 

137 :param upload: upload to check 

138 :param suite: suite to check 

139 

140 :raises Reject: upload should be rejected 

141 """ 

142 raise NotImplementedError 

143 

144 @property 

145 def forcable(self) -> bool: 

146 """allow to force ignore failing test 

147 

148 :const:`True` if it is acceptable to force ignoring a failing test, 

149 :const:`False` otherwise 

150 """ 

151 return False 

152 
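# A minimal sketch of what a new check could look like, following the Check
# interface above (the class below is hypothetical, not part of dak):
#
#     class MaintainerPresentCheck(Check):
#         """reject uploads without a usable Maintainer field"""
#
#         def check(self, upload):
#             control = upload.changes.changes
#             if not control.get("Maintainer", "").strip():
#                 raise Reject(
#                     "{0}: missing Maintainer field".format(upload.changes.filename)
#                 )
#             return True
#
# Checks are instantiated and run by daklib.archive.ArchiveUpload; suite-specific
# logic belongs in per_suite_check() rather than check().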

153 

154class SignatureAndHashesCheck(Check): 

155 """Check signature of changes and dsc file (if included in upload) 

156 

157 Make sure the signature is valid and done by a known user. 

158 """ 

159 

160 def check_replay(self, upload) -> bool: 

161 # Use private session as we want to remember having seen the .changes 

162 # in all cases. 

163 session = upload.session 

164 history = SignatureHistory.from_signed_file(upload.changes) 

165 r = history.query(session) 

166 if r is not None:

167 raise Reject( 

168 "Signature for changes file was already seen at {0}.\nPlease refresh the signature of the changes file if you want to upload it again.".format( 

169 r.seen 

170 ) 

171 ) 

172 return True 

173 

174 def check(self, upload): 

175 allow_source_untrusted_sig_keys = Config().value_list( 

176 "Dinstall::AllowSourceUntrustedSigKeys" 

177 ) 

178 

179 changes = upload.changes 

180 if not changes.valid_signature:

181 raise Reject("Signature for .changes not valid.") 

182 self.check_replay(upload) 

183 self._check_hashes(upload, changes.filename, changes.files.values()) 

184 

185 source = None 

186 try: 

187 source = changes.source 

188 except Exception as e: 

189 raise Reject("Invalid dsc file: {0}".format(e)) 

190 if source is not None: 

191 if changes.primary_fingerprint not in allow_source_untrusted_sig_keys:

192 if not source.valid_signature:

193 raise Reject("Signature for .dsc not valid.") 

194 if source.primary_fingerprint != changes.primary_fingerprint:

195 raise Reject(".changes and .dsc not signed by the same key.") 

196 self._check_hashes(upload, source.filename, source.files.values()) 

197 

198 if upload.fingerprint is None or upload.fingerprint.uid is None: 

199 raise Reject(".changes signed by unknown key.") 

200 

201 def _check_hashes( 

202 self, 

203 upload: "daklib.archive.ArchiveUpload", 

204 filename: str, 

205 files: Iterable[daklib.upload.HashedFile], 

206 ): 

207 """Make sure hashes match existing files 

208 

209 :param upload: upload we are processing 

210 :param filename: name of the file the expected hash values are taken from 

211 :param files: files to check the hashes for 

212 """ 

213 try: 

214 for f in files: 

215 f.check(upload.directory) 

216 except daklib.upload.FileDoesNotExist as e:

217 raise Reject( 

218 "{0}: {1}\n" 

219 "Perhaps you need to include the file in your upload?\n\n" 

220 "If the orig tarball is missing, the -sa flag for dpkg-buildpackage will be your friend.".format( 

221 filename, str(e) 

222 ) 

223 ) 

224 except daklib.upload.UploadException as e: 

225 raise Reject("{0}: {1}".format(filename, str(e))) 

226 

227 

228class WeakSignatureCheck(Check): 

229 """Check that .changes and .dsc are not signed using a weak algorithm""" 

230 

231 def check(self, upload): 

232 changes = upload.changes 

233 if changes.weak_signature:

234 raise Reject( 

235 "The .changes was signed using a weak algorithm (such as SHA-1)" 

236 ) 

237 

238 source = changes.source 

239 if source is not None: 

240 if source.weak_signature:

241 raise Reject( 

242 "The source package was signed using a weak algorithm (such as SHA-1)" 

243 ) 

244 

245 return True 

246 

247 

248class SignatureTimestampCheck(Check): 

249 """Check timestamp of .changes signature""" 

250 

251 def check(self, upload): 

252 changes = upload.changes 

253 

254 now = datetime.datetime.utcnow() 

255 timestamp = changes.signature_timestamp 

256 age = now - timestamp 

257 

258 age_max = datetime.timedelta(days=365) 

259 age_min = datetime.timedelta(days=-7) 

260 

261 if age > age_max:

262 raise Reject( 

263 "{0}: Signature from {1} is too old (maximum age is {2} days)".format( 

264 changes.filename, timestamp, age_max.days 

265 ) 

266 ) 

267 if age < age_min:

268 raise Reject( 

269 "{0}: Signature from {1} is too far in the future (tolerance is {2} days)".format( 

270 changes.filename, timestamp, abs(age_min.days) 

271 ) 

272 ) 

273 

274 return True 
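# In effect the signature timestamp must fall within (now - 365 days, now + 7
# days): a .changes signed 400 days ago, or with a clock set ten days into the
# future, is rejected until it is re-signed.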

275 

276 

277class ChangesCheck(Check): 

278 """Check changes file for syntax errors.""" 

279 

280 def check(self, upload): 

281 changes = upload.changes 

282 control = changes.changes 

283 fn = changes.filename 

284 

285 for field in ( 

286 "Distribution", 

287 "Source", 

288 "Architecture", 

289 "Version", 

290 "Maintainer", 

291 "Files", 

292 "Changes", 

293 ): 

294 if field not in control:

295 raise Reject("{0}: missing mandatory field {1}".format(fn, field))

296 

297 if len(changes.binaries) > 0: 

298 for field in ("Binary", "Description"): 

299 if field not in control:

300 raise Reject( 

301 "{0}: binary upload requires {1} field".format(fn, field) 

302 ) 

303 

304 check_fields_for_valid_utf8(fn, control) 

305 

306 source_match = re_field_source.match(control["Source"]) 

307 if not source_match:

308 raise Reject("{0}: Invalid Source field".format(fn)) 

309 version_match = re_field_version.match(control["Version"]) 

310 if not version_match:

311 raise Reject("{0}: Invalid Version field".format(fn)) 

312 version_without_epoch = version_match.group("without_epoch") 

313 

314 match = re_file_changes.match(fn) 

315 if not match:

316 raise Reject("{0}: Does not match re_file_changes".format(fn)) 

317 if match.group("package") != source_match.group("package"):

318 raise Reject("{0}: Filename does not match Source field".format(fn)) 

319 if match.group("version") != version_without_epoch:

320 raise Reject("{0}: Filename does not match Version field".format(fn)) 

321 

322 for bn in changes.binary_names: 

323 if not re_field_package.match(bn):

324 raise Reject("{0}: Invalid binary package name {1}".format(fn, bn)) 

325 

326 if changes.sourceful and changes.source is None:

327 raise Reject("Changes has architecture source, but no source found.") 

328 if changes.source is not None and not changes.sourceful:

329 raise Reject("Upload includes source, but changes does not say so.") 

330 

331 try: 

332 fix_maintainer(changes.changes["Maintainer"]) 

333 except ParseMaintError as e: 

334 raise Reject( 

335 "{0}: Failed to parse Maintainer field: {1}".format(changes.filename, e) 

336 ) 

337 

338 try: 

339 changed_by = changes.changes.get("Changed-By") 

340 if changed_by is not None:

341 fix_maintainer(changed_by) 

342 except ParseMaintError as e: 

343 raise Reject( 

344 "{0}: Failed to parse Changed-By field: {1}".format(changes.filename, e) 

345 ) 

346 

347 try: 

348 changes.byhand_files 

349 except daklib.upload.InvalidChangesException as e: 

350 raise Reject("{0}".format(e)) 

351 

352 if len(changes.files) == 0:

353 raise Reject("Changes includes no files.") 

354 

355 for bugnum in changes.closed_bugs:

356 if not re_isanum.match(bugnum): 

357 raise Reject( 

358 '{0}: "{1}" in Closes field is not a number'.format( 

359 changes.filename, bugnum 

360 ) 

361 ) 

362 

363 return True 

364 

365 

366class ExternalHashesCheck(Check): 

367 """Checks hashes in .changes and .dsc against an external database.""" 

368 

369 def check_single(self, session, f): 

370 q = session.execute( 

371 "SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE :pattern", 

372 {"pattern": "%/{}".format(f.filename)}, 

373 ) 

374 (ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum) = q.fetchone() or ( 

375 None, 

376 None, 

377 None, 

378 None, 

379 ) 

380 

381 if not ext_size: 

382 return 

383 

384 if ext_size != f.size: 

385 raise RejectExternalFilesMismatch(f.filename, "size", f.size, ext_size) 

386 

387 if ext_md5sum != f.md5sum: 

388 raise RejectExternalFilesMismatch( 

389 f.filename, "md5sum", f.md5sum, ext_md5sum 

390 ) 

391 

392 if ext_sha1sum != f.sha1sum: 

393 raise RejectExternalFilesMismatch( 

394 f.filename, "sha1sum", f.sha1sum, ext_sha1sum 

395 ) 

396 

397 if ext_sha256sum != f.sha256sum: 

398 raise RejectExternalFilesMismatch( 

399 f.filename, "sha256sum", f.sha256sum, ext_sha256sum 

400 ) 

401 

402 def check(self, upload): 

403 cnf = Config() 

404 

405 if not cnf.use_extfiles:

406 return 

407 

408 session = upload.session 

409 changes = upload.changes 

410 

411 for f in changes.files.values(): 

412 self.check_single(session, f) 

413 source = changes.source 

414 if source is not None: 

415 for f in source.files.values(): 

416 self.check_single(session, f) 

417 

418 

419class BinaryCheck(Check): 

420 """Check binary packages for syntax errors.""" 

421 

422 def check(self, upload): 

423 debug_deb_name_postfix = "-dbgsym" 

424 # XXX: Handle dynamic debug section name here 

425 

426 self._architectures = set() 

427 

428 for binary in upload.changes.binaries: 

429 self.check_binary(upload, binary) 

430 

431 for arch in upload.changes.architectures: 

432 if arch == "source": 

433 continue 

434 if arch not in self._architectures:

435 raise Reject( 

436 "{}: Architecture field includes {}, but no binary packages for {} are included in the upload".format( 

437 upload.changes.filename, arch, arch 

438 ) 

439 ) 

440 

441 binaries = { 

442 binary.control["Package"]: binary for binary in upload.changes.binaries 

443 } 

444 

445 for name, binary in list(binaries.items()): 

446 if name in upload.changes.binary_names:

447 # Package is listed in Binary field. Everything is good. 

448 pass 

449 elif daklib.utils.is_in_debug_section(binary.control): 

450 # If we have a binary package in the debug section, we 

451 # can allow it to not be present in the Binary field 

452 # in the .changes file, so long as its name (without 

453 # -dbgsym) is present in the Binary list. 

454 if not name.endswith(debug_deb_name_postfix): 

455 raise Reject( 

456 "Package {0} is in the debug section, but " 

457 "does not end in {1}.".format(name, debug_deb_name_postfix) 

458 ) 

459 

460 # Right, so, it's named properly, let's check that 

461 # the corresponding package is in the Binary list 

462 origin_package_name = name[: -len(debug_deb_name_postfix)] 

463 if origin_package_name not in upload.changes.binary_names: 

464 raise Reject( 

465 "Debug package {debug}'s corresponding binary package " 

466 "{origin} is not present in the Binary field.".format( 

467 debug=name, origin=origin_package_name 

468 ) 

469 ) 

470 else: 

471 # Someone was a nasty little hacker and put a package 

472 # into the .changes that isn't in debian/control. Bad, 

473 # bad person. 

474 raise Reject( 

475 "Package {0} is not mentioned in Binary field in changes".format( 

476 name 

477 ) 

478 ) 

479 

480 return True 

481 

482 def check_binary(self, upload, binary): 

483 fn = binary.hashed_file.filename 

484 control = binary.control 

485 

486 for field in ("Package", "Architecture", "Version", "Description", "Section"): 

487 if field not in control:

488 raise Reject("{0}: Missing mandatory field {1}.".format(fn, field)) 

489 

490 check_fields_for_valid_utf8(fn, control) 

491 

492 # check fields 

493 

494 package = control["Package"] 

495 if not re_field_package.match(package):

496 raise Reject("{0}: Invalid Package field".format(fn)) 

497 

498 version = control["Version"] 

499 version_match = re_field_version.match(version) 

500 if not version_match:

501 raise Reject("{0}: Invalid Version field".format(fn)) 

502 version_without_epoch = version_match.group("without_epoch") 

503 

504 architecture = control["Architecture"] 

505 if architecture not in upload.changes.architectures:

506 raise Reject( 

507 "{0}: Architecture not in Architecture field in changes file".format(fn) 

508 ) 

509 if architecture == "source":

510 raise Reject( 

511 '{0}: Architecture "source" invalid for binary packages'.format(fn) 

512 ) 

513 self._architectures.add(architecture) 

514 

515 source = control.get("Source") 

516 if source is not None and not re_field_source.match(source):

517 raise Reject("{0}: Invalid Source field".format(fn)) 

518 

519 section = control.get("Section", "") 

520 if section == "" or section == "unknown" or section.endswith("/unknown"):

521 raise Reject( 

522 '{0}: The "Section" field must be present and use a real section name.'.format( 

523 fn 

524 ) 

525 ) 

526 

527 # check filename 

528 

529 match = re_file_binary.match(fn) 

530 if package != match.group("package"):

531 raise Reject("{0}: filename does not match Package field".format(fn)) 

532 if version_without_epoch != match.group("version"):

533 raise Reject("{0}: filename does not match Version field".format(fn)) 

534 if architecture != match.group("architecture"):

535 raise Reject("{0}: filename does not match Architecture field".format(fn)) 

536 

537 # check dependency field syntax 

538 

539 def check_dependency_field( 

540 field, 

541 control, 

542 dependency_parser=apt_pkg.parse_depends, 

543 allow_alternatives=True, 

544 allow_relations=("", "<", "<=", "=", ">=", ">"), 

545 ): 

546 value = control.get(field) 

547 if value is not None: 

548 if value.strip() == "":

549 raise Reject("{0}: empty {1} field".format(fn, field)) 

550 try: 

551 depends = dependency_parser(value) 

552 except: 

553 raise Reject("{0}: APT could not parse {1} field".format(fn, field)) 

554 for group in depends: 

555 if not allow_alternatives and len(group) != 1:

556 raise Reject( 

557 "{0}: {1}: alternatives are not allowed".format(fn, field) 

558 ) 

559 for dep_pkg, dep_ver, dep_rel in group: 

560 if dep_rel not in allow_relations:

561 raise Reject( 

562 "{}: {}: depends on {}, but only relations {} are allowed for this field".format( 

563 fn, 

564 field, 

565 " ".join([dep_pkg, dep_rel, dep_ver]),

566 allow_relations, 

567 ) 

568 ) 

569 

570 for field in ( 

571 "Breaks", 

572 "Conflicts", 

573 "Depends", 

574 "Enhances", 

575 "Pre-Depends", 

576 "Recommends", 

577 "Replaces", 

578 "Suggests", 

579 ): 

580 check_dependency_field(field, control) 

581 

582 check_dependency_field( 

583 "Provides", control, allow_alternatives=False, allow_relations=("", "=") 

584 ) 

585 check_dependency_field( 

586 "Built-Using", 

587 control, 

588 dependency_parser=apt_pkg.parse_src_depends, 

589 allow_alternatives=False, 

590 allow_relations=("=",), 

591 ) 
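# For reference, apt_pkg.parse_depends() returns a list of "or"-groups, each a
# list of (package, version, relation) tuples, which is what the nested loops
# in check_dependency_field() walk. A sketch (values illustrative only):
#
#     >>> apt_pkg.parse_depends("libc6 (>= 2.36), foo | bar")
#     [[('libc6', '2.36', '>=')], [('foo', '', ''), ('bar', '', '')]]
#
# allow_alternatives=False rejects groups with more than one alternative, and
# allow_relations restricts which comparison operators may appear.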

592 

593 

594_DEB_ALLOWED_MEMBERS = { 

595 "debian-binary", 

596 *(f"control.tar.{comp}" for comp in ("gz", "xz")), 

597 *(f"data.tar.{comp}" for comp in ("gz", "bz2", "xz")), 

598} 

599 

600 

601class BinaryMembersCheck(Check): 

602 """check members of .deb file""" 

603 

604 def check(self, upload): 

605 for binary in upload.changes.binaries: 

606 filename = binary.hashed_file.filename 

607 path = os.path.join(upload.directory, filename) 

608 self._check_binary(filename, path) 

609 return True 

610 

611 def _check_binary(self, filename: str, path: str) -> None: 

612 deb = apt_inst.DebFile(path) 

613 members = set(member.name for member in deb.getmembers()) 

614 if blocked_members := members - _DEB_ALLOWED_MEMBERS:

615 raise Reject( 

616 f"{filename}: Contains blocked members {', '.join(blocked_members)}" 

617 ) 
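# For a conforming .deb, apt_inst.DebFile(path).getmembers() yields ar members
# named "debian-binary" plus control and data tarballs using one of the allowed
# compressions; anything else (for example a "data.tar.lzma" member) leaves the
# set difference above non-empty and triggers the Reject.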

618 

619 

620class BinaryTimestampCheck(Check): 

621 """check timestamps of files in binary packages 

622 

623 Files in the near future cause ugly warnings and extreme time travel 

624 can cause errors on extraction. 

625 """ 

626 

627 def check(self, upload): 

628 cnf = Config() 

629 future_cutoff = time.time() + cnf.find_i( 

630 "Dinstall::FutureTimeTravelGrace", 24 * 3600 

631 ) 

632 past_cutoff = time.mktime( 

633 time.strptime(cnf.find("Dinstall::PastCutoffYear", "1975"), "%Y") 

634 ) 

635 

636 class TarTime: 

637 def __init__(self): 

638 self.future_files: dict[str, int] = {} 

639 self.past_files: dict[str, int] = {} 

640 

641 def callback(self, member, data) -> None: 

642 if member.mtime > future_cutoff:

643 self.future_files[member.name] = member.mtime 

644 elif member.mtime < past_cutoff:

645 self.past_files[member.name] = member.mtime 

646 

647 def format_reason(filename, direction, files) -> str: 

648 reason = ( 

649 "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format( 

650 filename, len(files), direction 

651 ) 

652 ) 

653 for fn, ts in files.items(): 

654 reason += " {0} ({1})".format(fn, time.ctime(ts)) 

655 return reason 

656 

657 for binary in upload.changes.binaries: 

658 filename = binary.hashed_file.filename 

659 path = os.path.join(upload.directory, filename) 

660 deb = apt_inst.DebFile(path) 

661 tar = TarTime() 

662 for archive in (deb.control, deb.data): 

663 archive.go(tar.callback) 

664 if tar.future_files:

665 raise Reject(format_reason(filename, "future", tar.future_files)) 

666 if tar.past_files:

667 raise Reject(format_reason(filename, "past", tar.past_files)) 
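# With the default configuration this accepts mtimes from 1975-01-01 up to
# roughly 24 hours in the future; Dinstall::FutureTimeTravelGrace (seconds) and
# Dinstall::PastCutoffYear move those bounds.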

668 

669 

670class SourceCheck(Check): 

671 """Check source package for syntax errors.""" 

672 

673 def check_filename(self, control, filename, regex: re.Pattern) -> None: 

674 # In case we have an .orig.tar.*, we have to strip the Debian revision

675 # from the version number. So handle this special case first. 

676 is_orig = True 

677 match = re_file_orig.match(filename) 

678 if not match: 

679 is_orig = False 

680 match = regex.match(filename) 

681 

682 if not match:

683 raise Reject( 

684 "{0}: does not match regular expression for source filenames".format( 

685 filename 

686 ) 

687 ) 

688 if match.group("package") != control["Source"]:

689 raise Reject("{0}: filename does not match Source field".format(filename)) 

690 

691 version = control["Version"] 

692 if is_orig: 

693 upstream_match = re_field_version_upstream.match(version) 

694 if not upstream_match:

695 raise Reject( 

696 "{0}: Source package includes upstream tarball, but {1} has no Debian revision.".format( 

697 filename, version 

698 ) 

699 ) 

700 version = upstream_match.group("upstream") 

701 version_match = re_field_version.match(version) 

702 version_without_epoch = version_match.group("without_epoch") 

703 if match.group("version") != version_without_epoch:

704 raise Reject("{0}: filename does not match Version field".format(filename)) 

705 

706 def check(self, upload): 

707 if upload.changes.source is None: 

708 if upload.changes.sourceful:

709 raise Reject( 

710 "{}: Architecture field includes source, but no source package is included in the upload".format( 

711 upload.changes.filename 

712 ) 

713 ) 

714 return True 

715 

716 if not upload.changes.sourceful:

717 raise Reject( 

718 "{}: Architecture field does not include source, but a source package is included in the upload".format( 

719 upload.changes.filename 

720 ) 

721 ) 

722 

723 changes = upload.changes.changes 

724 source = upload.changes.source 

725 control = source.dsc 

726 dsc_fn = source._dsc_file.filename 

727 

728 check_fields_for_valid_utf8(dsc_fn, control) 

729 

730 # check fields 

731 if not re_field_package.match(control["Source"]):

732 raise Reject("{0}: Invalid Source field".format(dsc_fn)) 

733 if control["Source"] != changes["Source"]:

734 raise Reject( 

735 "{0}: Source field does not match Source field in changes".format( 

736 dsc_fn 

737 ) 

738 ) 

739 if control["Version"] != changes["Version"]:

740 raise Reject( 

741 "{0}: Version field does not match Version field in changes".format( 

742 dsc_fn 

743 ) 

744 ) 

745 

746 # check filenames 

747 self.check_filename(control, dsc_fn, re_file_dsc) 

748 for f in source.files.values(): 

749 self.check_filename(control, f.filename, re_file_source) 

750 

751 # check dependency field syntax 

752 for field in ( 

753 "Build-Conflicts", 

754 "Build-Conflicts-Indep", 

755 "Build-Depends", 

756 "Build-Depends-Arch", 

757 "Build-Depends-Indep", 

758 ): 

759 value = control.get(field) 

760 if value is not None: 

761 if value.strip() == "":

762 raise Reject("{0}: empty {1} field".format(dsc_fn, field)) 

763 try: 

764 apt_pkg.parse_src_depends(value) 

765 except Exception as e: 

766 raise Reject( 

767 "{0}: APT could not parse {1} field: {2}".format( 

768 dsc_fn, field, e 

769 ) 

770 ) 

771 

772 rejects = utils.check_dsc_files(dsc_fn, control, list(source.files.keys())) 

773 if len(rejects) > 0:

774 raise Reject("\n".join(rejects)) 

775 

776 return True 

777 

778 

779class SingleDistributionCheck(Check): 

780 """Check that the .changes targets only a single distribution.""" 

781 

782 def check(self, upload): 

783 if len(upload.changes.distributions) != 1:

784 raise Reject("Only uploads to a single distribution are allowed.") 

785 

786 

787class ACLCheck(Check): 

788 """Check that the uploader is allowed to upload the packages in .changes"""

789 

790 def _does_hijack(self, session, upload, suite): 

791 # Try to catch hijacks. 

792 # This doesn't work correctly. Uploads to experimental can still 

793 # "hijack" binaries from unstable. Also one can hijack packages 

794 # via buildds (but people who try this should not be DMs). 

795 for binary_name in upload.changes.binary_names: 

796 binaries = ( 

797 session.query(DBBinary) 

798 .join(DBBinary.source) 

799 .filter(DBBinary.suites.contains(suite)) 

800 .filter(DBBinary.package == binary_name) 

801 ) 

802 for binary in binaries: 

803 if binary.source.source != upload.changes.changes["Source"]:

804 return True, binary.package, binary.source.source 

805 return False, None, None 

806 

807 def _check_acl(self, session, upload, acl): 

808 source_name = upload.changes.source_name 

809 

810 if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints:

811 return None, None 

812 if (

813 acl.match_keyring is not None 

814 and upload.fingerprint.keyring != acl.match_keyring 

815 ): 

816 return None, None 

817 

818 if not acl.allow_new: 

819 if upload.new: 

820 return False, "NEW uploads are not allowed" 

821 for f in upload.changes.files.values(): 

822 if f.section == "byhand" or f.section.startswith("raw-"):

823 return False, "BYHAND uploads are not allowed" 

824 if not acl.allow_source and upload.changes.source is not None:

825 return False, "sourceful uploads are not allowed" 

826 binaries = upload.changes.binaries 

827 if len(binaries) != 0: 

828 if not acl.allow_binary:

829 return False, "binary uploads are not allowed" 

830 if upload.changes.source is None and not acl.allow_binary_only:

831 return False, "binary-only uploads are not allowed" 

832 if not acl.allow_binary_all:

833 uploaded_arches = set(upload.changes.architectures) 

834 uploaded_arches.discard("source") 

835 allowed_arches = set(a.arch_string for a in acl.architectures) 

836 forbidden_arches = uploaded_arches - allowed_arches 

837 if len(forbidden_arches) != 0: 

838 return ( 

839 False, 

840 "uploads for architecture(s) {0} are not allowed".format( 

841 ", ".join(forbidden_arches) 

842 ), 

843 ) 

844 if not acl.allow_hijack: 

845 for suite in upload.final_suites: 

846 does_hijack, hijacked_binary, hijacked_from = self._does_hijack( 

847 session, upload, suite 

848 ) 

849 if does_hijack:

850 return ( 

851 False, 

852 "hijacks are not allowed (binary={0}, other-source={1})".format( 

853 hijacked_binary, hijacked_from 

854 ), 

855 ) 

856 

857 acl_per_source = ( 

858 session.query(ACLPerSource) 

859 .filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name) 

860 .first() 

861 ) 

862 if acl.allow_per_source: 

863 if acl_per_source is None: 

864 return False, "not allowed to upload source package '{0}'".format( 

865 source_name 

866 ) 

867 if acl.deny_per_source and acl_per_source is not None:

868 return ( 

869 False, 

870 acl_per_source.reason 

871 or "forbidden to upload source package '{0}'".format(source_name), 

872 ) 

873 

874 return True, None 
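# _check_acl() is tri-state: (None, None) when the ACL does not match the
# uploader's fingerprint or keyring, (False, reason) when it explicitly forbids
# the upload, and (True, None) when it allows it; check() and per_suite_check()
# below interpret these cases differently.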

875 

876 def check(self, upload): 

877 session = upload.session 

878 fingerprint = upload.fingerprint 

879 keyring = fingerprint.keyring 

880 

881 if keyring is None:

882 raise Reject( 

883 "No keyring for fingerprint {0}".format(fingerprint.fingerprint) 

884 ) 

885 if not keyring.active:

886 raise Reject("Keyring {0} is not active".format(keyring.name)) 

887 

888 acl = fingerprint.acl or keyring.acl 

889 if acl is None:

890 raise Reject("No ACL for fingerprint {0}".format(fingerprint.fingerprint)) 

891 result, reason = self._check_acl(session, upload, acl) 

892 if not result: 

893 raise RejectACL(acl, reason) 

894 

895 for acl in session.query(ACL).filter_by(is_global=True): 

896 result, reason = self._check_acl(session, upload, acl) 

897 if result is False:

898 raise RejectACL(acl, reason) 

899 

900 return True 

901 

902 def per_suite_check(self, upload, suite): 

903 acls = suite.acls 

904 if len(acls) != 0:

905 accept = False 

906 for acl in acls: 

907 result, reason = self._check_acl(upload.session, upload, acl) 

908 if result is False: 

909 raise Reject(reason) 

910 accept = accept or result 

911 if not accept: 

912 raise Reject( 

913 "Not accepted by any per-suite acl (suite={0})".format( 

914 suite.suite_name 

915 ) 

916 ) 

917 return True 

918 

919 

920class TransitionCheck(Check): 

921 """check for a transition""" 

922 

923 def check(self, upload): 

924 if not upload.changes.sourceful: 

925 return True 

926 

927 transitions = self.get_transitions() 

928 if transitions is None:

929 return True 

930 

931 session = upload.session 

932 

933 control = upload.changes.changes 

934 source = re_field_source.match(control["Source"]).group("package") 

935 

936 for trans in transitions: 

937 t = transitions[trans] 

938 transition_source = t["source"] 

939 expected = t["new"] 

940 

941 # Will be None if nothing is in testing. 

942 current = get_source_in_suite(transition_source, "testing", session) 

943 if current is not None: 

944 compare = apt_pkg.version_compare(current.version, expected) 

945 

946 if current is None or compare < 0: 

947 # This is still valid, the current version in testing is older than 

948 # the new version we wait for, or there is none in testing yet 

949 

950 # Check if the source we look at is affected by this. 

951 if source in t["packages"]: 

952 # The source is affected, let's reject it.

953 

954 rejectmsg = "{0}: part of the {1} transition.\n\n".format( 

955 source, trans 

956 ) 

957 

958 if current is not None: 

959 currentlymsg = "at version {0}".format(current.version) 

960 else: 

961 currentlymsg = "not present in testing" 

962 

963 rejectmsg += "Transition description: {0}\n\n".format(t["reason"]) 

964 

965 rejectmsg += "\n".join( 

966 textwrap.wrap( 

967 """Your package 

968is part of a testing transition designed to get {0} migrated (it is 

969currently {1}, we need version {2}). This transition is managed by the 

970Release Team, and {3} is the Release-Team member responsible for it. 

971Please mail debian-release@lists.debian.org or contact {3} directly if you 

972need further assistance. You might want to upload to experimental until this 

973transition is done.""".format( 

974 transition_source, currentlymsg, expected, t["rm"] 

975 ) 

976 ) 

977 ) 

978 

979 raise Reject(rejectmsg) 

980 

981 return True 

982 

983 def get_transitions(self): 

984 cnf = Config() 

985 path = cnf.get("Dinstall::ReleaseTransitions", "") 

986 if path == "" or not os.path.exists(path):

987 return None 

988 

989 with open(path, "r") as fd: 

990 contents = fd.read() 

991 try: 

992 transitions = yaml.safe_load(contents) 

993 return transitions 

994 except yaml.YAMLError as msg: 

995 utils.warn( 

996 "Not checking transitions, the transitions file is broken: {0}".format( 

997 msg 

998 ) 

999 ) 

1000 

1001 return None 
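# The Dinstall::ReleaseTransitions file is YAML; a hedged sketch of the layout
# this check expects, inferred from the fields read in check() above (all names
# and versions below are made up):
#
#     perl-5.38:
#       source: perl
#       new: 5.38.0-2
#       rm: Some Release Team Member
#       reason: "Perl 5.38 transition"
#       packages:
#         - libfoo-perl
#         - libbar-perl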

1002 

1003 

1004class NoSourceOnlyCheck(Check): 

1005 def is_source_only_upload(self, upload) -> bool: 

1006 changes = upload.changes 

1007 if changes.source is not None and len(changes.binaries) == 0: 

1008 return True 

1009 return False 

1010 

1011 """Check for source-only upload 

1012 

1013 Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is 

1014 set. Otherwise they are rejected. 

1015 

1016 Source-only uploads are only accepted for source packages having a 

1017 Package-List field that also lists architectures per package. This 

1018 check can be disabled via 

1019 Dinstall::AllowSourceOnlyUploadsWithoutPackageList. 

1020 

1021 Source-only uploads to NEW are only allowed if 

1022 Dinstall::AllowSourceOnlyNew is set. 

1023 

1024 Uploads not including architecture-independent packages are only 

1025 allowed if Dinstall::AllowNoArchIndepUploads is set. 

1026 

1027 """ 

1028 

1029 def check(self, upload): 

1030 if not self.is_source_only_upload(upload): 

1031 return True 

1032 

1033 allow_source_only_uploads = Config().find_b("Dinstall::AllowSourceOnlyUploads") 

1034 allow_source_only_uploads_without_package_list = Config().find_b( 

1035 "Dinstall::AllowSourceOnlyUploadsWithoutPackageList" 

1036 ) 

1037 allow_source_only_new = Config().find_b("Dinstall::AllowSourceOnlyNew") 

1038 allow_source_only_new_keys = Config().value_list( 

1039 "Dinstall::AllowSourceOnlyNewKeys" 

1040 ) 

1041 allow_source_only_new_sources = Config().value_list( 

1042 "Dinstall::AllowSourceOnlyNewSources" 

1043 ) 

1044 allow_no_arch_indep_uploads = Config().find_b( 

1045 "Dinstall::AllowNoArchIndepUploads", True 

1046 ) 

1047 changes = upload.changes 

1048 

1049 if not allow_source_only_uploads:

1050 raise Reject("Source-only uploads are not allowed.") 

1051 if (

1052 not allow_source_only_uploads_without_package_list 

1053 and changes.source.package_list.fallback 

1054 ): 

1055 raise Reject( 

1056 "Source-only uploads are only allowed if a Package-List field that also lists architectures is included in the source package. dpkg (>= 1.17.7) includes this information."

1057 ) 

1058 if (

1059 not allow_source_only_new 

1060 and upload.new 

1061 and changes.primary_fingerprint not in allow_source_only_new_keys 

1062 and changes.source_name not in allow_source_only_new_sources 

1063 ): 

1064 raise Reject("Source-only uploads to NEW are not allowed.") 

1065 

1066 if ( 

1067 "all" not in changes.architectures 

1068 and changes.source.package_list.has_arch_indep_packages() 

1069 ): 

1070 if not allow_no_arch_indep_uploads:

1071 raise Reject("Uploads must include architecture-independent packages.") 

1072 

1073 return True 
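# A hedged sketch of the dak.conf knobs consulted above (apt-style
# configuration syntax; the values shown are illustrative, not dak defaults):
#
#     Dinstall
#     {
#       AllowSourceOnlyUploads "true";
#       AllowSourceOnlyNew "false";
#       AllowNoArchIndepUploads "true";
#     };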

1074 

1075 

1076class NewOverrideCheck(Check): 

1077 """Override NEW requirement""" 

1078 

1079 def check(self, upload): 

1080 if not upload.new: 

1081 return True 

1082 

1083 new_override_keys = Config().value_list("Dinstall::NewOverrideKeys") 

1084 changes = upload.changes 

1085 

1086 if changes.primary_fingerprint in new_override_keys:

1087 upload.new = False 

1088 

1089 return True 

1090 

1091 

1092class ArchAllBinNMUCheck(Check): 

1093 """Check for arch:all binNMUs""" 

1094 

1095 def check(self, upload): 

1096 changes = upload.changes 

1097 

1098 if (

1099 "all" in changes.architectures 

1100 and changes.changes.get("Binary-Only") == "yes" 

1101 ): 

1102 raise Reject("arch:all binNMUs are not allowed.") 

1103 

1104 return True 

1105 

1106 

1107class LintianCheck(Check): 

1108 """Check package using lintian""" 

1109 

1110 def check(self, upload): 

1111 changes = upload.changes 

1112 

1113 # Only check sourceful uploads. 

1114 if changes.source is None: 

1115 return True 

1116 # Only check uploads to unstable or experimental. 

1117 if (

1118 "unstable" not in changes.distributions 

1119 and "experimental" not in changes.distributions 

1120 ): 

1121 return True 

1122 

1123 cnf = Config() 

1124 if "Dinstall::LintianTags" not in cnf: 

1125 return True 

1126 tagfile = cnf["Dinstall::LintianTags"] 

1127 

1128 with open(tagfile, "r") as sourcefile: 

1129 sourcecontent = sourcefile.read() 

1130 try: 

1131 lintiantags = yaml.safe_load(sourcecontent)["lintian"] 

1132 except yaml.YAMLError as msg: 

1133 raise Exception( 

1134 "Could not read lintian tags file {0}, YAML error: {1}".format( 

1135 tagfile, msg 

1136 ) 

1137 ) 

1138 

1139 with tempfile.NamedTemporaryFile(mode="w+t") as temptagfile: 

1140 os.fchmod(temptagfile.fileno(), 0o644) 

1141 for tags in lintiantags.values(): 

1142 for tag in tags: 

1143 print(tag, file=temptagfile) 

1144 temptagfile.flush() 

1145 

1146 changespath = os.path.join(upload.directory, changes.filename) 

1147 

1148 cmd = [] 

1149 user = cnf.get("Dinstall::UnprivUser") or None 

1150 if user is not None:

1151 cmd.extend(["sudo", "-H", "-u", user]) 

1152 cmd.extend( 

1153 [ 

1154 "/usr/bin/lintian", 

1155 "--show-overrides", 

1156 "--tags-from-file", 

1157 temptagfile.name, 

1158 changespath, 

1159 ] 

1160 ) 

1161 process = subprocess.run( 

1162 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8" 

1163 ) 

1164 output = process.stdout 

1165 result = process.returncode 

1166 

1167 if result == 2:

1168 utils.warn( 

1169 "lintian failed for %s [return code: %s]." % (changespath, result) 

1170 ) 

1171 utils.warn(utils.prefix_multi_line_string(output, " [possible output:] ")) 

1172 

1173 parsed_tags = lintian.parse_lintian_output(output) 

1174 rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags)) 

1175 if len(rejects) != 0:

1176 raise Reject("\n".join(rejects)) 

1177 

1178 return True 
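# A hedged sketch of the Dinstall::LintianTags file this check reads: YAML with
# a top-level "lintian" mapping whose values are lists of tag names (the group
# names and tags below are illustrative):
#
#     lintian:
#       nonfatal:
#         - no-copyright-file
#       fatal:
#         - debian-control-file-uses-obsolete-national-encoding
#
# Every listed tag is passed to lintian via --tags-from-file, and hits are
# turned into reject messages by lintian.generate_reject_messages().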

1179 

1180 

1181class SourceFormatCheck(Check): 

1182 """Check source format is allowed in the target suite""" 

1183 

1184 def per_suite_check(self, upload, suite): 

1185 source = upload.changes.source 

1186 session = upload.session 

1187 if source is None: 

1188 return True 

1189 

1190 source_format = source.dsc["Format"] 

1191 query = ( 

1192 session.query(SrcFormat) 

1193 .filter_by(format_name=source_format) 

1194 .filter(SrcFormat.suites.contains(suite)) 

1195 ) 

1196 if query.first() is None: 

1197 raise Reject( 

1198 "source format {0} is not allowed in suite {1}".format( 

1199 source_format, suite.suite_name 

1200 ) 

1201 ) 

1202 

1203 

1204class SuiteCheck(Check): 

1205 def per_suite_check(self, upload, suite): 

1206 if not suite.accept_source_uploads and upload.changes.source is not None:

1207 raise Reject( 

1208 'The suite "{0}" does not accept source uploads.'.format( 

1209 suite.suite_name 

1210 ) 

1211 ) 

1212 if not suite.accept_binary_uploads and len(upload.changes.binaries) != 0:

1213 raise Reject( 

1214 'The suite "{0}" does not accept binary uploads.'.format( 

1215 suite.suite_name 

1216 ) 

1217 ) 

1218 return True 

1219 

1220 

1221class SuiteArchitectureCheck(Check): 

1222 def per_suite_check(self, upload, suite): 

1223 session = upload.session 

1224 for arch in upload.changes.architectures: 

1225 query = ( 

1226 session.query(Architecture) 

1227 .filter_by(arch_string=arch) 

1228 .filter(Architecture.suites.contains(suite)) 

1229 ) 

1230 if query.first() is None: 

1231 raise Reject( 

1232 "Architecture {0} is not allowed in suite {1}".format( 

1233 arch, suite.suite_name 

1234 ) 

1235 ) 

1236 

1237 return True 

1238 

1239 

1240class VersionCheck(Check): 

1241 """Check version constraints""" 

1242 

1243 def _highest_source_version(self, session, source_name, suite): 

1244 db_source = ( 

1245 session.query(DBSource) 

1246 .filter_by(source=source_name) 

1247 .filter(DBSource.suites.contains(suite)) 

1248 .order_by(DBSource.version.desc()) 

1249 .first() 

1250 ) 

1251 if db_source is None: 

1252 return None 

1253 else: 

1254 return db_source.version 

1255 

1256 def _highest_binary_version(self, session, binary_name, suite, architecture): 

1257 db_binary = ( 

1258 session.query(DBBinary) 

1259 .filter_by(package=binary_name) 

1260 .filter(DBBinary.suites.contains(suite)) 

1261 .join(DBBinary.architecture) 

1262 .filter(Architecture.arch_string.in_(["all", architecture])) 

1263 .order_by(DBBinary.version.desc()) 

1264 .first() 

1265 ) 

1266 if db_binary is None: 

1267 return None 

1268 else: 

1269 return db_binary.version 

1270 

1271 def _version_checks(self, upload, suite, other_suite, op, op_name): 

1272 session = upload.session 

1273 

1274 if upload.changes.source is not None: 

1275 source_name = upload.changes.source.dsc["Source"] 

1276 source_version = upload.changes.source.dsc["Version"] 

1277 v = self._highest_source_version(session, source_name, other_suite) 

1278 if v is not None and not op(version_compare(source_version, v)):

1279 raise Reject( 

1280 "Version check failed:\n" 

1281 "Your upload included the source package {0}, version {1},\n" 

1282 "however {3} already has version {2}.\n" 

1283 "Uploads to {5} must have a {4} version than present in {3}.".format( 

1284 source_name, 

1285 source_version, 

1286 v, 

1287 other_suite.suite_name, 

1288 op_name, 

1289 suite.suite_name, 

1290 ) 

1291 ) 

1292 

1293 for binary in upload.changes.binaries: 

1294 binary_name = binary.control["Package"] 

1295 binary_version = binary.control["Version"] 

1296 architecture = binary.control["Architecture"] 

1297 v = self._highest_binary_version( 

1298 session, binary_name, other_suite, architecture 

1299 ) 

1300 if v is not None and not op(version_compare(binary_version, v)):

1301 raise Reject( 

1302 "Version check failed:\n" 

1303 "Your upload included the binary package {0}, version {1}, for {2},\n" 

1304 "however {4} already has version {3}.\n" 

1305 "Uploads to {6} must have a {5} version than present in {4}.".format( 

1306 binary_name, 

1307 binary_version, 

1308 architecture, 

1309 v, 

1310 other_suite.suite_name, 

1311 op_name, 

1312 suite.suite_name, 

1313 ) 

1314 ) 
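# apt_pkg.version_compare(a, b) returns a value that is negative, zero or
# positive when a is older than, equal to or newer than b; per_suite_check()
# below wraps that in "result > 0" (must be higher) and "result < 0" (must be
# lower) callables passed in as op.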

1315 

1316 def per_suite_check(self, upload, suite): 

1317 session = upload.session 

1318 

1319 vc_newer = ( 

1320 session.query(dbconn.VersionCheck) 

1321 .filter_by(suite=suite) 

1322 .filter(dbconn.VersionCheck.check.in_(["MustBeNewerThan", "Enhances"])) 

1323 ) 

1324 must_be_newer_than = [vc.reference for vc in vc_newer] 

1325 # Must be newer than old versions in `suite` 

1326 must_be_newer_than.append(suite) 

1327 

1328 for s in must_be_newer_than: 

1329 self._version_checks(upload, suite, s, lambda result: result > 0, "higher") 

1330 

1331 vc_older = session.query(dbconn.VersionCheck).filter_by( 

1332 suite=suite, check="MustBeOlderThan" 

1333 ) 

1334 must_be_older_than = [vc.reference for vc in vc_older] 

1335 

1336 for s in must_be_older_than:

1337 self._version_checks(upload, suite, s, lambda result: result < 0, "lower") 

1338 

1339 return True 

1340 

1341 @property 

1342 def forcable(self) -> bool: 

1343 return True