Coverage for daklib/upload.py: 86%

332 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""module to handle uploads not yet installed to the archive 

18 

19This module provides classes to handle uploads not yet installed to the 

20archive. Central is the :class:`Changes` class which represents a changes file. 

21It provides methods to access the included binary and source packages. 

22""" 

23 

24import errno 

25import functools 

26import os 

27from collections.abc import Collection, Mapping 

28from typing import TYPE_CHECKING, Optional, override 

29 

30import apt_inst 

31import apt_pkg 

32 

33import daklib.dakapt 

34import daklib.packagelist 

35from daklib.aptversion import AptVersion 

36from daklib.gpg import SignedFile 

37from daklib.regexes import ( 

38 re_field_source, 

39 re_file_binary, 

40 re_file_buildinfo, 

41 re_file_dsc, 

42 re_file_safe, 

43 re_file_source, 

44 re_file_source_tag2upload, 

45) 

46 

47if TYPE_CHECKING: 

48 import datetime 

49 import re 

50 

51 

class UploadException(Exception):
    """Common base class for the upload-related errors raised by this module"""

54 

55 

class InvalidChangesException(UploadException):
    """Raised when a .changes file (or its file list) is not acceptable"""

58 

59 

class InvalidBinaryException(UploadException):
    """Raised when a binary package has an invalid name or control field"""

62 

63 

class InvalidSourceException(UploadException):
    """Raised when the files of a source package are inconsistent"""

66 

67 

class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the value in the control file

    :param filename: name of the file that failed the check
    :param hash_name: name of the value that differed (e.g. ``size``)
    :param expected: value listed in the control file
    :param actual: value computed for the file
    """

    def __init__(
        self, filename: str, hash_name: str, expected: str | int, actual: str | int
    ):
        self.filename, self.hash_name = filename, hash_name
        self.expected, self.actual = expected, actual

    @override
    def __str__(self):
        # Placeholders: {0} hash name, {1} filename, {2} expected, {3} actual.
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        return template.format(
            self.hash_name, self.filename, self.expected, self.actual
        )

87 

88 

class FileDoesNotExist(UploadException):
    """A file referenced by the upload could not be found

    :param filename: name of the missing file
    """

    def __init__(self, filename: str):
        self.filename = filename

    @override
    def __str__(self):
        message = "Refers to non-existing file '{0}'"
        return message.format(self.filename)

96 

97 

class HashedFile:
    """A file name together with its expected size and checksums"""

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        # Without an explicit on-disk name the file is stored under its own name.
        self.input_filename: str = (
            filename if input_filename is None else input_filename
        )
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """build a :class:`HashedFile` describing a file that already exists

        Size and checksums are computed from the file found on disk.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        with open(os.path.join(directory, filename), "r") as handle:
            file_size = os.fstat(handle.fileno()).st_size
            digests = daklib.dakapt.DakHashes(handle)
        return cls(
            filename,
            file_size,
            digests.md5,
            digests.sha1,
            digests.sha256,
            section,
            priority,
        )

    def check(self, directory: str) -> None:
        """Verify size and hashes of the file on disk

        The file is looked up by its on-disk name (``input_filename``).

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if an :class:`OSError` with ``ENOENT`` occurs
        """
        on_disk = os.path.join(directory, self.input_filename)
        try:
            with open(on_disk) as handle:
                self.check_fh(handle)
        except OSError as err:
            # Only "no such file" is translated; other OS errors propagate.
            if err.errno != errno.ENOENT:
                raise
            raise FileDoesNotExist(self.input_filename)

    def check_fh(self, fh) -> None:
        """Verify size and hashes of an already opened file

        :param fh: open file object; it is rewound to the start before hashing
        :raises InvalidHashException: on the first value that does not match
        """
        actual_size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        digests = daklib.dakapt.DakHashes(fh)

        if actual_size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, actual_size)

        # (reported name, expected value, attribute holding the computed value);
        # checked in this order, each computed value is only read when needed.
        comparisons = (
            ("md5sum", self.md5sum, "md5"),
            ("sha1sum", self.sha1sum, "sha1"),
            ("sha256sum", self.sha256sum, "sha256"),
        )
        for hash_name, expected, attribute in comparisons:
            if getattr(digests, attribute) != expected:
                raise InvalidHashException(
                    self.filename, hash_name, expected, getattr(digests, attribute)
                )

204 

205 

def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "Optional[re.Pattern]" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename must match; pass
                             :const:`None` to skip the filename check
    :param fields: names of the three fields to read, in the order
                   (MD5 file list, SHA1 checksums, SHA256 checksums)
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    :raises ValueError: if a line does not have the expected number of
                        columns or a size is not an integer
    """
    files_field = fields[0]
    entries: dict[str, dict[str, str | int]] = {}
    entry: dict[str, str | int] | None

    # The first field defines the set of files (and their MD5 sums).
    for line in control.get(files_field, "").split("\n"):
        if not line:
            continue

        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = line.split()
            entry = dict(
                md5sum=md5sum,
                size=int(size),
                section=section,
                priority=priority,
                filename=filename,
            )
        else:
            (md5sum, size, filename) = line.split()
            entry = dict(md5sum=md5sum, size=int(size), filename=filename)

        entries[filename] = entry

    # The checksum fields may only refer to files from the first field and
    # must agree with it on the size.
    checksum_fields = ((fields[1], "sha1sum"), (fields[2], "sha256sum"))
    for checksum_field, key in checksum_fields:
        for line in control.get(checksum_field, "").split("\n"):
            if not line:
                continue
            (checksum, size, filename) = line.split()
            entry = entries.get(filename)
            if entry is None:
                raise InvalidChangesException(
                    "{0} is listed in {1}, but not in {2}.".format(
                        filename, checksum_field, files_field
                    )
                )
            if entry.get("size", None) != int(size):
                raise InvalidChangesException(
                    "Size for {0} in {1} and {2} fields differ.".format(
                        filename, files_field, checksum_field
                    )
                )
            entry[key] = checksum

    # Every file needs a size and all three checksums; checked in this order.
    required = (
        ("size", "No size for {0}."),
        ("md5sum", "No md5sum for {0}."),
        ("sha1sum", "No sha1sum for {0}."),
        ("sha256sum", "No sha256sum for {0}."),
    )

    files = {}
    for filename, entry in entries.items():
        for key, message in required:
            if key not in entry:
                raise InvalidChangesException(message.format(filename))
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)  # type: ignore[arg-type]

    return files

298 

299 

300def _mangle_source_name(source: str) -> str: 

301 """mangle source name for comparison 

302 

303 This is a hack to ensure "grub-efi-*-signed" is processed before 

304 "grub2". 

305 """ 

306 

307 if source == "grub2": 307 ↛ 308line 307 didn't jump to line 308 because the condition on line 307 was never true

308 return "grub" 

309 return source 

310 

311 

@functools.total_ordering
class Changes:
    """Representation of a .changes file

    Instances are ordered by :meth:`_key`.  ``__eq__`` is defined without
    ``__hash__``, so instances are not hashable.
    """

    def __init__(
        self,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Read and parse a .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: keyrings passed to :class:`SignedFile` for both the
                         .changes and an included .dsc
        :param require_signature: passed to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` is not a safe filename
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Lazily computed by the `binaries`, `source` and `files` properties.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """signature timestamp as reported by the :class:`SignedFile`"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """``contents_sha1`` of the :class:`SignedFile`

        Presumably the SHA1 hash of the signed contents.
        """
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        m = re_field_source.match(self.changes["Source"])
        if m is None:
            # The field comes from the upload; do not rely on `assert`,
            # which is removed when running with -O.
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return m.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither byhand, source,
                                         binary nor buildinfo
        """
        byhand = []

        for f in self.files.values():
            assert f.section is not None
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            # Unknown kind of file that is not in a byhand section.
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            _mangle_source_name(self.changes.get("Source", "")),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    @override
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: object) -> bool:
        # Mirror __eq__: let Python handle comparisons with foreign types
        # instead of failing with AttributeError on `other._key()`.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()

510 

511 

class Binary:
    """A binary package file together with its control data"""

    def __init__(self, directory: str, hashed_file: HashedFile):
        """Read the control data of a binary package

        :param directory: directory the package file is located in
        :param hashed_file: file object for the package; its on-disk name
                            (``input_filename``) is used to open it
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """create a :class:`Binary` for an existing file, computing its hashes"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a Source field the binary's own Package and Version are
        used; a Source field without a version falls back to the binary's
        Version.

        :raises InvalidBinaryException: if the Source field cannot be parsed
        """
        source_field = self.control.get("Source", None)
        if source_field is None:
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not look like a
                                        binary package
        """
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name

        The part of the Section field before the first "/", or "main" if
        the section contains no "/".
        """
        prefix, slash, _ = self.control["Section"].partition("/")
        return prefix if slash else "main"

567 

568 

class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Locate, verify and parse the .dsc of a source package

        :param directory: directory the source files are located in
        :param hashed_files: source files; exactly one must be a .dsc
        :param keyrings: keyrings passed to :class:`SignedFile`
        :param require_signature: passed to :class:`SignedFile`
        :raises InvalidSourceException: if no or more than one .dsc is given
        :raises InvalidHashException: if the .dsc does not match its hashes
        :raises FileDoesNotExist: if the .dsc does not exist
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # self._dsc_file is not assigned yet at this point, so
                    # the message must use the local variable.
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            dsc_file.filename, f.filename
                        )
                    )
                else:
                    dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Lazily computed by the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ) -> "Source":
        """create a :class:`Source` from an existing .dsc file

        The hashes of the .dsc are computed from the file on disk.

        :param directory: directory the .dsc is located in
        :param filename: name of the .dsc file
        :param keyrings: keyrings passed to :class:`SignedFile`
        :param require_signature: passed to :class:`SignedFile`
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        # "component/section" -> component; a bare section means "main".
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename

667 return self._dsc_file.filename