# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

16 

"""module to handle uploads not yet installed to the archive

This module provides classes to handle uploads not yet installed to the
archive. Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""

23 

import errno
import functools
import os
from collections.abc import Mapping
from typing import TYPE_CHECKING, Optional

import apt_inst
import apt_pkg

import daklib.dakapt
import daklib.packagelist
from daklib.aptversion import AptVersion
from daklib.gpg import SignedFile
from daklib.regexes import (
    re_field_source,
    re_file_binary,
    re_file_buildinfo,
    re_file_dsc,
    re_file_safe,
    re_file_source,
    re_file_source_tag2upload,
)

if TYPE_CHECKING:
    import datetime
    import re

50 

51 

class UploadException(Exception):
    """Base class for the upload-related exceptions defined in this module."""

54 

55 

class InvalidChangesException(UploadException):
    """Raised when a .changes file or its list of files is invalid."""

58 

59 

class InvalidBinaryException(UploadException):
    """Raised when a binary package has an invalid filename or Source field."""

62 

63 

class InvalidSourceException(UploadException):
    """Raised when a source package has no .dsc or more than one .dsc."""

66 

67 

class InvalidHashException(UploadException):
    """Raised when the size or a checksum of a file differs from the expected value.

    :param filename: name of the file that failed the check
    :param hash_name: name of the mismatching property (e.g. ``size``, ``md5sum``)
    :param expected: value given in the control file
    :param actual: value computed from the file
    """

    def __init__(self, filename: str, hash_name: str, expected, actual):
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    def __str__(self):
        return (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        ).format(self.hash_name, self.filename, self.expected, self.actual)

84 

85 

class InvalidFilenameException(UploadException):
    """Exception carrying the name of a file whose filename is not acceptable.

    :param filename: the offending filename
    """

    def __init__(self, filename: str):
        self.filename: str = filename

    def __str__(self):
        return "Invalid filename '{0}'.".format(self.filename)

92 

93 

class FileDoesNotExist(UploadException):
    """Raised when a referenced file is not present on disk.

    :param filename: name of the missing file
    """

    def __init__(self, filename: str):
        self.filename = filename

    def __str__(self):
        return "Refers to non-existing file '{0}'".format(self.filename)

100 

101 

class HashedFile:
    """file with checksums"""

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        # The on-disk name defaults to the logical name.
        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        with open(path, "r") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(
            filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority
        )

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file cannot be found in `directory`
        """
        path = os.path.join(directory, self.input_filename)
        try:
            with open(path) as fh:
                self.check_fh(fh)
        except OSError as e:
            # Only a missing file is translated; other OS errors propagate as-is.
            if e.errno == errno.ENOENT:
                raise FileDoesNotExist(self.input_filename) from e
            raise

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        The size is compared first, then MD5, SHA1 and SHA256 in that order;
        the first mismatch is reported.

        :param fh: open file object; must support ``fileno()`` and ``seek()``
        :raises InvalidHashException: if the size or one of the hashes differs
        """
        size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, "md5sum", self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(
                self.filename, "sha1sum", self.sha1sum, hashes.sha1
            )

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(
                self.filename, "sha256sum", self.sha256sum, hashes.sha256
            )

208 

209 

def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip the filename check
    :param fields: names of the three fields to read, in the order
                   (MD5 list, SHA1 list, SHA256 list); the first one
                   defines the set of known files
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    :raises ValueError: if a line does not have the expected number of
                        columns or a size is not an integer
    """
    entries = {}

    # The first field defines the set of files; the others only add hashes.
    for line in control.get(fields[0], "").split("\n"):
        if not line:
            continue

        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = line.split()
            entry = dict(
                md5sum=md5sum,
                size=int(size),
                section=section,
                priority=priority,
                filename=filename,
            )
        else:
            (md5sum, size, filename) = line.split()
            entry = dict(md5sum=md5sum, size=int(size), filename=filename)

        entries[filename] = entry

    _merge_checksum_field(control, entries, fields[0], fields[1], "sha1sum")
    _merge_checksum_field(control, entries, fields[0], fields[2], "sha256sum")

    files = {}
    for entry in entries.values():
        filename = entry["filename"]
        for key in ("size", "md5sum", "sha1sum", "sha256sum"):
            if key not in entry:
                raise InvalidChangesException("No {0} for {1}.".format(key, filename))
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)

    return files


def _merge_checksum_field(control, entries, base_field, field, key):
    """Store the checksums listed in `field` under `key` in the matching entries.

    :param control: control file to take the field from
    :param entries: dict mapping filenames to entry dicts; updated in place
    :param base_field: name of the field `entries` was built from (used in
                       error messages only)
    :param field: name of the checksum field to read
    :param key: key under which the checksum is stored in each entry
    :raises InvalidChangesException: if a file is not listed in `base_field`
                                     or its size differs between the fields
    """
    for line in control.get(field, "").split("\n"):
        if not line:
            continue
        (checksum, size, filename) = line.split()
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, field, base_field
                )
            )
        if entry.get("size", None) != int(size):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, base_field, field
                )
            )
        entry[key] = checksum

302 

303 

@functools.total_ordering
class Changes:
    """Representation of a .changes file

    :param directory: directory the .changes is located in
    :param filename: name of the .changes file; has to match ``re_file_safe``
    :param keyrings: keyrings passed on to :class:`SignedFile`
    :param require_signature: passed on to :class:`SignedFile`
    :raises InvalidChangesException: if `filename` is not a safe filename
    """

    def __init__(
        self, directory: str, filename: str, keyrings, require_signature: bool = True
    ):
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Lazily computed by the `binaries`, `source` and `files` properties.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """signature timestamp as reported by the signature object"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """``contents_sha1`` value as reported by the signature object"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        match = re_field_source.match(self.changes["Source"])
        if not match:
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return match.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand/raw-*
            section nor a source, binary or buildinfo file
        """
        byhand = []

        for f in self.files.values():
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            # Not a known file type, yet not declared as byhand either.
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: "Changes") -> bool:
        # Mirror __eq__: let Python try the reflected operation for foreign types.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()

494 

495 

class Binary:
    """Representation of a binary package

    :param directory: directory the package file is located in
    :param hashed_file: file object for the package; its ``input_filename``
                        is the name looked up in `directory`
    """

    def __init__(self, directory: str, hashed_file: HashedFile):
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        path = os.path.join(directory, hashed_file.input_filename)
        data = apt_inst.DebFile(path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory: str, filename: str) -> "Binary":
        """create from an existing file, computing its hashes first

        :param directory: directory the file is located in
        :param filename: filename
        :return: :class:`Binary` object for the given file
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, hashed_file)

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a Source field the binary's own Package and Version are used;
        a Source field without version falls back to the binary's Version.

        :raises InvalidBinaryException: if the Source field cannot be parsed
        """
        source = self.control.get("Source", None)
        if source is None:
            return (self.control["Package"], self.control["Version"])
        match = re_field_source.match(source)
        if not match:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )
        version = match.group("version")
        if version is None:
            version = self.control["Version"]
        return (match.group("package"), version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not match
                                        ``re_file_binary``
        """
        match = re_file_binary.match(self.hashed_file.filename)
        if not match:
            raise InvalidBinaryException(
                "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
            )
        return match.group("type")

    @property
    def component(self) -> str:
        """component name

        Taken from the part of the Section field before the first ``/``;
        ``main`` if the Section field contains no ``/``.
        """
        fields = self.control["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

551 

552 

class Source:
    """Representation of a source package

    :param directory: directory the source files are located in
    :param hashed_files: source files; exactly one of them has to be a .dsc
    :param keyrings: keyrings passed on to :class:`SignedFile`
    :param require_signature: passed on to :class:`SignedFile`
    :raises InvalidSourceException: if there is no .dsc or more than one
    :raises InvalidHashException: if the .dsc does not match its expected hashes
    :raises FileDoesNotExist: if the .dsc is not present in `directory`
    """

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings,
        require_signature=True,
    ):
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # Report the .dsc found so far via the local variable:
                    # self._dsc_file is only assigned after this loop.
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            dsc_file.filename, f.filename
                        )
                    )
                else:
                    dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Lazily computed by the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls, directory, filename, keyrings, require_signature=True
    ) -> "Source":
        """create from an existing .dsc file, computing its hashes first

        :param directory: directory the .dsc is located in
        :param filename: name of the .dsc file
        :param keyrings: keyrings passed on to the constructor
        :param require_signature: passed on to the constructor
        :return: :class:`Source` object for the given file
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename