Coverage for daklib/upload.py: 86%
338 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-03-14 12:19 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-03-14 12:19 +0000
1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""module to handle uploads not yet installed to the archive
19This module provides classes to handle uploads not yet installed to the
20archive. Central is the :class:`Changes` class which represents a changes file.
21It provides methods to access the included binary and source packages.
22"""
24import errno
25import functools
26import os
27from collections.abc import Collection, Mapping
28from typing import TYPE_CHECKING, Optional, override
30import apt_inst
31import apt_pkg
33import daklib.dakapt
34import daklib.packagelist
35from daklib.aptversion import AptVersion
36from daklib.gpg import SignedFile
37from daklib.regexes import (
38 re_field_source,
39 re_file_binary,
40 re_file_buildinfo,
41 re_file_dsc,
42 re_file_safe,
43 re_file_source,
44 re_file_source_tag2upload,
45)
47if TYPE_CHECKING:
48 import datetime
49 import re
class UploadException(Exception):
    """Common base class of the upload-related exceptions in this module."""
class InvalidChangesException(UploadException):
    """Raised for an invalid .changes file or an inconsistent file list."""
class InvalidBinaryException(UploadException):
    """Raised when a binary package cannot be interpreted."""
class InvalidSourceException(UploadException):
    """Raised when the files of a source package are not acceptable."""
class InvalidHashException(UploadException):
    """A recorded size or checksum differs from the value found for a file.

    The four constructor arguments are kept as attributes of the same name
    and are used to build the message returned by :meth:`__str__`.
    """

    def __init__(
        self, filename: str, hash_name: str, expected: str | int, actual: str | int
    ):
        # What was checked ...
        self.filename, self.hash_name = filename, hash_name
        # ... and the two values that disagreed.
        self.expected, self.actual = expected, actual

    @override
    def __str__(self):
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        return template.format(
            self.hash_name, self.filename, self.expected, self.actual
        )
class InvalidFilenameException(UploadException):
    """Error reporting a filename that is considered invalid.

    The offending name is available as :attr:`filename`.
    """

    def __init__(self, filename: str):
        self.filename: str = filename

    @override
    def __str__(self):
        message = "Invalid filename '{0}'."
        return message.format(self.filename)
class FileDoesNotExist(UploadException):
    """Error reporting a reference to a file that does not exist.

    The missing name is available as :attr:`filename`.
    """

    def __init__(self, filename: str):
        self.filename = filename

    @override
    def __str__(self):
        message = "Refers to non-existing file '{0}'"
        return message.format(self.filename)
class HashedFile:
    """file together with its expected size and checksums"""

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        self.input_filename: str = (
            filename if input_filename is None else input_filename
        )
        """name of the file on disk (defaults to :attr:`filename`)

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """build an instance describing a file that already exists

        Size and checksums are taken from the file itself, so the returned
        object describes the file as it currently is.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        full_path = os.path.join(directory, filename)
        with open(full_path, "r") as handle:
            stat_result = os.fstat(handle.fileno())
            digests = daklib.dakapt.DakHashes(handle)
        return cls(
            filename,
            stat_result.st_size,
            digests.md5,
            digests.sha1,
            digests.sha256,
            section,
            priority,
        )

    def check(self, directory: str) -> None:
        """Validate hashes

        Opens :attr:`input_filename` inside `directory` and compares its
        size and hashes with the expected values.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if an :class:`OSError` with errno
            ``ENOENT`` occurs; any other :class:`OSError` is re-raised as is
        """
        path = os.path.join(directory, self.input_filename)
        try:
            with open(path) as fh:
                self.check_fh(fh)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            raise FileDoesNotExist(self.input_filename)

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        The handle is rewound to offset 0 before it is hashed.

        :param fh: open file object; must provide ``fileno()`` and ``seek()``
        :raises InvalidHashException: for the first of size, md5sum, sha1sum,
            sha256sum that does not match the expected value
        """
        actual_size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        digests = daklib.dakapt.DakHashes(fh)

        if actual_size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, actual_size)

        # (name used in the error, expected value, attribute on the digests)
        expectations = (
            ("md5sum", self.md5sum, "md5"),
            ("sha1sum", self.sha1sum, "sha1"),
            ("sha256sum", self.sha256sum, "sha256"),
        )
        for hash_name, expected, attribute in expectations:
            # Looked up one at a time so a later digest is never touched
            # once an earlier one has already failed.
            actual = getattr(digests, attribute)
            if actual != expected:
                raise InvalidHashException(self.filename, hash_name, expected, actual)
def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    The first field named in `fields` defines the set of files (with MD5
    sum and size); the second and third contribute the SHA1 and SHA256
    sums and must agree with the first one on the size of every file.

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip this check
    :param fields: names of the three fields to read, in the order
                   MD5 list, SHA1 list, SHA256 list
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    :raises ValueError: if a line does not have the expected number of
                        whitespace-separated items or a size is not an integer
    """
    entries: dict[str, dict[str, str | int]] = {}

    # The primary list decides which files exist at all.
    for line in control.get(fields[0], "").split("\n"):
        if not line:
            continue
        tokens = line.split()
        if has_priority_and_section:
            md5sum, size, section, priority, filename = tokens
            record = {
                "md5sum": md5sum,
                "size": int(size),
                "section": section,
                "priority": priority,
                "filename": filename,
            }
        else:
            md5sum, size, filename = tokens
            record = {"md5sum": md5sum, "size": int(size), "filename": filename}
        entries[filename] = record

    # Both checksum lists are handled the same way; only the field that is
    # read and the key that is filled in differ.
    for position, hash_key in ((1, "sha1sum"), (2, "sha256sum")):
        field_name = fields[position]
        for line in control.get(field_name, "").split("\n"):
            if not line:
                continue
            digest, size, filename = line.split()
            record = entries.get(filename)
            if record is None:
                raise InvalidChangesException(
                    "{0} is listed in {1}, but not in {2}.".format(
                        filename, field_name, fields[0]
                    )
                )
            if record.get("size", None) != int(size):
                raise InvalidChangesException(
                    "Size for {0} in {1} and {2} fields differ.".format(
                        filename, fields[0], field_name
                    )
                )
            record[hash_key] = digest

    # Keys every entry needs, paired with the complaint for a missing one.
    required = (
        ("size", "No size for {0}."),
        ("md5sum", "No md5sum for {0}."),
        ("sha1sum", "No sha1sum for {0}."),
        ("sha256sum", "No sha256sum for {0}."),
    )

    files = {}
    for filename, record in entries.items():
        for key, complaint in required:
            if key not in record:
                raise InvalidChangesException(complaint.format(filename))
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**record)  # type: ignore[arg-type]

    return files
def _mangle_source_name(source: str) -> str:
    """mangle source name for comparison

    Returns "grub" for "grub2"; every other name is returned unchanged.

    This is a hack to ensure "grub-efi-*-signed" is processed before
    "grub2".

    NOTE(review): with plain string comparison "grub" sorts *before*
    "grub-efi-...", while the unmangled "grub2" sorts after it. Which
    upload ends up being processed first therefore depends on how the
    sorted order is consumed -- confirm against the caller.
    """
    return "grub" if source == "grub2" else source
@functools.total_ordering
class Changes:
    """Representation of a .changes file

    Instances are ordered and compared through :meth:`_key`.
    """

    def __init__(
        self,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Read and parse the .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file; has to match ``re_file_safe``
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` is not a safe filename
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            raw = fd.read()
        self.signature = SignedFile(raw, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Filled in on first access by the properties of the same name.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept so that the included source can be checked with the same settings.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """timestamp reported by the signature of the .changes file"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """value of ``contents_sha1`` of the signed .changes file"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        architecture_field = self.changes.get("Architecture", "")
        return architecture_field.split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        distribution_field = self.changes["Distribution"]
        return distribution_field.split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            # Without any source file there is nothing to build (and nothing
            # to cache), so the lookup is repeated on the next access.
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files

        These are the files whose name matches ``re_file_source_tag2upload``.
        """
        extra = []
        for f in self.files.values():
            if re_file_source_tag2upload.match(f.filename):
                extra.append(f)
        return extra

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name"""
        parsed = re_field_source.match(self.changes["Source"])
        assert parsed is not None
        return parsed.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            found = []
            for f in self.files.values():
                if re_file_binary.match(f.filename):
                    found.append(Binary(self.directory, f))
            self._binaries = found
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand
            section nor a source, binary or buildinfo file
        """
        byhand = []

        for f in self.files.values():
            section = f.section
            assert section is not None
            if section == "byhand" or section.startswith("raw-"):
                byhand.append(f)
            elif not (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
                or re_file_buildinfo.match(f.filename)
            ):
                # Not a recognised file type, yet not declared byhand either.
                raise InvalidChangesException(
                    "{0}: {1} looks like a byhand package, but is in section {2}".format(
                        self.filename, f.filename, section
                    )
                )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        hashed_files = self.files.values()
        return [f for f in hashed_files if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        binary_field = self.changes.get("Binary", "")
        return binary_field.split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        closes_field = self.changes.get("Closes", "")
        return closes_field.split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        total = 0
        for f in self.files.values():
            total += f.size
        return total

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        Source name and version come first. For identical name and version,
        changes that include source sort before those that do not (so that
        sourceful uploads get processed first); the filename is the final
        tie-breaker (this should really never happen).
        """
        source = _mangle_source_name(self.changes.get("Source", ""))
        version = AptVersion(self.changes.get("Version", ""))
        # False sorts before True, hence the negation.
        return (source, version, not self.sourceful, self.filename)

    @override
    def __eq__(self, other: object) -> bool:
        if isinstance(other, Changes):
            return self._key() == other._key()
        return NotImplemented

    def __lt__(self, other: "Changes") -> bool:
        mine, theirs = self._key(), other._key()
        return mine < theirs
class Binary:
    """Representation of a binary package"""

    def __init__(self, directory: str, hashed_file: HashedFile):
        """Read the control data of a binary package

        :param directory: directory the package is located in
        :param hashed_file: file object for the package; its
            ``input_filename`` is the name opened on disk
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb = apt_inst.DebFile(os.path.join(directory, hashed_file.input_filename))
        control_data = deb.control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """create from an existing file, hashing it via :meth:`HashedFile.from_file`"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a ``Source`` field the package's own name and version are
        used; a ``Source`` field without version falls back to the package's
        ``Version``.

        :raises InvalidBinaryException: if the ``Source`` field does not
            match ``re_field_source``
        """
        source_field = self.control.get("Source", None)
        if source_field is None:
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        explicit_version = parsed.group("version")
        version = (
            self.control["Version"] if explicit_version is None else explicit_version
        )
        return (parsed.group("package"), version)

    @property
    def name(self) -> str:
        """value of the ``Package`` field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not match
            ``re_file_binary``
        """
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name

        The part of ``Section`` before the first "/", or "main" if the
        section contains no "/".
        """
        prefix, slash, _ = self.control["Section"].partition("/")
        return prefix if slash else "main"
class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Locate, verify and parse the .dsc among the given files

        :param directory: directory the source files are located in
        :param hashed_files: source files (including the .dsc itself)
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidSourceException: if `hashed_files` contains no .dsc
            or more than one
        :raises InvalidHashException: if the .dsc does not match its
            expected size or hashes
        :raises FileDoesNotExist: if the .dsc is missing
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if not re_file_dsc.match(f.filename):
                continue
            if dsc_file is not None:
                # self._dsc_file is only assigned after this loop, so the
                # message has to use the local variable; going through
                # self would raise AttributeError instead of the intended
                # InvalidSourceException.
                raise InvalidSourceException(
                    "Multiple .dsc found ({0} and {1})".format(
                        dsc_file.filename, f.filename
                    )
                )
            dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Filled in on first access of :attr:`files`.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ) -> "Source":
        """create from an existing .dsc, hashing it via :meth:`HashedFile.from_file`

        Only the named file is passed on as source file.
        """
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        The part of ``Section`` before the first "/"; "main" if the .dsc has
        no ``Section`` field or the section contains no "/".

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename