Coverage for daklib/upload.py: 86%

334 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""module to handle uploads not yet installed to the archive 

18 

19This module provides classes to handle uploads not yet installed to the 

20archive. Central is the :class:`Changes` class which represents a changes file. 

21It provides methods to access the included binary and source packages. 

22""" 

23 

24import errno 

25import functools 

26import os 

27from collections.abc import Collection, Mapping 

28from typing import TYPE_CHECKING, Optional, override 

29 

30import apt_inst 

31import apt_pkg 

32 

33import daklib.dakapt 

34import daklib.packagelist 

35from daklib.aptversion import AptVersion 

36from daklib.gpg import SignedFile 

37from daklib.regexes import ( 

38 re_field_source, 

39 re_file_binary, 

40 re_file_buildinfo, 

41 re_file_dsc, 

42 re_file_safe, 

43 re_file_source, 

44 re_file_source_tag2upload, 

45) 

46 

47if TYPE_CHECKING: 

48 import datetime 

49 import re 

50 

51 

class UploadException(Exception):
    """Base class for the upload-related exceptions defined in this module."""

54 

55 

class InvalidChangesException(UploadException):
    """Raised for an invalid .changes file or an invalid file list."""

58 

59 

class InvalidBinaryException(UploadException):
    """Raised when a binary package has an invalid filename or Source field."""

62 

63 

class InvalidSourceException(UploadException):
    """Raised when a source package has no .dsc or more than one .dsc."""

66 

67 

class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the value in the control file.

    The exception records which file and which hash were compared together
    with the expected and the actual value.
    """

    def __init__(
        self, filename: str, hash_name: str, expected: str | int, actual: str | int
    ) -> None:
        """
        :param filename: name of the file that failed the check
        :param hash_name: name of the compared value (e.g. ``size``, ``md5sum``)
        :param expected: value given in the control file
        :param actual: value found for the file
        """
        self.actual = actual
        self.expected = expected
        self.hash_name = hash_name
        self.filename = filename

    @override
    def __str__(self) -> str:
        # Positional placeholders: {0}=hash name, {1}=filename,
        # {2}=expected value, {3}=actual value.
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        values = (self.hash_name, self.filename, self.expected, self.actual)
        return template.format(*values)

87 

88 

class InvalidFilenameException(UploadException):
    """Raised for a filename that is not acceptable."""

    def __init__(self, filename: str) -> None:
        """
        :param filename: the offending filename
        """
        self.filename: str = filename

    @override
    def __str__(self) -> str:
        offending = self.filename
        return "Invalid filename '{0}'.".format(offending)

96 

97 

class FileDoesNotExist(UploadException):
    """Raised when a referenced file is not present on disk."""

    def __init__(self, filename: str) -> None:
        """
        :param filename: name of the missing file
        """
        self.filename = filename

    @override
    def __str__(self) -> str:
        missing = self.filename
        return "Refers to non-existing file '{0}'".format(missing)

105 

106 

class HashedFile:
    """file with checksums

    Holds the size and the MD5, SHA1 and SHA256 checksums recorded for a
    file, plus an optional section and priority, and can verify a file on
    disk against these values.
    """

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        """
        :param filename: name of the file
        :param size: size in bytes
        :param md5sum: MD5 hash in hexdigits
        :param sha1sum: SHA1 hash in hexdigits
        :param sha256sum: SHA256 hash in hexdigits
        :param section: optional section
        :param priority: optional priority
        :param input_filename: name of the file on disk; defaults to `filename`
        """
        self.filename: str = filename
        """name of the file"""

        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # The content is only hashed, never decoded as text, so open it in
        # binary mode like the other readers in this module do.
        with open(path, "rb") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(
            filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority
        )

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file is not present in `directory`
        """
        path = os.path.join(directory, self.input_filename)
        try:
            # Binary mode: the content is hashed, not decoded as text.
            with open(path, "rb") as fh:
                self.check_fh(fh)
        except OSError as e:
            # Only a missing file is translated; every other OS-level error
            # is passed on unchanged.
            if e.errno == errno.ENOENT:
                raise FileDoesNotExist(self.input_filename) from e
            raise

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        The size is taken from the file descriptor, the handle is rewound
        to the start and hashed. Size, MD5, SHA1 and SHA256 are compared in
        that order; the first mismatch is reported.

        :param fh: open file object; must provide ``fileno()`` and ``seek()``
        :raises InvalidHashException: if size or a hash does not match
        """
        size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, "md5sum", self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(
                self.filename, "sha1sum", self.sha1sum, hashes.sha1
            )

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(
                self.filename, "sha256sum", self.sha256sum, hashes.sha256
            )

213 

214 

def _merge_checksum_field(
    control: Mapping[str, str],
    entries: dict[str, dict[str, str | int]],
    field: str,
    primary_field: str,
    key: str,
) -> None:
    """Merge the checksums of one Checksums-* field into `entries` under `key`."""
    for line in control.get(field, "").split("\n"):
        if not line:
            continue
        (checksum, size, filename) = line.split()
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, field, primary_field
                )
            )
        # The size must agree with the one recorded from the primary field.
        if entry.get("size", None) != int(size):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, primary_field, field
                )
            )
        entry[key] = checksum


def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    The first field in `fields` defines the set of files (with MD5 sum and
    size); the second and third field add the SHA1 and SHA256 sums for the
    same files.

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match;
                             :const:`None` disables the check
    :param fields: names of the fields holding, in this order, the MD5,
                   SHA1 and SHA256 sums
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    :raises ValueError: if a line does not have the expected number of
                        columns or a size is not an integer
    """
    entries: dict[str, dict[str, str | int]] = {}
    entry: dict[str, str | int]

    for line in control.get(fields[0], "").split("\n"):
        if not line:
            continue

        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = line.split()
            entry = dict(
                md5sum=md5sum,
                size=int(size),
                section=section,
                priority=priority,
                filename=filename,
            )
        else:
            (md5sum, size, filename) = line.split()
            entry = dict(md5sum=md5sum, size=int(size), filename=filename)

        entries[filename] = entry

    _merge_checksum_field(control, entries, fields[1], fields[0], "sha1sum")
    _merge_checksum_field(control, entries, fields[2], fields[0], "sha256sum")

    files = {}
    for filename, entry in entries.items():
        # Every file needs the full set of values before it can be turned
        # into a HashedFile.
        for required in ("size", "md5sum", "sha1sum", "sha256sum"):
            if required not in entry:
                raise InvalidChangesException(
                    "No {0} for {1}.".format(required, filename)
                )
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)  # type: ignore[arg-type]

    return files

307 

308 

@functools.total_ordering
class Changes:
    """Representation of a .changes file

    The file is read on construction and handed to :class:`SignedFile`.
    The file list, the source and the binaries are parsed on first access
    and cached. Instances are ordered by :meth:`_key`.
    """

    def __init__(
        self,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """
        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: keyrings passed on to :class:`SignedFile`
        :param require_signature: passed on to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` is not a safe filename
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Caches for the lazily computed properties below.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept so that the included .dsc can be checked the same way.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """timestamp of the signature as reported by :class:`SignedFile`"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """SHA1 of the contents as reported by :class:`SignedFile`"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field is malformed
        """
        m = re_field_source.match(self.changes["Source"])
        # The field comes from the uploaded file, so validate it explicitly
        # rather than with an assert (asserts are removed under ``-O``).
        if m is None:
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return m.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither a source,
            binary or buildinfo file nor in a byhand section
        """
        byhand = []

        for f in self.files.values():
            # Files parsed from a .changes always carry a section; the
            # assert only narrows the Optional type.
            assert f.section is not None
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    @override
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: "Changes") -> bool:
        # Mirror __eq__: let Python handle comparisons with foreign types
        # instead of failing on a missing ``_key`` attribute.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()

507 

508 

class Binary:
    """Representation of a binary package

    The control data is extracted from the package file on construction.
    """

    def __init__(self, directory: str, hashed_file: HashedFile):
        """
        :param directory: directory the package file is located in
        :param hashed_file: file object for the package
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """create from an existing file, computing its hashes first"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version"""
        source_field = self.control.get("Source", None)
        if source_field is None:
            # Without a Source field the package's own name and version
            # are used.
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            # Source field without a version: fall back to the binary's.
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')"""
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name"""
        section_parts = self.control["Section"].split("/")
        # "component/section" names the component explicitly; a bare
        # section means "main".
        return section_parts[0] if len(section_parts) > 1 else "main"

564 

565 

class Source:
    """Representation of a source package

    The .dsc is located among the given files, verified against its
    recorded hashes and parsed on construction.
    """

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """
        :param directory: directory the source files are located in
        :param hashed_files: source files; must include exactly one .dsc
        :param keyrings: keyrings passed on to :class:`SignedFile`
        :param require_signature: passed on to :class:`SignedFile`
        :raises InvalidSourceException: if no .dsc or more than one .dsc is
            included
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # Report the local candidate: self._dsc_file is only
                    # assigned after this loop has finished.
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            dsc_file.filename, f.filename
                        )
                    )
                dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Cache for the lazily parsed file list of the .dsc.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ) -> "Source":
        """create from an existing .dsc file, computing its hashes first

        :param directory: directory the .dsc is located in
        :param filename: name of the .dsc file
        :param keyrings: keyrings passed on to :class:`SignedFile`
        :param require_signature: passed on to :class:`SignedFile`
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename

664 return self._dsc_file.filename