Coverage for daklib/upload.py: 86%
332 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""module to handle uploads not yet installed to the archive
19This module provides classes to handle uploads not yet installed to the
20archive. Central is the :class:`Changes` class which represents a changes file.
21It provides methods to access the included binary and source packages.
22"""
24import errno
25import functools
26import os
27from collections.abc import Collection, Mapping
28from typing import TYPE_CHECKING, Optional, override
30import apt_inst
31import apt_pkg
33import daklib.dakapt
34import daklib.packagelist
35from daklib.aptversion import AptVersion
36from daklib.gpg import SignedFile
37from daklib.regexes import (
38 re_field_source,
39 re_file_binary,
40 re_file_buildinfo,
41 re_file_dsc,
42 re_file_safe,
43 re_file_source,
44 re_file_source_tag2upload,
45)
47if TYPE_CHECKING:
48 import datetime
49 import re
class UploadException(Exception):
    """Common base class of the upload-related exceptions in this module."""
class InvalidChangesException(UploadException):
    """Problem with a .changes file or with a parsed file list.

    Raised by :func:`parse_file_list` for inconsistent ``Files`` /
    ``Checksums-*`` fields and by :class:`Changes` for unsafe filenames and
    unexpected files.
    """
class InvalidBinaryException(UploadException):
    """Problem with a binary package.

    Raised by :class:`Binary` when the ``Source`` field or the filename of
    the package cannot be parsed.
    """
class InvalidSourceException(UploadException):
    """Problem with a source package.

    Raised by :class:`Source` when the given files contain no .dsc or more
    than one .dsc.
    """
class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the expected value.

    :ivar filename: name of the file that was checked
    :ivar hash_name: which value mismatched (e.g. ``"size"`` or ``"md5sum"``)
    :ivar expected: value given in the control file
    :ivar actual: value computed from the file
    """

    def __init__(
        self, filename: str, hash_name: str, expected: str | int, actual: str | int
    ):
        # All four values are kept so that __str__ can build the explanation.
        self.filename, self.hash_name = filename, hash_name
        self.expected, self.actual = expected, actual

    @override
    def __str__(self):
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        return template.format(
            self.hash_name, self.filename, self.expected, self.actual
        )
class FileDoesNotExist(UploadException):
    """A referenced file could not be found.

    :ivar filename: name of the missing file
    """

    def __init__(self, filename: str):
        self.filename = filename

    @override
    def __str__(self):
        message = "Refers to non-existing file '{0}'"
        return message.format(self.filename)
class HashedFile:
    """file with checksums

    Bundles a filename with the size and the MD5, SHA1 and SHA256 checksums
    the file is expected to have, plus an optional section and priority.
    """

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        # Without an explicit on-disk name the file is read under `filename`.
        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file

        Create a :class:`HashedFile` object that refers to an already existing file.
        The size is taken from ``os.fstat`` on the open file and the checksums
        from :class:`daklib.dakapt.DakHashes`.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        # Binary mode: the files handled here hold arbitrary bytes (e.g. .deb
        # archives) and are only hashed, so no text decoding should be set up.
        with open(path, "rb") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(
            filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority
        )

    def check(self, directory: str) -> None:
        """Validate hashes

        Check if size and hashes match the expected value.  The file is
        looked up under :attr:`input_filename`.

        :param directory: directory the file is located in
        :raises FileDoesNotExist: if the file does not exist
        :raises InvalidHashException: if there is a hash mismatch
        """
        path = os.path.join(directory, self.input_filename)
        try:
            with open(path, "rb") as fh:
                self.check_fh(fh)
        except FileNotFoundError as e:
            # FileNotFoundError is the OSError raised for ENOENT; any other
            # OSError propagates unchanged.
            raise FileDoesNotExist(self.input_filename) from e

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        :param fh: open file object; must support ``fileno()`` and ``seek()``
        :raises InvalidHashException: for the first mismatch found; size is
            compared first, then MD5, SHA1 and SHA256
        """
        size = os.fstat(fh.fileno()).st_size
        # Rewind so that the whole file is hashed even if fh was read before.
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)

        if size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, size)

        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, "md5sum", self.md5sum, hashes.md5)

        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(
                self.filename, "sha1sum", self.sha1sum, hashes.sha1
            )

        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(
                self.filename, "sha256sum", self.sha256sum, hashes.sha256
            )
def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    The first field in `fields` carries the MD5 sums and defines which files
    exist; the second and third add the SHA1 and SHA256 sums and must list
    the same files with the same sizes.

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match (checked with
                             ``match``); :const:`None` skips the check
    :param fields: names of the MD5, SHA1 and SHA256 fields, in this order
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    :raises ValueError: a line has an unexpected number of columns or a size
                        that is not an integer
    """
    md5_field = fields[0]
    entries: dict[str, dict[str, str | int]] = {}

    # Pass 1: the MD5 field creates one entry per file.
    for line in control.get(md5_field, "").split("\n"):
        if not line:
            continue

        if has_priority_and_section:
            md5sum, size, section, priority, filename = line.split()
            entries[filename] = {
                "md5sum": md5sum,
                "size": int(size),
                "section": section,
                "priority": priority,
                "filename": filename,
            }
        else:
            md5sum, size, filename = line.split()
            entries[filename] = {
                "md5sum": md5sum,
                "size": int(size),
                "filename": filename,
            }

    # Pass 2: each Checksums-* field may only refer to known files and has to
    # agree on their size.  The field name is looked up lazily so that the
    # fields are processed strictly one after the other.
    for index, key in ((1, "sha1sum"), (2, "sha256sum")):
        checksum_field = fields[index]
        for line in control.get(checksum_field, "").split("\n"):
            if not line:
                continue
            checksum, size, filename = line.split()
            known = entries.get(filename)
            if known is None:
                raise InvalidChangesException(
                    "{0} is listed in {1}, but not in {2}.".format(
                        filename, checksum_field, md5_field
                    )
                )
            if known.get("size", None) != int(size):
                raise InvalidChangesException(
                    "Size for {0} in {1} and {2} fields differ.".format(
                        filename, md5_field, checksum_field
                    )
                )
            known[key] = checksum

    # Pass 3: every file needs all values and a safe name.
    required = (
        ("size", "No size for {0}."),
        ("md5sum", "No md5sum for {0}."),
        ("sha1sum", "No sha1sum for {0}."),
        ("sha256sum", "No sha256sum for {0}."),
    )
    files = {}
    for name, values in entries.items():
        for key, message in required:
            if key not in values:
                raise InvalidChangesException(message.format(name))
        if safe_file_regexp is not None and not safe_file_regexp.match(name):
            raise InvalidChangesException(
                f"References file with unsafe filename '{name}'."
            )
        files[name] = HashedFile(**values)  # type: ignore[arg-type]

    return files
300def _mangle_source_name(source: str) -> str:
301 """mangle source name for comparison
303 This is a hack to ensure "grub-efi-*-signed" is processed before
304 "grub2".
305 """
307 if source == "grub2": 307 ↛ 308line 307 didn't jump to line 308 because the condition on line 307 was never true
308 return "grub"
309 return source
312@functools.total_ordering
313class Changes:
314 """Representation of a .changes file"""
316 def __init__(
317 self,
318 directory: str,
319 filename: str,
320 keyrings: Collection[str],
321 require_signature: bool = True,
322 ):
323 if not re_file_safe.match(filename): 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true
324 raise InvalidChangesException("{0}: unsafe filename".format(filename))
326 self.directory: str = directory
327 """directory the .changes is located in"""
329 self.filename: str = filename
330 """name of the .changes file"""
332 with open(self.path, "rb") as fd:
333 data = fd.read()
334 self.signature = SignedFile(data, keyrings, require_signature)
335 self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
336 """dict to access fields of the .changes file"""
338 self._binaries: "Optional[list[Binary]]" = None
339 self._source: "Optional[Source]" = None
340 self._files: Optional[dict[str, HashedFile]] = None
341 self._keyrings = keyrings
342 self._require_signature: bool = require_signature
344 @property
345 def path(self) -> str:
346 """path to the .changes file"""
347 return os.path.join(self.directory, self.filename)
349 @property
350 def primary_fingerprint(self) -> str:
351 """fingerprint of the key used for signing the .changes file"""
352 return self.signature.primary_fingerprint
354 @property
355 def valid_signature(self) -> bool:
356 """:const:`True` if the .changes has a valid signature"""
357 return self.signature.valid
359 @property
360 def weak_signature(self) -> bool:
361 """:const:`True` if the .changes was signed using a weak algorithm"""
362 return self.signature.weak_signature
364 @property
365 def signature_timestamp(self) -> "datetime.datetime":
366 return self.signature.signature_timestamp
368 @property
369 def contents_sha1(self) -> str:
370 return self.signature.contents_sha1
372 @property
373 def architectures(self) -> list[str]:
374 """list of architectures included in the upload"""
375 return self.changes.get("Architecture", "").split()
377 @property
378 def distributions(self) -> list[str]:
379 """list of target distributions for the upload"""
380 return self.changes["Distribution"].split()
382 @property
383 def source(self) -> "Optional[Source]":
384 """included source or :const:`None`"""
385 if self._source is None:
386 source_files = []
387 for f in self.files.values():
388 if re_file_dsc.match(f.filename) or re_file_source.match(f.filename):
389 source_files.append(f)
390 if len(source_files) > 0:
391 self._source = Source(
392 self.directory,
393 source_files,
394 self._keyrings,
395 self._require_signature,
396 )
397 return self._source
399 @property
400 def source_tag2upload_files(self) -> list[HashedFile]:
401 """
402 extra source files
403 """
404 return [
405 f
406 for f in self.files.values()
407 if re_file_source_tag2upload.match(f.filename)
408 ]
410 @property
411 def sourceful(self) -> bool:
412 """:const:`True` if the upload includes source"""
413 return "source" in self.architectures
415 @property
416 def source_name(self) -> str:
417 """source package name"""
418 m = re_field_source.match(self.changes["Source"])
419 assert m is not None
420 return m.group("package")
422 @property
423 def binaries(self) -> "list[Binary]":
424 """included binary packages"""
425 if self._binaries is None:
426 self._binaries = [
427 Binary(self.directory, f)
428 for f in self.files.values()
429 if re_file_binary.match(f.filename)
430 ]
431 return self._binaries
433 @property
434 def byhand_files(self) -> list[HashedFile]:
435 """included byhand files"""
436 byhand = []
438 for f in self.files.values():
439 assert f.section is not None
440 if f.section == "byhand" or f.section.startswith("raw-"): 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true
441 byhand.append(f)
442 continue
443 if (
444 re_file_dsc.match(f.filename)
445 or re_file_source.match(f.filename)
446 or re_file_binary.match(f.filename)
447 ):
448 continue
449 if re_file_buildinfo.match(f.filename): 449 ↛ 452line 449 didn't jump to line 452 because the condition on line 449 was always true
450 continue
452 raise InvalidChangesException(
453 "{0}: {1} looks like a byhand package, but is in section {2}".format(
454 self.filename, f.filename, f.section
455 )
456 )
458 return byhand
460 @property
461 def buildinfo_files(self) -> list[HashedFile]:
462 """included buildinfo files"""
463 return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]
465 @property
466 def binary_names(self) -> list[str]:
467 """names of included binary packages"""
468 return self.changes.get("Binary", "").split()
470 @property
471 def closed_bugs(self) -> list[str]:
472 """bugs closed by this upload"""
473 return self.changes.get("Closes", "").split()
475 @property
476 def files(self) -> dict[str, HashedFile]:
477 """dict mapping filenames to :class:`HashedFile` objects"""
478 if self._files is None:
479 self._files = parse_file_list(self.changes, True)
480 return self._files
482 @property
483 def bytes(self) -> int:
484 """total size of files included in this upload in bytes"""
485 return sum(f.size for f in self.files.values())
487 def _key(self) -> tuple[str, AptVersion, bool, str]:
488 """tuple used to compare two changes files
490 We sort by source name and version first. If these are identical,
491 we sort changes that include source before those without source (so
492 that sourceful uploads get processed first), and finally fall back
493 to the filename (this should really never happen).
494 """
495 return (
496 _mangle_source_name(self.changes.get("Source", "")),
497 AptVersion(self.changes.get("Version", "")),
498 not self.sourceful,
499 self.filename,
500 )
502 @override
503 def __eq__(self, other: object) -> bool:
504 if not isinstance(other, Changes): 504 ↛ 505line 504 didn't jump to line 505 because the condition on line 504 was never true
505 return NotImplemented
506 return self._key() == other._key()
508 def __lt__(self, other: "Changes") -> bool:
509 return self._key() < other._key()
class Binary:
    """Representation of a binary package"""

    def __init__(self, directory: str, hashed_file: "HashedFile"):
        """Read the control information of a binary package

        :param directory: directory the package is located in
        :param hashed_file: file object for the package; it is opened under
            its ``input_filename``
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory: str, filename: str) -> "Binary":
        """Create a :class:`Binary` for an existing file, hashing it first."""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a ``Source`` field the package's own name and version are
        used; a ``Source`` field without version falls back to the package's
        version.

        :raises InvalidBinaryException: if the ``Source`` field cannot be parsed
        """
        source_field = self.control.get("Source", None)
        if source_field is None:
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the ``Package`` field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')"""
        parsed = re_file_binary.match(self.hashed_file.filename)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
            )
        return parsed.group("type")

    @property
    def component(self) -> str:
        """component name

        The part of the ``Section`` field before the first ``/``, or "main"
        if the field contains no ``/``.
        """
        prefix, slash, _ = self.control["Section"].partition("/")
        return prefix if slash else "main"
class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: "list[HashedFile]",
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Locate, verify and parse the .dsc among the given files

        :param directory: directory the files are located in
        :param hashed_files: source files; exactly one has to be a .dsc
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidSourceException: if there is no .dsc or more than one
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    # self._dsc_file is not assigned yet at this point, so
                    # the message has to use the local variable.
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            dsc_file.filename, f.filename
                        )
                    )
                dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Filled lazily by the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ) -> "Source":
        """Create a :class:`Source` from an existing .dsc, hashing it first."""
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> "dict[str, HashedFile]":
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.

        Taken from the part of the ``Section`` field before the first ``/``;
        "main" if the field is missing or contains no ``/``.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename