1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""module to handle uploads not yet installed to the archive 

18 

19This module provides classes to handle uploads not yet installed to the 

20archive. Central is the :class:`Changes` class which represents a changes file. 

21It provides methods to access the included binary and source packages. 

22""" 

23 

24import errno 

25import functools 

26import os 

27from collections.abc import Mapping 

28from typing import TYPE_CHECKING, Optional 

29 

30import apt_inst 

31import apt_pkg 

32 

33import daklib.dakapt 

34import daklib.packagelist 

35from daklib.aptversion import AptVersion 

36from daklib.gpg import SignedFile 

37from daklib.regexes import ( 

38 re_field_source, 

39 re_file_binary, 

40 re_file_buildinfo, 

41 re_file_dsc, 

42 re_file_safe, 

43 re_file_source, 

44) 

45 

46if TYPE_CHECKING: 46 ↛ 47line 46 didn't jump to line 47, because the condition on line 46 was never true

47 import datetime 

48 import re 

49 

50 

class UploadException(Exception):
    """Common base class for the upload-related exceptions of this module."""

53 

54 

class InvalidChangesException(UploadException):
    """Raised for an invalid .changes file or an invalid file list."""

57 

58 

class InvalidBinaryException(UploadException):
    """Raised when a binary package is found to be invalid."""

61 

62 

class InvalidSourceException(UploadException):
    """Raised when a source package is found to be invalid."""

65 

66 

class InvalidHashException(UploadException):
    """Raised when the size or a checksum of a file is not the expected one.

    :param filename: name of the file that failed the check
    :param hash_name: name of the mismatching property (for example
                      ``"size"`` or ``"sha256sum"``)
    :param expected: value that was expected
    :param actual: value that was found for the file
    """

    def __init__(self, filename: str, hash_name: str, expected, actual):
        # Hand the arguments to Exception so that ``args`` is populated.
        # Without this, re-creating the exception from ``args`` (as copy and
        # pickle do) fails because the four required arguments are missing.
        super().__init__(filename, hash_name, expected, actual)
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    def __str__(self) -> str:
        return (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        ).format(self.hash_name, self.filename, self.expected, self.actual)

83 

84 

class InvalidFilenameException(UploadException):
    """Raised for a filename that is not acceptable.

    :param filename: the offending filename
    """

    def __init__(self, filename: str):
        # Populate ``args`` so the exception can be re-created from them
        # (needed by copy and pickle).
        super().__init__(filename)
        self.filename: str = filename

    def __str__(self) -> str:
        return "Invalid filename '{0}'.".format(self.filename)

91 

92 

class FileDoesNotExist(UploadException):
    """Raised when a referenced file does not exist.

    :param filename: name of the missing file
    """

    def __init__(self, filename: str):
        # Populate ``args`` so the exception can be re-created from them
        # (needed by copy and pickle).
        super().__init__(filename)
        self.filename = filename

    def __str__(self) -> str:
        return "Refers to non-existing file '{0}'".format(self.filename)

99 

100 

class HashedFile:
    """file with checksums"""

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        """Store the expected size and checksums of a file.

        :param filename: name of the file
        :param size: size in bytes
        :param md5sum: MD5 hash in hexdigits
        :param sha1sum: SHA1 hash in hexdigits
        :param sha256sum: SHA256 hash in hexdigits
        :param section: optional section
        :param priority: optional priority
        :param input_filename: name of the file on disk; defaults to `filename`
        """
        self.filename: str = filename
        """name of the file"""

        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # Binary mode: the content is only hashed, never decoded as text.
        with open(path, "rb") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(
            filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority
        )

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file is not present in `directory`
        """
        path = os.path.join(directory, self.input_filename)
        try:
            # Binary mode: the content is only hashed, never decoded as text.
            with open(path, "rb") as fh:
                self.check_fh(fh)
        except OSError as e:
            if e.errno == errno.ENOENT:
                # Keep the original OSError attached as the cause.
                raise FileDoesNotExist(self.input_filename) from e
            raise

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        The size is taken from ``os.fstat`` on the handle and the hashes are
        computed from the start of the file.  The checks run in the order
        size, md5sum, sha1sum, sha256sum and stop at the first mismatch.

        :param fh: open file object; must support ``fileno()`` and ``seek()``
        :raises InvalidHashException: if the size or a hash does not match
        """
        size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, "md5sum", self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(
                self.filename, "sha1sum", self.sha1sum, hashes.sha1
            )

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(
                self.filename, "sha256sum", self.sha256sum, hashes.sha256
            )

207 

208 

def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip this check
    :param fields: names of the fields holding, in this order, the MD5 file
                   list, the SHA1 checksums and the SHA256 checksums
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors,
        including malformed lines and sizes that are not integers
    """
    entries: dict[str, dict] = {}

    # The first field is authoritative for which files exist.
    columns = 5 if has_priority_and_section else 3
    for line in control.get(fields[0], "").split("\n"):
        if not line:
            continue

        parts = _split_file_line(line, columns, fields[0])
        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = parts
            entry = dict(
                md5sum=md5sum,
                size=_parse_file_size(size, fields[0]),
                section=section,
                priority=priority,
                filename=filename,
            )
        else:
            (md5sum, size, filename) = parts
            entry = dict(
                md5sum=md5sum,
                size=_parse_file_size(size, fields[0]),
                filename=filename,
            )

        entries[filename] = entry

    _merge_checksum_field(control, entries, fields[1], fields[0], "sha1sum")
    _merge_checksum_field(control, entries, fields[2], fields[0], "sha256sum")

    files = {}
    for entry in entries.values():
        filename = entry["filename"]
        # Every file needs the size and all three hashes.
        for key in ("size", "md5sum", "sha1sum", "sha256sum"):
            if key not in entry:
                raise InvalidChangesException(
                    "No {0} for {1}.".format(key, filename)
                )
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)

    return files


def _split_file_line(line: str, count: int, field: str) -> list:
    """Split a file list line into exactly `count` whitespace separated columns."""
    parts = line.split()
    if len(parts) != count:
        raise InvalidChangesException(
            "Malformed line in {0}: '{1}'.".format(field, line.strip())
        )
    return parts


def _parse_file_size(value: str, field: str) -> int:
    """Convert the size column of a file list line to an integer."""
    try:
        return int(value)
    except ValueError as e:
        raise InvalidChangesException(
            "Invalid size '{0}' in {1}.".format(value, field)
        ) from e


def _merge_checksum_field(
    control: Mapping[str, str],
    entries: dict,
    field: str,
    base_field: str,
    key: str,
) -> None:
    """Add the hashes listed in a Checksums-* field to `entries` under `key`."""
    for line in control.get(field, "").split("\n"):
        if not line:
            continue
        (checksum, size, filename) = _split_file_line(line, 3, field)
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, field, base_field
                )
            )
        if entry.get("size", None) != _parse_file_size(size, field):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, base_field, field
                )
            )
        entry[key] = checksum

301 

302 

@functools.total_ordering
class Changes:
    """Representation of a .changes file"""

    def __init__(
        self, directory: str, filename: str, keyrings, require_signature: bool = True
    ):
        """Read and parse a .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` is not a safe filename
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Lazily filled caches for the properties of the same name.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept so that the included .dsc can be verified the same way.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """timestamp of the signature as reported by the signature object"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """SHA1 of the contents as reported by the signature object"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        match = re_field_source.match(self.changes["Source"])
        if not match:
            # Report a malformed field as an upload error instead of failing
            # with AttributeError on the missing match object.
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return match.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand
            section nor a source, binary or buildinfo file
        """
        byhand = []

        for f in self.files.values():
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            # Unknown kind of file that is not marked as byhand.
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: "Changes") -> bool:
        # Same guard as in __eq__ so unrelated types are rejected by Python
        # with a TypeError rather than an AttributeError on `_key`.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()

482 

483 

class Binary:
    """Representation of a binary package"""

    def __init__(self, directory: str, hashed_file: HashedFile):
        """Read the control data of a binary package

        :param directory: directory the package file is located in
        :param hashed_file: file object describing the package file
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """Create a :class:`Binary` for an existing file in `directory`"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version"""
        source_field = self.control.get("Source", None)
        if source_field is None:
            # Without a Source field the package's own name and version are used.
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            # No version in the Source field: use the binary's version.
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')"""
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name"""
        # "component/section" names the component; a bare section means "main".
        prefix, slash, _rest = self.control["Section"].partition("/")
        return prefix if slash else "main"

539 

540 

class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings,
        require_signature=True,
    ):
        """Locate, verify and parse the .dsc of a source package

        :param directory: directory the source files are located in
        :param hashed_files: source files; exactly one of them has to be a .dsc
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidSourceException: if no or more than one .dsc is included
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # Use the local `dsc_file`: `self._dsc_file` is only
                    # assigned after this loop, so reading it here would
                    # raise AttributeError instead of the intended error.
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            dsc_file.filename, f.filename
                        )
                    )
                else:
                    dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Lazily filled cache for the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls, directory, filename, keyrings, require_signature=True
    ) -> "Source":
        """Create a :class:`Source` from a single existing file in `directory`"""
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename

635 return self._dsc_file.filename