# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module to handle uploads not yet installed to the archive

This module provides classes to handle uploads not yet installed to the
archive.  Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""
24import apt_inst
25import apt_pkg
26import errno
27import functools
28import os
29from collections.abc import Mapping
30from typing import Optional, TYPE_CHECKING
32from daklib.aptversion import AptVersion
33from daklib.gpg import SignedFile
34from daklib.regexes import *
35import daklib.dakapt
36import daklib.packagelist
38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39, because the condition on line 38 was never true
39 import datetime
40 import re
class UploadException(Exception):
    """Common base class of the exceptions defined in this module."""
class InvalidChangesException(UploadException):
    """Raised for invalid .changes files and malformed file lists."""
class InvalidBinaryException(UploadException):
    """Raised when a binary package is found to be invalid."""
class InvalidSourceException(UploadException):
    """Raised when a source package is found to be invalid."""
class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the expected value.

    :param filename: name of the offending file
    :param hash_name: name of the mismatching property (e.g. ``md5sum``)
    :param expected: value that was expected
    :param actual: value that was actually found
    """

    def __init__(self, filename: str, hash_name: str, expected, actual):
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    def __str__(self):
        kind = self.hash_name
        name = self.filename
        message_lines = (
            f"Invalid {kind} hash for {name}:",
            f"According to the control file the {kind} hash should be {self.expected},",
            f"but {name} has {self.actual}.",
            "",
            f"If you did not include {name} in your upload, a different version",
            "might already be known to the archive software.",
        )
        return "\n".join(message_lines)
class InvalidFilenameException(UploadException):
    """A filename was rejected as invalid.

    :param filename: the rejected filename
    """

    def __init__(self, filename: str):
        self.filename: str = filename

    def __str__(self):
        return f"Invalid filename '{self.filename}'."
class FileDoesNotExist(UploadException):
    """A referenced file could not be found.

    :param filename: name of the missing file
    """

    def __init__(self, filename: str):
        self.filename = filename

    def __str__(self):
        return f"Refers to non-existing file '{self.filename}'"
class HashedFile:
    """file with checksums

    Stores the size and the MD5, SHA1 and SHA256 checksums a file is
    expected to have and can compare a file on disk against them.
    """

    def __init__(self, filename: str, size: int, md5sum: str, sha1sum: str, sha256sum: str, section: Optional[str] = None, priority: Optional[str] = None, input_filename: Optional[str] = None):
        self.filename: str = filename
        """name of the file"""

        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(cls, directory: str, filename: str, section: Optional[str] = None, priority: Optional[str] = None) -> 'HashedFile':
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # Binary mode: the content is only hashed, never decoded as text.
        with open(path, 'rb') as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority)

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file does not exist in `directory`
        """
        path = os.path.join(directory, self.input_filename)
        # Only a failing open() is translated; other OSErrors propagate as-is.
        try:
            fh = open(path, 'rb')
        except FileNotFoundError as e:
            raise FileDoesNotExist(self.input_filename) from e
        with fh:
            self.check_fh(fh)

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        :param fh: open file object; it must provide ``fileno()`` and ``seek()``
        :raises InvalidHashException: if size or one of the hashes differs
                                      from the expected value
        """
        size = os.fstat(fh.fileno()).st_size
        # Rewind so that hashing covers the file from its beginning.
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, 'size', self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, 'md5sum', self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(self.filename, 'sha1sum', self.sha1sum, hashes.sha1)

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(self.filename, 'sha256sum', self.sha256sum, hashes.sha256)
def _parse_file_list_line(field: str, line: str, names: tuple) -> dict:
    """Split one line of a Files/Checksums-* field into a dict keyed by `names`.

    `names` must contain ``'size'`` and ``'filename'``; the size is converted
    to :class:`int`.

    :raises InvalidChangesException: wrong number of values or non-integer size
    """
    values = line.split()
    if len(values) != len(names):
        raise InvalidChangesException(
            'Malformed line in {0} field: expected {1} values, got {2}.'.format(field, len(names), len(values))
        )
    entry = dict(zip(names, values))
    try:
        entry['size'] = int(entry['size'])
    except ValueError:
        raise InvalidChangesException(
            "Invalid size '{0}' for {1} in {2} field.".format(entry['size'], entry['filename'], field)
        ) from None
    return entry


def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: 're.Pattern' = re_file_safe,
    fields=('Files', 'Checksums-Sha1', 'Checksums-Sha256')
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip this check
    :param fields: names of the three fields to read: the one listing MD5
                   sums, the one listing SHA1 sums and the one listing
                   SHA256 sums, in this order
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    """
    if has_priority_and_section:
        names = ('md5sum', 'size', 'section', 'priority', 'filename')
    else:
        names = ('md5sum', 'size', 'filename')

    # The first field defines the set of files; the others only add hashes.
    entries = {}
    for line in control.get(fields[0], "").split('\n'):
        if not line:
            continue
        entry = _parse_file_list_line(fields[0], line, names)
        entries[entry['filename']] = entry

    for hash_name, field in (('sha1sum', fields[1]), ('sha256sum', fields[2])):
        for line in control.get(field, "").split('\n'):
            if not line:
                continue
            checksum = _parse_file_list_line(field, line, (hash_name, 'size', 'filename'))
            filename = checksum['filename']
            entry = entries.get(filename)
            if entry is None:
                raise InvalidChangesException('{0} is listed in {1}, but not in {2}.'.format(filename, field, fields[0]))
            if entry['size'] != checksum['size']:
                raise InvalidChangesException('Size for {0} in {1} and {2} fields differ.'.format(filename, fields[0], field))
            entry[hash_name] = checksum[hash_name]

    files = {}
    for entry in entries.values():
        filename = entry['filename']
        # A file missing from one of the Checksums-* fields lacks that key.
        for required in ('size', 'md5sum', 'sha1sum', 'sha256sum'):
            if required not in entry:
                raise InvalidChangesException('No {0} for {1}.'.format(required, filename))
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(f"References file with unsafe filename '{filename}'.")
        files[filename] = HashedFile(**entry)

    return files
@functools.total_ordering
class Changes:
    """Representation of a .changes file

    :param directory: directory the .changes file is located in
    :param filename: name of the .changes file
    :param keyrings: keyrings passed on to :class:`SignedFile`
    :param require_signature: passed on to :class:`SignedFile`
    :raises InvalidChangesException: if `filename` is not a safe filename
    """

    def __init__(self, directory: str, filename: str, keyrings, require_signature: bool = True):
        if not re_file_safe.match(filename):
            raise InvalidChangesException('{0}: unsafe filename'.format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, 'rb') as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Caches filled on first access by the corresponding properties.
        self._binaries: 'Optional[list[Binary]]' = None
        self._source: 'Optional[Source]' = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept to construct the included Source with the same settings.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> 'datetime.datetime':
        """signature timestamp as reported for the signed .changes file"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """``contents_sha1`` value as reported for the signed .changes file"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get('Architecture', '').split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes['Distribution'].split()

    @property
    def source(self) -> 'Optional[Source]':
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(self.directory, source_files, self._keyrings, self._require_signature)
        return self._source

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        match = re_field_source.match(self.changes['Source'])
        if not match:
            raise InvalidChangesException('{0}: Invalid Source field.'.format(self.filename))
        return match.group('package')

    @property
    def binaries(self) -> 'list[Binary]':
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand
            section nor a source, binary or buildinfo file
        """
        byhand = []

        for f in self.files.values():
            if f.section == 'byhand' or f.section.startswith('raw-'):
                byhand.append(f)
                continue
            if re_file_dsc.match(f.filename) or re_file_source.match(f.filename) or re_file_binary.match(f.filename):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            raise InvalidChangesException("{0}: {1} looks like a byhand package, but is in section {2}".format(self.filename, f.filename, f.section))

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [
            f for f in self.files.values()
            if re_file_buildinfo.match(f.filename)
        ]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get('Binary', '').split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get('Closes', '').split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get('Source', ''),
            AptVersion(self.changes.get('Version', '')),
            not self.sourceful,
            self.filename
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: 'Changes') -> bool:
        # Let Python fall back to the other operand / raise TypeError rather
        # than failing with AttributeError on a foreign type.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()
class Binary:
    """Representation of a binary package

    :param directory: directory the package file is located in
    :param hashed_file: file object for the package file
    """

    def __init__(self, directory: str, hashed_file: HashedFile):
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb_path = os.path.join(directory, hashed_file.input_filename)
        deb = apt_inst.DebFile(deb_path)
        raw_control = deb.control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(raw_control)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> 'Binary':
        """create a :class:`Binary` for an existing file in `directory`"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version"""
        source_field = self.control.get("Source", None)
        if source_field is None:
            # Without a Source field the binary's own name and version are used.
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException('{0}: Invalid Source field.'.format(self.hashed_file.filename))

        source_version = parsed.group('version')
        if source_version is None:
            # The Source field carried no version: use the binary's version.
            source_version = self.control['Version']
        return (parsed.group('package'), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control['Package']

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')"""
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group('type')
        raise InvalidBinaryException('{0}: Does not match re_file_binary'.format(self.hashed_file.filename))

    @property
    def component(self) -> str:
        """component name"""
        # "component/section" names the component; a bare section means main.
        prefix, slash, _ = self.control['Section'].partition('/')
        return prefix if slash else "main"
class Source:
    """Representation of a source package

    :param directory: directory the source files are located in
    :param hashed_files: source files; exactly one of them must be a .dsc
    :param keyrings: keyrings passed on to :class:`SignedFile`
    :param require_signature: passed on to :class:`SignedFile`
    :raises InvalidSourceException: if `hashed_files` contains no .dsc or
                                    more than one .dsc
    """

    def __init__(self, directory: str, hashed_files: list[HashedFile], keyrings, require_signature=True):
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # self._dsc_file is not assigned yet at this point, so the
                    # message has to use the local variable.
                    raise InvalidSourceException("Multiple .dsc found ({0} and {1})".format(dsc_file.filename, f.filename))
                dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, 'rb') as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = daklib.packagelist.PackageList(self.dsc)
        """Information about packages built by the source."""

        # Cache filled on first access of the ``files`` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(cls, directory, filename, keyrings, require_signature=True) -> 'Source':
        """create a :class:`Source` from a single existing file in `directory`"""
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if 'Section' not in self.dsc:
            return 'main'
        fields = self.dsc['Section'].split('/')
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename