# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

16 

"""module to handle uploads not yet installed to the archive

This module provides classes to handle uploads not yet installed to the
archive.  Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""

23 

import errno
import functools
import os
from collections.abc import Mapping
from typing import Optional, TYPE_CHECKING

import apt_inst
import apt_pkg

from daklib.aptversion import AptVersion
from daklib.gpg import SignedFile
from daklib.regexes import *
import daklib.dakapt
import daklib.packagelist

if TYPE_CHECKING:
    import datetime
    import re

41 

42 

class UploadException(Exception):
    """Base class for the upload-related exceptions defined in this module."""

45 

46 

class InvalidChangesException(UploadException):
    """Raised for an unsafe .changes filename or an invalid file list."""

49 

50 

class InvalidBinaryException(UploadException):
    """Raised when a binary package has an invalid Source field or filename."""

53 

54 

class InvalidSourceException(UploadException):
    """Raised when a source package has no .dsc or more than one .dsc."""

57 

58 

class InvalidHashException(UploadException):
    """Raised when the size or a checksum of a file differs from the expected value.

    :param filename: name of the file that failed the check
    :param hash_name: name of the mismatching property (e.g. ``size`` or ``md5sum``)
    :param expected: value the control file lists
    :param actual: value computed from the file
    """

    def __init__(self, filename: str, hash_name: str, expected, actual):
        # Forward the arguments to Exception so that ``args`` is populated;
        # this keeps repr() informative and lets the exception be
        # pickled/copied (which re-creates it from ``args``).
        super().__init__(filename, hash_name, expected, actual)
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    def __str__(self):
        return (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        ).format(self.hash_name, self.filename, self.expected, self.actual)

74 

75 

class InvalidFilenameException(UploadException):
    """Error reporting an invalid filename.

    :param filename: the offending filename
    """

    def __init__(self, filename: str):
        # Populate ``args`` so the exception has a useful repr() and can be
        # pickled/copied.
        super().__init__(filename)
        self.filename: str = filename

    def __str__(self):
        return "Invalid filename '{0}'.".format(self.filename)

82 

83 

class FileDoesNotExist(UploadException):
    """Raised when a referenced file does not exist on disk.

    :param filename: name of the missing file
    """

    def __init__(self, filename: str):
        # Populate ``args`` so the exception has a useful repr() and can be
        # pickled/copied.
        super().__init__(filename)
        self.filename = filename

    def __str__(self):
        return "Refers to non-existing file '{0}'".format(self.filename)

90 

91 

class HashedFile:
    """file with checksums
    """

    def __init__(
            self,
            filename: str,
            size: int,
            md5sum: str,
            sha1sum: str,
            sha256sum: str,
            section: Optional[str] = None,
            priority: Optional[str] = None,
            input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(cls, directory: str, filename: str, section: Optional[str] = None, priority: Optional[str] = None) -> 'HashedFile':
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # Binary mode: the content is only hashed, never decoded as text.
        with open(path, 'rb') as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority)

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file is not present in `directory`
        """
        path = os.path.join(directory, self.input_filename)
        try:
            # Binary mode: the content is only hashed, never decoded as text.
            with open(path, 'rb') as fh:
                self.check_fh(fh)
        except FileNotFoundError as e:
            # FileNotFoundError is the OSError subclass for errno ENOENT;
            # every other OSError propagates unchanged.
            raise FileDoesNotExist(self.input_filename) from e

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        :param fh: open file object; must provide ``fileno()`` and ``seek()``
        :raises InvalidHashException: if size or one of the hashes differs
                                      from the value stored in this object
        """
        size = os.fstat(fh.fileno()).st_size
        # Rewind so hashing covers the whole file even if the caller read from it.
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, 'size', self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, 'md5sum', self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(self.filename, 'sha1sum', self.sha1sum, hashes.sha1)

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(self.filename, 'sha256sum', self.sha256sum, hashes.sha256)

177 

178 

def parse_file_list(
        control: Mapping[str, str],
        has_priority_and_section: bool,
        safe_file_regexp: 're.Pattern' = re_file_safe,
        fields=('Files', 'Checksums-Sha1', 'Checksums-Sha256')
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip the filename check
    :param fields: names of the three fields to read, in the order
                   (MD5 file list, SHA1 checksums, SHA256 checksums)
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors,
        including lines with the wrong number of columns or a size that is
        not an integer
    """
    entries = {}

    for line in control.get(fields[0], "").split('\n'):
        if not line:
            continue

        # The control data is untrusted: report a wrong column count or a
        # non-numeric size as an invalid upload instead of a bare ValueError.
        try:
            if has_priority_and_section:
                (md5sum, size, section, priority, filename) = line.split()
                entry = dict(md5sum=md5sum, size=int(size), section=section, priority=priority, filename=filename)
            else:
                (md5sum, size, filename) = line.split()
                entry = dict(md5sum=md5sum, size=int(size), filename=filename)
        except ValueError as e:
            raise InvalidChangesException("Malformed line in {0} field: '{1}'".format(fields[0], line.strip())) from e

        entries[filename] = entry

    # Both Checksums-* fields share one layout ("<hash> <size> <filename>"),
    # so a single loop merges them into the entries found above.
    for field, key in ((fields[1], 'sha1sum'), (fields[2], 'sha256sum')):
        for line in control.get(field, "").split('\n'):
            if not line:
                continue
            try:
                (checksum, size, filename) = line.split()
                size = int(size)
            except ValueError as e:
                raise InvalidChangesException("Malformed line in {0} field: '{1}'".format(field, line.strip())) from e
            entry = entries.get(filename)
            if entry is None:
                raise InvalidChangesException('{0} is listed in {1}, but not in {2}.'.format(filename, field, fields[0]))
            if entry['size'] != size:
                raise InvalidChangesException('Size for {0} in {1} and {2} fields differ.'.format(filename, fields[0], field))
            entry[key] = checksum

    files = {}
    for filename, entry in entries.items():
        # A file that is missing from one of the Checksums-* fields lacks
        # the corresponding key here.
        for key in ('size', 'md5sum', 'sha1sum', 'sha256sum'):
            if key not in entry:
                raise InvalidChangesException('No {0} for {1}.'.format(key, filename))
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(f"References file with unsafe filename '{filename}'.")
        files[filename] = HashedFile(**entry)

    return files

247 

248 

@functools.total_ordering
class Changes:
    """Representation of a .changes file
    """

    def __init__(self, directory: str, filename: str, keyrings, require_signature: bool = True):
        """Read and parse a .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: keyrings handed to :class:`SignedFile` for
                         signature verification
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` does not match ``re_file_safe``
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException('{0}: unsafe filename'.format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, 'rb') as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Caches for the lazily computed properties below.
        self._binaries: 'Optional[list[Binary]]' = None
        self._source: 'Optional[Source]' = None
        self._files: Optional[dict[str, HashedFile]] = None
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> 'datetime.datetime':
        """timestamp of the signature as reported by the signature object"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """SHA1 of the contents as reported by the signature object"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get('Architecture', '').split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes['Distribution'].split()

    @property
    def source(self) -> 'Optional[Source]':
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(self.directory, source_files, self._keyrings, self._require_signature)
        return self._source

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        match = re_field_source.match(self.changes['Source'])
        if not match:
            # Report a proper upload error rather than failing with
            # AttributeError on ``None.group``.
            raise InvalidChangesException('{0}: Invalid Source field.'.format(self.filename))
        return match.group('package')

    @property
    def binaries(self) -> 'list[Binary]':
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> 'list[HashedFile]':
        """included byhand files

        :raises InvalidChangesException: if a file is neither byhand/raw-*,
            source, binary nor buildinfo
        """
        byhand = []

        for f in self.files.values():
            if f.section == 'byhand' or f.section.startswith('raw-'):
                byhand.append(f)
                continue
            if re_file_dsc.match(f.filename) or re_file_source.match(f.filename) or re_file_binary.match(f.filename):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            raise InvalidChangesException("{0}: {1} looks like a byhand package, but is in section {2}".format(self.filename, f.filename, f.section))

        return byhand

    @property
    def buildinfo_files(self) -> 'list[HashedFile]':
        """included buildinfo files"""
        return [
            f for f in self.files.values()
            if re_file_buildinfo.match(f.filename)
        ]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get('Binary', '').split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get('Closes', '').split()

    @property
    def files(self) -> 'dict[str, HashedFile]':
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> 'tuple[str, AptVersion, bool, str]':
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get('Source', ''),
            AptVersion(self.changes.get('Version', '')),
            not self.sourceful,
            self.filename
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: 'Changes') -> bool:
        # Mirror __eq__: let Python raise TypeError for foreign types
        # instead of failing with AttributeError on ``other._key``.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()

417 

418 

class Binary:
    """Representation of a binary package
    """

    def __init__(self, directory: str, hashed_file: 'HashedFile'):
        """Read the control information of a binary package

        :param directory: directory the package is located in
        :param hashed_file: file object for the .deb; its ``input_filename``
                            is the name looked up in `directory`
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        path = os.path.join(directory, hashed_file.input_filename)
        data = apt_inst.DebFile(path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> 'Binary':
        """create from an existing file, computing its size and hashes

        :param directory: directory the package is located in
        :param filename: filename of the package
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, hashed_file)

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a Source field the binary's own Package and Version are
        used; a Source field without version falls back to the binary's
        Version.

        :raises InvalidBinaryException: if the Source field cannot be parsed
        """
        source = self.control.get("Source", None)
        if source is None:
            return (self.control["Package"], self.control["Version"])
        match = re_field_source.match(source)
        if not match:
            raise InvalidBinaryException('{0}: Invalid Source field.'.format(self.hashed_file.filename))
        version = match.group('version')
        if version is None:
            version = self.control['Version']
        return (match.group('package'), version)

    @property
    def name(self) -> str:
        """binary package name (the Package field)"""
        return self.control['Package']

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not match ``re_file_binary``
        """
        match = re_file_binary.match(self.hashed_file.filename)
        if not match:
            raise InvalidBinaryException('{0}: Does not match re_file_binary'.format(self.hashed_file.filename))
        return match.group('type')

    @property
    def component(self) -> str:
        """component name

        Part of the Section field before the first ``/``; ``main`` if the
        section contains no ``/``.
        """
        fields = self.control['Section'].split('/')
        if len(fields) > 1:
            return fields[0]
        return "main"

471 

472 

class Source:
    """Representation of a source package
    """

    def __init__(self, directory: str, hashed_files: 'list[HashedFile]', keyrings, require_signature=True):
        """Locate, verify and parse the .dsc of a source package

        :param directory: directory the source files are located in
        :param hashed_files: list of source files (including the .dsc itself)
        :param keyrings: keyrings handed to :class:`SignedFile` for
                         signature verification
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidSourceException: if `hashed_files` contains no .dsc
                                        or more than one
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # Use the local variable here: self._dsc_file is only
                    # assigned after this loop has finished.
                    raise InvalidSourceException("Multiple .dsc found ({0} and {1})".format(dsc_file.filename, f.filename))
                dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, 'rb') as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = daklib.packagelist.PackageList(self.dsc)
        """Information about packages built by the source."""

        # Cache for the lazily parsed file list (see the files property).
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(cls, directory, filename, keyrings, require_signature=True) -> 'Source':
        """create from an existing .dsc file, computing its size and hashes

        :param directory: directory the .dsc is located in
        :param filename: filename of the .dsc
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> 'dict[str, HashedFile]':
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if 'Section' not in self.dsc:
            return 'main'
        fields = self.dsc['Section'].split('/')
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename