# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module to handle uploads not yet installed to the archive

This module provides classes to handle uploads not yet installed to the
archive.  Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""
24import errno
25import functools
26import os
27from collections.abc import Mapping
28from typing import TYPE_CHECKING, Optional
30import apt_inst
31import apt_pkg
33import daklib.dakapt
34import daklib.packagelist
35from daklib.aptversion import AptVersion
36from daklib.gpg import SignedFile
37from daklib.regexes import (
38 re_field_source,
39 re_file_binary,
40 re_file_buildinfo,
41 re_file_dsc,
42 re_file_safe,
43 re_file_source,
44 re_file_source_tag2upload,
45)
47if TYPE_CHECKING: 47 ↛ 48line 47 didn't jump to line 48, because the condition on line 47 was never true
48 import datetime
49 import re
class UploadException(Exception):
    """common base class for the upload errors defined in this module"""
class InvalidChangesException(UploadException):
    """a changes file, or one of its file list fields, is not acceptable"""
class InvalidBinaryException(UploadException):
    """a binary package is not acceptable"""
class InvalidSourceException(UploadException):
    """a source package is not acceptable"""
class InvalidHashException(UploadException):
    """size or checksum of a file differs from the expected value

    The constructor arguments are stored unchanged as the attributes
    ``filename``, ``hash_name``, ``expected`` and ``actual`` and are used
    to build the message returned by :func:`str`.
    """

    def __init__(self, filename: str, hash_name: str, expected, actual):
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual

    def __str__(self):
        # {0} = hash name, {1} = filename, {2} = expected, {3} = actual
        template = (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        )
        return template.format(
            self.hash_name, self.filename, self.expected, self.actual
        )
class InvalidFilenameException(UploadException):
    """error carrying a filename that is considered invalid"""

    def __init__(self, filename: str):
        self.filename: str = filename

    def __str__(self):
        message = "Invalid filename '{0}'."
        return message.format(self.filename)
class FileDoesNotExist(UploadException):
    """a referenced file could not be found"""

    def __init__(self, filename: str):
        self.filename = filename

    def __str__(self):
        message = "Refers to non-existing file '{0}'"
        return message.format(self.filename)
class HashedFile:
    """file with checksums"""

    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""

        self.input_filename: str = (
            filename if input_filename is None else input_filename
        )
        """name of the file on disk

        Defaults to :attr:`filename`.  Used for temporary files that should
        not be installed using their on-disk name.
        """

        self.size: int = size
        """size in bytes"""

        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""

        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""

        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""

        self.section: Optional[str] = section
        """section or :const:`None`"""

        self.priority: Optional[str] = priority
        """priority or :const:`None`"""

    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """build an instance describing a file that already exists on disk

        Size and checksums are computed from the file itself.

        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        with open(os.path.join(directory, filename), "r") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        digests = (hashes.md5, hashes.sha1, hashes.sha256)
        return cls(filename, size, *digests, section, priority)

    def check(self, directory: str) -> None:
        """Validate size and hashes of the file on disk

        The file is looked up by its :attr:`input_filename`.

        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a size or hash mismatch
        :raises FileDoesNotExist: if the file is missing
        """
        path = os.path.join(directory, self.input_filename)
        try:
            with open(path) as fh:
                self.check_fh(fh)
        except OSError as e:
            # Only a missing file is translated; other OS errors propagate.
            if e.errno != errno.ENOENT:
                raise
            raise FileDoesNotExist(self.input_filename)

    def check_fh(self, fh) -> None:
        """Validate size and hashes of an already opened file

        :param fh: open file object; it is rewound to the start before hashing
        :raises InvalidHashException: for the first mismatch found, checked
                                      in the order size, md5sum, sha1sum,
                                      sha256sum
        """
        actual_size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        digests = daklib.dakapt.DakHashes(fh)

        comparisons = (
            ("size", self.size, actual_size),
            ("md5sum", self.md5sum, digests.md5),
            ("sha1sum", self.sha1sum, digests.sha1),
            ("sha256sum", self.sha256sum, digests.sha256),
        )
        for hash_name, expected, actual in comparisons:
            if actual != expected:
                raise InvalidHashException(self.filename, hash_name, expected, actual)
def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields

    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :param safe_file_regexp: pattern every filename has to match; pass
                             :const:`None` to skip this check
    :param fields: names of the fields holding, in this order, the MD5,
                   SHA1 and SHA256 lists
    :return: dict mapping filenames to :class:`HashedFile` objects

    :raises InvalidChangesException: missing fields or other grave errors
    """
    records: dict[str, dict] = {}

    # The first field defines the set of files; the checksum fields below
    # may only refer to files listed here.
    for row in control.get(fields[0], "").split("\n"):
        if not row:
            continue

        if has_priority_and_section:
            md5sum, size, section, priority, filename = row.split()
            record = {
                "md5sum": md5sum,
                "size": int(size),
                "section": section,
                "priority": priority,
                "filename": filename,
            }
        else:
            md5sum, size, filename = row.split()
            record = {"md5sum": md5sum, "size": int(size), "filename": filename}

        records[filename] = record

    # Merge the SHA1 and then the SHA256 list into the records.
    for index, key in ((1, "sha1sum"), (2, "sha256sum")):
        field = fields[index]
        for row in control.get(field, "").split("\n"):
            if not row:
                continue
            digest, size, filename = row.split()
            record = records.get(filename)
            if record is None:
                raise InvalidChangesException(
                    "{0} is listed in {1}, but not in {2}.".format(
                        filename, field, fields[0]
                    )
                )
            if record.get("size", None) != int(size):
                raise InvalidChangesException(
                    "Size for {0} in {1} and {2} fields differ.".format(
                        filename, fields[0], field
                    )
                )
            record[key] = digest

    files = {}
    for record in records.values():
        filename = record["filename"]
        # Every file needs a size and all three checksums.
        for key in ("size", "md5sum", "sha1sum", "sha256sum"):
            if key not in record:
                raise InvalidChangesException(
                    "No {0} for {1}.".format(key, filename)
                )
        if not (safe_file_regexp is None or safe_file_regexp.match(filename)):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**record)

    return files
@functools.total_ordering
class Changes:
    """Representation of a .changes file

    Instances are ordered by :meth:`_key`; the remaining comparison
    operators are derived by :func:`functools.total_ordering`.
    """

    def __init__(
        self, directory: str, filename: str, keyrings, require_signature: bool = True
    ):
        """read and parse a .changes file

        :param directory: directory the .changes is located in
        :param filename: name of the .changes file
        :param keyrings: passed on to :class:`SignedFile`
        :param require_signature: passed on to :class:`SignedFile`
        :raises InvalidChangesException: if `filename` does not match
                                         `re_file_safe`
        """
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))

        self.directory: str = directory
        """directory the .changes is located in"""

        self.filename: str = filename
        """name of the .changes file"""

        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""

        # Lazily filled caches for the properties of the same name.
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        # Kept so the included source can be opened with the same settings.
        self._keyrings = keyrings
        self._require_signature: bool = require_signature

    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def signature_timestamp(self) -> "datetime.datetime":
        """timestamp of the signature as reported by the signed file"""
        return self.signature.signature_timestamp

    @property
    def contents_sha1(self) -> str:
        """SHA1 of the contents as reported by the signed file"""
        return self.signature.contents_sha1

    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()

    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()

    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = [
                f
                for f in self.files.values()
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename)
            ]
            if source_files:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source

    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]

    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures

    @property
    def source_name(self) -> str:
        """source package name

        :raises InvalidChangesException: if the Source field cannot be parsed
        """
        match = re_field_source.match(self.changes["Source"])
        if not match:
            # Report a malformed field as an upload error instead of failing
            # with an AttributeError on the missing match object.
            raise InvalidChangesException(
                "{0}: Invalid Source field.".format(self.filename)
            )
        return match.group("package")

    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries

    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files

        :raises InvalidChangesException: if a file is neither in a byhand
            section nor a source, binary or buildinfo file
        """
        byhand = []

        for f in self.files.values():
            if f.section == "byhand" or f.section.startswith("raw-"):
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue

            # Anything left is of an unknown kind but not marked as byhand.
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )

        return byhand

    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]

    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()

    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files

    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())

    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files

        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other: "Changes") -> bool:
        # Mirror __eq__: let Python handle comparisons with foreign types
        # instead of failing on the missing _key() method.
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() < other._key()
class Binary:
    """Representation of a binary package"""

    def __init__(self, directory: str, hashed_file: HashedFile):
        """read the control data of a binary package

        :param directory: directory the package is located in
        :param hashed_file: file object for the package; its
                            ``input_filename`` is opened
        """
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""

        deb = apt_inst.DebFile(os.path.join(directory, hashed_file.input_filename))
        control_data = deb.control.extractdata("control")

        self.control: apt_pkg.TagSection = apt_pkg.TagSection(control_data)
        """dict to access fields in DEBIAN/control"""

    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        """create an instance for an existing file, hashing it first"""
        return cls(directory, HashedFile.from_file(directory, filename))

    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version

        Without a Source field the package's own name and version are
        returned; a Source field without version uses the package's version.

        :raises InvalidBinaryException: if the Source field cannot be parsed
        """
        control = self.control
        source_field = control.get("Source", None)
        if source_field is None:
            return (control["Package"], control["Version"])

        parsed = re_field_source.match(source_field)
        if not parsed:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )

        version = parsed.group("version")
        if version is None:
            version = control["Version"]
        return (parsed.group("package"), version)

    @property
    def name(self) -> str:
        """value of the Package field"""
        return self.control["Package"]

    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')

        :raises InvalidBinaryException: if the filename does not match
                                        `re_file_binary`
        """
        parsed = re_file_binary.match(self.hashed_file.filename)
        if parsed:
            return parsed.group("type")
        raise InvalidBinaryException(
            "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
        )

    @property
    def component(self) -> str:
        """component name

        Taken from the part of the Section field before the first "/";
        "main" if the field contains no "/".
        """
        head, slash, _rest = self.control["Section"].partition("/")
        return head if slash else "main"
class Source:
    """Representation of a source package"""

    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings,
        require_signature=True,
    ):
        """locate, verify and parse the .dsc among the given files

        :param directory: directory the source files are located in
        :param hashed_files: source files (including the .dsc itself)
        :param keyrings: passed on to :class:`SignedFile`
        :param require_signature: passed on to :class:`SignedFile`
        :raises InvalidSourceException: if `hashed_files` contains no .dsc
                                        or more than one
        """
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""

        dsc_file = None
        for f in hashed_files:
            if not re_file_dsc.match(f.filename):
                continue
            if dsc_file is not None:
                # self._dsc_file is only assigned after this loop, so the
                # first .dsc has to be named via the local variable.
                raise InvalidSourceException(
                    "Multiple .dsc found ({0} and {1})".format(
                        dsc_file.filename, f.filename
                    )
                )
            dsc_file = f

        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file

        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)

        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""

        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""

        # Lazily filled cache for the `files` property.
        self._files: Optional[dict[str, HashedFile]] = None

    @classmethod
    def from_file(
        cls, directory, filename, keyrings, require_signature=True
    ) -> "Source":
        """create an instance from a single existing file, hashing it first"""
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature)

    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files

        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint

    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid

    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature

    @property
    def component(self) -> str:
        """guessed component name

        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"

    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename