# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module to handle uploads not yet installed to the archive

This module provides classes to handle uploads not yet installed to the
archive.  Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""
24import errno
25import functools
26import os
27from collections.abc import Mapping
28from typing import TYPE_CHECKING, Optional
30import apt_inst
31import apt_pkg
33import daklib.dakapt
34import daklib.packagelist
35from daklib.aptversion import AptVersion
36from daklib.gpg import SignedFile
37from daklib.regexes import (
38 re_field_source,
39 re_file_binary,
40 re_file_buildinfo,
41 re_file_dsc,
42 re_file_safe,
43 re_file_source,
44)
46if TYPE_CHECKING: 46 ↛ 47line 46 didn't jump to line 47, because the condition on line 46 was never true
47 import datetime
48 import re
class UploadException(Exception):
    """Common base class of the upload-related exceptions in this module."""
class InvalidChangesException(UploadException):
    """Raised for an unacceptable .changes file or an invalid file list."""
class InvalidBinaryException(UploadException):
    """Raised when information about a binary package cannot be determined."""
class InvalidSourceException(UploadException):
    """Raised when the files given for a source package are not acceptable."""
class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the expected value

    :param filename: name of the file that failed the check
    :param hash_name: name of the failed check (used verbatim in the message)
    :param expected: value the control file lists
    :param actual: value found for the file
    """

    def __init__(self, filename: str, hash_name: str, expected, actual):
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    def __str__(self):
        name = self.filename
        kind = self.hash_name
        # One entry per output line; the empty entry yields the blank line
        # separating the error from the hint.
        message_lines = [
            f"Invalid {kind} hash for {name}:",
            f"According to the control file the {kind} hash should be {self.expected},",
            f"but {name} has {self.actual}.",
            "",
            f"If you did not include {name} in your upload, a different version",
            "might already be known to the archive software.",
        ]
        return "\n".join(message_lines)
class InvalidFilenameException(UploadException):
    """Error reporting an invalid filename

    :param filename: the offending filename
    """

    def __init__(self, filename: str):
        self.filename: str = filename

    def __str__(self):
        return f"Invalid filename '{self.filename}'."
class FileDoesNotExist(UploadException):
    """A file that is referred to does not exist

    :param filename: name of the missing file
    """

    def __init__(self, filename: str):
        self.filename = filename

    def __str__(self):
        return f"Refers to non-existing file '{self.filename}'"
class HashedFile:
    """file with checksums

    :param filename: name of the file
    :param size: size in bytes
    :param md5sum: MD5 hash in hexdigits
    :param sha1sum: SHA1 hash in hexdigits
    :param sha256sum: SHA256 hash in hexdigits
    :param section: optional section
    :param priority: optional priority
    :param input_filename: name of the file on disk; defaults to `filename`
    """

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.
        Size and hashes are taken from the file itself.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # Binary mode: the file is only hashed, never decoded as text.  This
        # also matches how the .changes and .dsc files are opened in this module.
        with open(path, "rb") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
            return cls(
                filename,
                size,
                hashes.md5,
                hashes.sha1,
                hashes.sha256,
                section,
                priority,
            )

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.  The file is looked
        up under its on-disk name (:attr:`input_filename`).

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file does not exist
        """
        path = os.path.join(directory, self.input_filename)
        try:
            # Binary mode: the content is only hashed, never decoded as text.
            with open(path, "rb") as fh:
                self.check_fh(fh)
        except FileNotFoundError as e:
            # Any other OSError propagates unchanged.
            raise FileDoesNotExist(self.input_filename) from e

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        :param fh: open file object; ``fileno()`` and ``seek()`` are called on
                   it and it is handed to :class:`daklib.dakapt.DakHashes`
        :raises InvalidHashException: on the first mismatch found; size is
                                      compared first, then md5sum, sha1sum
                                      and sha256sum
        """
        size = os.fstat(fh.fileno()).st_size
        # Rewind so that the hashes cover the whole file even if the caller
        # already read from the handle.
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        checks = (
            ("size", self.size, size),
            ("md5sum", self.md5sum, hashes.md5),
            ("sha1sum", self.sha1sum, hashes.sha1),
            ("sha256sum", self.sha256sum, hashes.sha256),
        )
        for hash_name, expected, actual in checks:
            if actual != expected:
                raise InvalidHashException(self.filename, hash_name, expected, actual)
def _merge_checksum_field(
    control: Mapping[str, str],
    entries: dict,
    field: str,
    files_field: str,
    key: str,
) -> None:
    """Add the checksums listed in `field` to the matching `entries` under `key`."""
    for line in control.get(field, "").split("\n"):
        if not line:
            continue
        (checksum, size, filename) = line.split()
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                f"{filename} is listed in {field}, but not in {files_field}."
            )
        if entry.get("size", None) != int(size):
            raise InvalidChangesException(
                f"Size for {filename} in {files_field} and {field} fields differ."
            )
        entry[key] = checksum


def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    The first field named in `fields` defines the set of files (with md5sum
    and size); the second and third one add the sha1sum and sha256sum and
    have to agree with the first one on the size of each file.

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match;
                             :const:`None` disables this check
    :param fields: names of the fields carrying the md5sum, sha1sum and
                   sha256sum lists (in this order)
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    """
    entries = {}

    for line in control.get(fields[0], "").split("\n"):
        if not line:
            continue

        columns = line.split()
        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = columns
            entry = {
                "md5sum": md5sum,
                "size": int(size),
                "section": section,
                "priority": priority,
                "filename": filename,
            }
        else:
            (md5sum, size, filename) = columns
            entry = {"md5sum": md5sum, "size": int(size), "filename": filename}

        entries[filename] = entry

    _merge_checksum_field(control, entries, fields[1], fields[0], "sha1sum")
    _merge_checksum_field(control, entries, fields[2], fields[0], "sha256sum")

    files = {}
    # `entries` is keyed by the same value as entry["filename"].
    for filename, entry in entries.items():
        for required in ("size", "md5sum", "sha1sum", "sha256sum"):
            if required not in entry:
                raise InvalidChangesException(f"No {required} for {filename}.")
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)

    return files
@functools.total_ordering
class Changes:
    """Representation of a .changes file

    The file is read when the object is created.  The file list, the included
    source package and the included binary packages are only parsed on first
    access and cached afterwards.

    Objects are ordered by the tuple returned from :meth:`_key`.

    :param directory: directory the .changes is located in
    :param filename: name of the .changes file; has to match `re_file_safe`
    :param keyrings: passed on to :class:`daklib.gpg.SignedFile`
    :param require_signature: passed on to :class:`daklib.gpg.SignedFile`
    :raises InvalidChangesException: if `filename` is not a safe filename
    """

    def __init__(
        self, directory: str, filename: str, keyrings, require_signature: bool = True
    ):
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Lazily filled caches for the `binaries`, `source` and `files` properties.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: "Optional[dict[str, HashedFile]]" = None
        # Kept so that an included .dsc is processed with the same settings.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """`signature_timestamp` of the underlying signed file"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """`contents_sha1` of the underlying signed file"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        match = re_field_source.match(self.changes["Source"])
        if match is None:
            # Report a malformed field as an upload error instead of failing
            # with an AttributeError on the missing match object.
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return match.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> "list[HashedFile]":
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand
                                         section nor a source, binary or
                                         buildinfo file
        """
        byhand = []

        for f in self.files.values():
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
                or re_file_buildinfo.match(f.filename)
            ):
                continue

            # Not a known file type, but not declared as byhand either.
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> "list[HashedFile]":
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> "dict[str, HashedFile]":
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> "tuple[str, AptVersion, bool, str]":
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: object) -> bool:
        # Let Python fall back to the other operand (and finally raise a
        # TypeError) instead of failing on a missing `_key` attribute.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()
class Binary:
    """Representation of a binary package

    The control data is extracted from the .deb when the object is created.

    :param directory: directory the .deb is located in
    :param hashed_file: file object for the .deb; it is read using its
                        on-disk name (`input_filename`)
    """

    def __init__(self, directory: str, hashed_file: HashedFile):
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """create from an existing .deb, computing its size and hashes"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a Source field the binary's own Package and Version are
        used; a Source field without a version falls back to the binary's
        Version.

        :raises InvalidBinaryException: if the Source field cannot be parsed
        """
        source_field = self.control.get("Source", None)
        if source_field is None:
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not match
                                        `re_file_binary`
        """
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name

        Taken from the part of the Section field before the first "/";
        "main" if the field contains no "/".
        """
        component, slash, _ = self.control["Section"].partition("/")
        return component if slash else "main"
class Source:
    """Representation of a source package

    The .dsc is located among `hashed_files`, its size and hashes are
    verified, and only then it is read and parsed.

    :param directory: directory the source files are located in
    :param hashed_files: list of source files (including the .dsc itself)
    :param keyrings: passed on to :class:`daklib.gpg.SignedFile`
    :param require_signature: passed on to :class:`daklib.gpg.SignedFile`
    :raises InvalidSourceException: if `hashed_files` contains no .dsc or
                                    more than one
    :raises InvalidHashException: if size or a hash of the .dsc does not match
    :raises FileDoesNotExist: if the .dsc does not exist in `directory`
    """

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings,
        require_signature=True,
    ):
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if not re_file_dsc.match(f.filename):
                continue
            if dsc_file is not None:
                # Report the first .dsc through the local variable:
                # `self._dsc_file` is only assigned after this loop.
                raise InvalidSourceException(
                    "Multiple .dsc found ({0} and {1})".format(
                        dsc_file.filename, f.filename
                    )
                )
            dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Filled on first access of the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls, directory, filename, keyrings, require_signature=True
    ) -> "Source":
        """create from an existing .dsc, computing its size and hashes

        Only the .dsc itself is put into `hashed_files`.
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename