Coverage for daklib/upload.py: 86%

338 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-03-14 12:19 +0000

1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""module to handle uploads not yet installed to the archive 

18 

19This module provides classes to handle uploads not yet installed to the 

20archive. Central is the :class:`Changes` class which represents a changes file. 

21It provides methods to access the included binary and source packages. 

22""" 

23 

24import errno 

25import functools 

26import os 

27from collections.abc import Collection, Mapping 

28from typing import TYPE_CHECKING, Optional, override 

29 

30import apt_inst 

31import apt_pkg 

32 

33import daklib.dakapt 

34import daklib.packagelist 

35from daklib.aptversion import AptVersion 

36from daklib.gpg import SignedFile 

37from daklib.regexes import ( 

38 re_field_source, 

39 re_file_binary, 

40 re_file_buildinfo, 

41 re_file_dsc, 

42 re_file_safe, 

43 re_file_source, 

44 re_file_source_tag2upload, 

45) 

46 

47if TYPE_CHECKING: 

48 import datetime 

49 import re 

50 

51 

class UploadException(Exception):
    """Common base class of the upload-related exceptions in this module."""

54 

55 

class InvalidChangesException(UploadException):
    """Error in a .changes file or in a file list parsed from a control file."""

58 

59 

class InvalidBinaryException(UploadException):
    """Error in a binary package (raised by :class:`Binary`)."""

62 

63 

class InvalidSourceException(UploadException):
    """Error in a source package (raised by :class:`Source`)."""

66 

67 

class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the expected value.

    The attributes ``filename``, ``hash_name``, ``expected`` and ``actual``
    hold the constructor arguments of the same name.
    """

    def __init__(
        self, filename: str, hash_name: str, expected: str | int, actual: str | int
    ):
        self.filename, self.hash_name = filename, hash_name
        self.expected, self.actual = expected, actual

    @override
    def __str__(self):
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        return template.format(
            self.hash_name, self.filename, self.expected, self.actual
        )

87 

88 

class InvalidFilenameException(UploadException):
    """Error reporting an invalid filename; the name is kept in ``filename``."""

    def __init__(self, filename: str):
        self.filename: str = filename

    @override
    def __str__(self):
        message = "Invalid filename '{0}'."
        return message.format(self.filename)

96 

97 

class FileDoesNotExist(UploadException):
    """A referenced file does not exist; its name is kept in ``filename``."""

    def __init__(self, filename: str):
        self.filename = filename

    @override
    def __str__(self):
        message = "Refers to non-existing file '{0}'"
        return message.format(self.filename)

105 

106 

class HashedFile:
    """file with checksums

    Stores the expected size and MD5/SHA1/SHA256 checksums of a file together
    with its optional section and priority, and can compare a file on disk
    against these values.
    """

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        """
        :param filename: name of the file
        :param size: size in bytes
        :param md5sum: MD5 hash in hexdigits
        :param sha1sum: SHA1 hash in hexdigits
        :param sha256sum: SHA256 hash in hexdigits
        :param section: optional section
        :param priority: optional priority
        :param input_filename: name of the file on disk; defaults to `filename`
        """
        self.filename: str = filename
        """name of the file"""

        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # Binary mode: the content is only hashed, never decoded as text.
        with open(path, "rb") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(
            filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority
        )

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file cannot be opened because it
                                  does not exist
        """
        path = os.path.join(directory, self.input_filename)
        try:
            # Binary mode: the content is only hashed, never decoded as text.
            with open(path, "rb") as fh:
                self.check_fh(fh)
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise FileDoesNotExist(self.input_filename) from e
            # Any other OS-level error is passed on unchanged.
            raise

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        The size is taken from ``os.fstat`` on the file descriptor of `fh`;
        `fh` is rewound to the start before it is handed to the hasher.
        Checks run in the order size, md5sum, sha1sum, sha256sum and the
        first mismatch is reported.

        :param fh: open file object providing ``fileno()`` and ``seek()``
        :raises InvalidHashException: if size or a hash does not match
        """
        size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, "md5sum", self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(
                self.filename, "sha1sum", self.sha1sum, hashes.sha1
            )

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(
                self.filename, "sha256sum", self.sha256sum, hashes.sha256
            )

213 

214 

def _merge_checksum_field(
    control: Mapping[str, str],
    entries: dict[str, dict[str, str | int]],
    field: str,
    files_field: str,
    key: str,
) -> None:
    """Add the checksums listed in one Checksums-* field to `entries` under `key`."""
    for line in control.get(field, "").split("\n"):
        if len(line) == 0:
            continue
        (checksum, size, filename) = line.split()
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, field, files_field
                )
            )
        if entry.get("size", None) != int(size):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, files_field, field
                )
            )
        entry[key] = checksum


def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip this check
    :param fields: names of the three fields to read, in the order
                   MD5 file list, SHA1 checksums, SHA256 checksums
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors

    Lines with an unexpected number of whitespace-separated columns or a
    non-integer size are not handled here; the resulting :exc:`ValueError`
    propagates to the caller.
    """
    files_field = fields[0]
    entries: dict[str, dict[str, str | int]] = {}

    # The first field defines the set of files; it carries the MD5 sum and,
    # for .changes-style lists, section and priority as well.
    for line in control.get(files_field, "").split("\n"):
        if len(line) == 0:
            continue

        entry: dict[str, str | int]
        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = line.split()
            entry = dict(
                md5sum=md5sum,
                size=int(size),
                section=section,
                priority=priority,
                filename=filename,
            )
        else:
            (md5sum, size, filename) = line.split()
            entry = dict(md5sum=md5sum, size=int(size), filename=filename)

        entries[filename] = entry

    # The checksum fields may only refer to files already listed above and
    # have to agree with them on the size.
    _merge_checksum_field(control, entries, fields[1], files_field, "sha1sum")
    _merge_checksum_field(control, entries, fields[2], files_field, "sha256sum")

    files = {}
    for filename, entry in entries.items():
        # Every file needs a size and all three checksums.
        for required in ("size", "md5sum", "sha1sum", "sha256sum"):
            if required not in entry:
                raise InvalidChangesException(
                    "No {0} for {1}.".format(required, filename)
                )
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)  # type: ignore[arg-type]

    return files

307 

308 

309def _mangle_source_name(source: str) -> str: 

310 """mangle source name for comparison 

311 

312 This is a hack to ensure "grub-efi-*-signed" is processed before 

313 "grub2". 

314 """ 

315 

316 if source == "grub2": 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 return "grub" 

318 return source 

319 

320 

@functools.total_ordering
class Changes:
    """Representation of a .changes file"""

    def __init__(
        self,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Read and parse a .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` does not match
                                         `re_file_safe`
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Caches for the lazily computed `binaries`, `source` and `files`.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept so that the included source can be opened with the same settings.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """signature timestamp as reported by the signature object"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """``contents_sha1`` value of the signature object"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if len(source_files) > 0:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field does not match
                                         `re_field_source`
        """
        m = re_field_source.match(self.changes["Source"])
        if m is None:
            # Explicit raise instead of `assert`: the field is file content
            # and asserts are stripped when running with -O.
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return m.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand/raw-*
            section nor recognized as source, binary or buildinfo file
        """
        byhand = []

        for f in self.files.values():
            assert f.section is not None
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            # Unknown kind of file outside a byhand section.
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first. If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            _mangle_source_name(self.changes.get("Source", "")),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    @override
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: object) -> bool:
        # Same guard as in __eq__: let Python handle foreign types instead of
        # failing with AttributeError on the missing _key().
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()

519 

520 

class Binary:
    """Representation of a binary package"""

    def __init__(self, directory: str, hashed_file: HashedFile):
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        # The control data is read from the file's on-disk name.
        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """create a :class:`Binary` for `filename` located in `directory`"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version"""
        source_field = self.control.get("Source", None)
        if source_field is None:
            # No Source field: name and version of the binary itself are used.
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            # Source field without a version: the binary's version is used.
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')"""
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name"""
        # "component/section" yields the component, a bare section means "main".
        head, slash, _rest = self.control["Section"].partition("/")
        return head if slash else "main"

576 

577 

class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings: Collection[str],
        require_signature=True,
    ):
        """Locate, verify and parse the .dsc among `hashed_files`

        :param directory: directory the source files are located in
        :param hashed_files: source files (including the .dsc itself)
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidSourceException: if `hashed_files` contains no .dsc or
                                        more than one
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # Report the local `dsc_file`: `self._dsc_file` is not
                    # assigned yet at this point.
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            dsc_file.filename, f.filename
                        )
                    )
                dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Cache for the lazily parsed `files`.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature=True,
    ) -> "Source":
        """create a :class:`Source` from the .dsc `filename` in `directory`

        Only the given file is passed as source file.
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename