Coverage for daklib/upload.py: 86%
334 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""module to handle uploads not yet installed to the archive
19This module provides classes to handle uploads not yet installed to the
20archive. Central is the :class:`Changes` class which represents a changes file.
21It provides methods to access the included binary and source packages.
22"""
24import errno
25import functools
26import os
27from collections.abc import Collection, Mapping
28from typing import TYPE_CHECKING, Optional, override
30import apt_inst
31import apt_pkg
33import daklib.dakapt
34import daklib.packagelist
35from daklib.aptversion import AptVersion
36from daklib.gpg import SignedFile
37from daklib.regexes import (
38 re_field_source,
39 re_file_binary,
40 re_file_buildinfo,
41 re_file_dsc,
42 re_file_safe,
43 re_file_source,
44 re_file_source_tag2upload,
45)
47if TYPE_CHECKING:
48 import datetime
49 import re
class UploadException(Exception):
    """Common base class of the upload-related exceptions defined in this module"""
class InvalidChangesException(UploadException):
    """Raised for an unusable .changes file or an inconsistent file list"""
class InvalidBinaryException(UploadException):
    """Raised when a binary package cannot be interpreted"""
class InvalidSourceException(UploadException):
    """Raised when the files of a source package are unusable"""
class InvalidHashException(UploadException):
    """Size or checksum of a file differs from the expected value

    The individual values are kept as attributes; the human readable
    message is only assembled when the exception is converted to a string.
    """

    def __init__(
        self, filename: str, hash_name: str, expected: str | int, actual: str | int
    ):
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    @override
    def __str__(self):
        # {0}: name of the hash, {1}: filename, {2}: expected, {3}: actual
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        values = (self.hash_name, self.filename, self.expected, self.actual)
        return template.format(*values)
class InvalidFilenameException(UploadException):
    """Raised for a filename that is not acceptable"""

    def __init__(self, filename: str):
        self.filename: str = filename

    @override
    def __str__(self):
        message = "Invalid filename '{0}'."
        return message.format(self.filename)
class FileDoesNotExist(UploadException):
    """Raised when a referenced file is not present on disk"""

    def __init__(self, filename: str):
        self.filename = filename

    @override
    def __str__(self):
        message = "Refers to non-existing file '{0}'"
        return message.format(self.filename)
class HashedFile:
    """a file together with its expected size and checksums"""

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        # Without an explicit on-disk name the file is looked up under `filename`.
        self.input_filename: str = (
            filename if input_filename is None else input_filename
        )
        """name of the file on disk

        Used for temporary files that should not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """build an instance describing a file that already exists on disk

        Size and checksums are taken from the file itself.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        with open(os.path.join(directory, filename), "r") as fh:
            stat_result = os.fstat(fh.fileno())
            digests = daklib.dakapt.DakHashes(fh)
        return cls(
            filename,
            stat_result.st_size,
            digests.md5,
            digests.sha1,
            digests.sha256,
            section,
            priority,
        )

    def check(self, directory: str) -> None:
        """Validate size and hashes of the file on disk

        The file is looked up under its on-disk name (`input_filename`).

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        :raises FileDoesNotExist: if the file is not present in `directory`
        """
        path = os.path.join(directory, self.input_filename)
        try:
            with open(path) as fh:
                self.check_fh(fh)
        except OSError as err:
            # Only a missing file gets a dedicated exception; any other
            # OS-level error is passed on unchanged.
            if err.errno != errno.ENOENT:
                raise
            raise FileDoesNotExist(self.input_filename)

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        :param fh: open file object; it is rewound to the start before hashing
        :raises InvalidHashException: on the first mismatch, checked in the
                                      order size, md5sum, sha1sum, sha256sum
        """
        actual_size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        digests = daklib.dakapt.DakHashes(fh)

        if actual_size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, actual_size)

        # (name of the expected-value attribute, name of the computed digest)
        comparisons = (
            ("md5sum", "md5"),
            ("sha1sum", "sha1"),
            ("sha256sum", "sha256"),
        )
        for hash_name, digest_attribute in comparisons:
            expected = getattr(self, hash_name)
            actual = getattr(digests, digest_attribute)
            if actual != expected:
                raise InvalidHashException(self.filename, hash_name, expected, actual)
def _split_file_list_line(line: str, column_count: int, field: str) -> list[str]:
    """Split one line of a file list field into whitespace-separated columns

    :raises InvalidChangesException: if the line does not consist of exactly
                                     `column_count` columns
    """
    columns = line.split()
    if len(columns) != column_count:
        raise InvalidChangesException(
            "Malformed line in {0} field: '{1}'.".format(field, line.strip())
        )
    return columns


def _parse_file_size(size: str, filename: str, field: str) -> int:
    """Convert the size column of a file list line to an integer

    :raises InvalidChangesException: if `size` is not an integer
    """
    try:
        return int(size)
    except ValueError as e:
        raise InvalidChangesException(
            "Invalid size '{0}' for {1} in {2} field.".format(size, filename, field)
        ) from e


def _add_checksums(
    control: Mapping[str, str],
    entries: dict[str, dict[str, str | int]],
    field: str,
    files_field: str,
    key: str,
) -> None:
    """Merge one Checksums-* field into the entries collected from `files_field`

    Every file named in `field` must already be known from `files_field` and
    must be listed with the same size; its checksum is stored under `key`.
    """
    for line in control.get(field, "").split("\n"):
        if not line:
            continue
        checksum, size, filename = _split_file_list_line(line, 3, field)
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, field, files_field
                )
            )
        if entry.get("size") != _parse_file_size(size, filename, field):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, files_field, field
                )
            )
        entry[key] = checksum


def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "Optional[re.Pattern]" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match;
                             :const:`None` disables the check
    :param fields: names of the fields holding, in this order, the MD5,
                   SHA1 and SHA256 file lists
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors,
                                     including malformed lines and sizes
                                     that are not integers
    """
    entries: dict[str, dict[str, str | int]] = {}
    entry: dict[str, str | int]
    files_field = fields[0]

    # The first field defines the set of files; it carries the MD5 sums and,
    # for .changes files, section and priority as well.
    for line in control.get(files_field, "").split("\n"):
        if not line:
            continue

        if has_priority_and_section:
            md5sum, size, section, priority, filename = _split_file_list_line(
                line, 5, files_field
            )
            entry = {
                "md5sum": md5sum,
                "size": _parse_file_size(size, filename, files_field),
                "section": section,
                "priority": priority,
                "filename": filename,
            }
        else:
            md5sum, size, filename = _split_file_list_line(line, 3, files_field)
            entry = {
                "md5sum": md5sum,
                "size": _parse_file_size(size, filename, files_field),
                "filename": filename,
            }

        entries[filename] = entry

    _add_checksums(control, entries, fields[1], files_field, "sha1sum")
    _add_checksums(control, entries, fields[2], files_field, "sha256sum")

    files = {}
    for filename, entry in entries.items():
        # A file missing from one of the Checksums-* fields lacks that key.
        for required in ("size", "md5sum", "sha1sum", "sha256sum"):
            if required not in entry:
                raise InvalidChangesException(
                    "No {0} for {1}.".format(required, filename)
                )
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)  # type: ignore[arg-type]

    return files
@functools.total_ordering
class Changes:
    """Representation of a .changes file

    Instances are ordered by :meth:`_key`; the comparison methods not
    defined here are supplied by :func:`functools.total_ordering`.
    """

    def __init__(
        self,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Read and parse a .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` is not a safe filename
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Lazily computed by the properties of the same name.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept so that the included .dsc can be checked the same way.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """timestamp of the signature as reported by :class:`SignedFile`"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """`contents_sha1` value reported by :class:`SignedFile`"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files

        These are the files whose name matches `re_file_source_tag2upload`.
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        m = re_field_source.match(self.changes["Source"])
        # The field comes from the uploaded file, so it is validated with an
        # explicit check rather than an assert (asserts vanish under -O).
        if m is None:
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return m.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand
                                         section nor a source, binary or
                                         buildinfo file
        """
        byhand = []

        for f in self.files.values():
            # Files of a .changes are parsed with section and priority.
            assert f.section is not None
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first. If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            # False sorts before True, so sourceful uploads come first.
            not self.sourceful,
            self.filename,
        )

    # NOTE: defining __eq__ without __hash__ makes instances unhashable.
    @override
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: "Changes") -> bool:
        # Same guard as in __eq__: for foreign types let Python try the
        # reflected comparison and raise TypeError, instead of failing with
        # an AttributeError on `other._key`.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()
class Binary:
    """A binary package contained in an upload"""

    def __init__(self, directory: str, hashed_file: HashedFile):
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        # The package is read from its on-disk name, which may differ from
        # the name it is to be installed under.
        deb_path = os.path.join(directory, hashed_file.input_filename)
        control_data = apt_inst.DebFile(deb_path).control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory: str, filename: str) -> "Binary":
        """create from a package on disk, computing its size and hashes"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a Source field the binary's own name and version are used;
        a Source field without a version falls back to the binary's version.

        :raises InvalidBinaryException: if the Source field cannot be parsed
        """
        source_field = self.control.get("Source", None)
        if source_field is None:
            return (self.control["Package"], self.control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        source_version = parsed.group("version")
        if source_version is None:
            source_version = self.control["Version"]
        return (parsed.group("package"), source_version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not match
                                        `re_file_binary`
        """
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name

        Taken from the part of the Section field before the first "/";
        a section without "/" belongs to "main".
        """
        prefix, slash, _ = self.control["Section"].partition("/")
        return prefix if slash else "main"
class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings: Collection[str],
        require_signature: bool = True,
    ):
        """Locate, verify and parse the .dsc among the given files

        :param directory: directory the source files are located in
        :param hashed_files: source files, including the .dsc itself
        :param keyrings: keyrings handed to :class:`SignedFile`
        :param require_signature: handed to :class:`SignedFile`
        :raises InvalidSourceException: if there is no .dsc or more than one
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_files = [f for f in hashed_files if re_file_dsc.match(f.filename)]
        if len(dsc_files) > 1:
            # Name the first two candidates. This must not touch
            # self._dsc_file, which is only assigned further down.
            raise InvalidSourceException(
                "Multiple .dsc found ({0} and {1})".format(
                    dsc_files[0].filename, dsc_files[1].filename
                )
            )
        if not dsc_files:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_files[0]

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Lazily computed by the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        keyrings: Collection[str],
        require_signature: bool = True,
    ) -> "Source":
        """create from a .dsc on disk, computing its size and hashes"""
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename