# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module to handle uploads not yet installed to the archive
This module provides classes to handle uploads not yet installed to the
archive. Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""
import apt_inst
import apt_pkg
import errno
import functools
import os
from collections.abc import Mapping
from typing import Optional, TYPE_CHECKING
from daklib.aptversion import AptVersion
from daklib.gpg import SignedFile
from daklib.regexes import *
import daklib.dakapt
import daklib.packagelist
if TYPE_CHECKING:
import datetime
import re
[docs]class UploadException(Exception):
pass
[docs]class InvalidChangesException(UploadException):
pass
[docs]class InvalidBinaryException(UploadException):
pass
[docs]class InvalidSourceException(UploadException):
pass
[docs]class InvalidHashException(UploadException):
def __init__(self, filename: str, hash_name: str, expected, actual):
self.filename = filename
self.hash_name = hash_name
self.expected = expected
self.actual = actual
def __str__(self):
return ("Invalid {0} hash for {1}:\n"
"According to the control file the {0} hash should be {2},\n"
"but {1} has {3}.\n"
"\n"
"If you did not include {1} in your upload, a different version\n"
"might already be known to the archive software.") \
.format(self.hash_name, self.filename, self.expected, self.actual)
[docs]class InvalidFilenameException(UploadException):
def __init__(self, filename: str):
self.filename: str = filename
def __str__(self):
return "Invalid filename '{0}'.".format(self.filename)
[docs]class FileDoesNotExist(UploadException):
def __init__(self, filename: str):
self.filename = filename
def __str__(self):
return "Refers to non-existing file '{0}'".format(self.filename)
[docs]class HashedFile:
"""file with checksums
"""
def __init__(self, filename: str, size: int, md5sum: str, sha1sum: str, sha256sum: str, section: Optional[str] = None, priority: Optional[str] = None, input_filename: Optional[str] = None):
self.filename: str = filename
"""name of the file"""
if input_filename is None:
input_filename = filename
self.input_filename: str = input_filename
"""name of the file on disk
Used for temporary files that should not be installed using their on-disk name.
"""
self.size: int = size
"""size in bytes"""
self.md5sum: str = md5sum
"""MD5 hash in hexdigits"""
self.sha1sum: str = sha1sum
"""SHA1 hash in hexdigits"""
self.sha256sum: str = sha256sum
"""SHA256 hash in hexdigits"""
self.section: Optional[str] = section
"""section or :const:`None`"""
self.priority: Optional[str] = priority
"""priority or :const:`None`"""
[docs] @classmethod
def from_file(cls, directory: str, filename: str, section: Optional[str] = None, priority: Optional[str] = None) -> 'HashedFile':
"""create with values for an existing file
Create a :class:`HashedFile` object that refers to an already existing file.
:param directory: directory the file is located in
:param filename: filename
:param section: optional section as given in .changes files
:param priority: optional priority as given in .changes files
:return: :class:`HashedFile` object for the given file
"""
path = os.path.join(directory, filename)
with open(path, 'r') as fh:
size = os.fstat(fh.fileno()).st_size
hashes = daklib.dakapt.DakHashes(fh)
return cls(filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority)
[docs] def check(self, directory: str) -> None:
"""Validate hashes
Check if size and hashes match the expected value.
:param directory: directory the file is located in
:raises InvalidHashException: if there is a hash mismatch
"""
path = os.path.join(directory, self.input_filename)
try:
with open(path) as fh:
self.check_fh(fh)
except OSError as e:
if e.errno == errno.ENOENT:
raise FileDoesNotExist(self.input_filename)
raise
[docs] def check_fh(self, fh) -> None:
size = os.fstat(fh.fileno()).st_size
fh.seek(0)
hashes = daklib.dakapt.DakHashes(fh)
if size != self.size:
raise InvalidHashException(self.filename, 'size', self.size, size)
if hashes.md5 != self.md5sum:
raise InvalidHashException(self.filename, 'md5sum', self.md5sum, hashes.md5)
if hashes.sha1 != self.sha1sum:
raise InvalidHashException(self.filename, 'sha1sum', self.sha1sum, hashes.sha1)
if hashes.sha256 != self.sha256sum:
raise InvalidHashException(self.filename, 'sha256sum', self.sha256sum, hashes.sha256)
[docs]def parse_file_list(
control: Mapping[str, str],
has_priority_and_section: bool,
safe_file_regexp: 're.Pattern' = re_file_safe,
fields=('Files', 'Checksums-Sha1', 'Checksums-Sha256')
) -> dict[str, HashedFile]:
"""Parse Files and Checksums-* fields
:param control: control file to take fields from
:param has_priority_and_section: Files field include section and priority
(as in .changes)
:return: dict mapping filenames to :class:`HashedFile` objects
:raises InvalidChangesException: missing fields or other grave errors
"""
entries = {}
for line in control.get(fields[0], "").split('\n'):
if len(line) == 0:
continue
if has_priority_and_section:
(md5sum, size, section, priority, filename) = line.split()
entry = dict(md5sum=md5sum, size=int(size), section=section, priority=priority, filename=filename)
else:
(md5sum, size, filename) = line.split()
entry = dict(md5sum=md5sum, size=int(size), filename=filename)
entries[filename] = entry
for line in control.get(fields[1], "").split('\n'):
if len(line) == 0:
continue
(sha1sum, size, filename) = line.split()
entry = entries.get(filename)
if entry is None:
raise InvalidChangesException('{0} is listed in {1}, but not in {2}.'.format(filename, fields[1], fields[0]))
if entry is not None and entry.get('size', None) != int(size):
raise InvalidChangesException('Size for {0} in {1} and {2} fields differ.'.format(filename, fields[0], fields[1]))
entry['sha1sum'] = sha1sum
for line in control.get(fields[2], "").split('\n'):
if len(line) == 0:
continue
(sha256sum, size, filename) = line.split()
entry = entries.get(filename)
if entry is None:
raise InvalidChangesException('{0} is listed in {1}, but not in {2}.'.format(filename, fields[2], fields[0]))
if entry is not None and entry.get('size', None) != int(size):
raise InvalidChangesException('Size for {0} in {1} and {2} fields differ.'.format(filename, fields[0], fields[2]))
entry['sha256sum'] = sha256sum
files = {}
for entry in entries.values():
filename = entry['filename']
if 'size' not in entry:
raise InvalidChangesException('No size for {0}.'.format(filename))
if 'md5sum' not in entry:
raise InvalidChangesException('No md5sum for {0}.'.format(filename))
if 'sha1sum' not in entry:
raise InvalidChangesException('No sha1sum for {0}.'.format(filename))
if 'sha256sum' not in entry:
raise InvalidChangesException('No sha256sum for {0}.'.format(filename))
if safe_file_regexp is not None and not safe_file_regexp.match(filename):
raise InvalidChangesException(f"References file with unsafe filename '{filename}'.")
files[filename] = HashedFile(**entry)
return files
[docs]@functools.total_ordering
class Changes:
"""Representation of a .changes file
"""
def __init__(self, directory: str, filename: str, keyrings, require_signature: bool = True):
if not re_file_safe.match(filename):
raise InvalidChangesException('{0}: unsafe filename'.format(filename))
self.directory: str = directory
"""directory the .changes is located in"""
self.filename: str = filename
"""name of the .changes file"""
with open(self.path, 'rb') as fd:
data = fd.read()
self.signature = SignedFile(data, keyrings, require_signature)
self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
"""dict to access fields of the .changes file"""
self._binaries: 'Optional[list[Binary]]' = None
self._source: 'Optional[Source]' = None
self._files: Optional[dict[str, HashedFile]] = None
self._keyrings = keyrings
self._require_signature: bool = require_signature
@property
def path(self) -> str:
"""path to the .changes file"""
return os.path.join(self.directory, self.filename)
@property
def primary_fingerprint(self) -> str:
"""fingerprint of the key used for signing the .changes file"""
return self.signature.primary_fingerprint
@property
def valid_signature(self) -> bool:
""":const:`True` if the .changes has a valid signature"""
return self.signature.valid
@property
def weak_signature(self) -> bool:
""":const:`True` if the .changes was signed using a weak algorithm"""
return self.signature.weak_signature
@property
def signature_timestamp(self) -> 'datetime.datetime':
return self.signature.signature_timestamp
@property
def contents_sha1(self) -> str:
return self.signature.contents_sha1
@property
def architectures(self) -> list[str]:
"""list of architectures included in the upload"""
return self.changes.get('Architecture', '').split()
@property
def distributions(self) -> list[str]:
"""list of target distributions for the upload"""
return self.changes['Distribution'].split()
@property
def source(self) -> 'Optional[Source]':
"""included source or :const:`None`"""
if self._source is None:
source_files = []
for f in self.files.values():
if re_file_dsc.match(f.filename) or re_file_source.match(f.filename):
source_files.append(f)
if len(source_files) > 0:
self._source = Source(self.directory, source_files, self._keyrings, self._require_signature)
return self._source
@property
def sourceful(self) -> bool:
""":const:`True` if the upload includes source"""
return "source" in self.architectures
@property
def source_name(self) -> str:
"""source package name"""
return re_field_source.match(self.changes['Source']).group('package')
@property
def binaries(self) -> 'list[Binary]':
"""included binary packages"""
if self._binaries is None:
self._binaries = [
Binary(self.directory, f)
for f in self.files.values()
if re_file_binary.match(f.filename)
]
return self._binaries
@property
def byhand_files(self) -> list[HashedFile]:
"""included byhand files"""
byhand = []
for f in self.files.values():
if f.section == 'byhand' or f.section[:4] == 'raw-':
byhand.append(f)
continue
if re_file_dsc.match(f.filename) or re_file_source.match(f.filename) or re_file_binary.match(f.filename):
continue
if re_file_buildinfo.match(f.filename):
continue
raise InvalidChangesException("{0}: {1} looks like a byhand package, but is in section {2}".format(self.filename, f.filename, f.section))
return byhand
@property
def buildinfo_files(self) -> list[HashedFile]:
"""included buildinfo files"""
return [
f for f in self.files.values()
if re_file_buildinfo.match(f.filename)
]
@property
def binary_names(self) -> list[str]:
"""names of included binary packages"""
return self.changes.get('Binary', '').split()
@property
def closed_bugs(self) -> list[str]:
"""bugs closed by this upload"""
return self.changes.get('Closes', '').split()
@property
def files(self) -> dict[str, HashedFile]:
"""dict mapping filenames to :class:`HashedFile` objects"""
if self._files is None:
self._files = parse_file_list(self.changes, True)
return self._files
@property
def bytes(self) -> int:
"""total size of files included in this upload in bytes"""
return sum(f.size for f in self.files.values())
[docs] def _key(self) -> tuple[str, AptVersion, bool, str]:
"""tuple used to compare two changes files
We sort by source name and version first. If these are identical,
we sort changes that include source before those without source (so
that sourceful uploads get processed first), and finally fall back
to the filename (this should really never happen).
"""
return (
self.changes.get('Source', ''),
AptVersion(self.changes.get('Version', '')),
not self.sourceful,
self.filename
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Changes):
return NotImplemented
return self._key() == other._key()
def __lt__(self, other: 'Changes') -> bool:
return self._key() < other._key()
[docs]class Binary:
"""Representation of a binary package
"""
def __init__(self, directory: str, hashed_file: HashedFile):
self.hashed_file: HashedFile = hashed_file
"""file object for the .deb"""
path = os.path.join(directory, hashed_file.input_filename)
data = apt_inst.DebFile(path).control.extractdata("control")
self.control: apt_pkg.TagSection = apt_pkg.TagSection(data)
"""dict to access fields in DEBIAN/control"""
[docs] @classmethod
def from_file(cls, directory, filename) -> 'Binary':
hashed_file = HashedFile.from_file(directory, filename)
return cls(directory, hashed_file)
@property
def source(self) -> tuple[str, str]:
"""get tuple with source package name and version"""
source = self.control.get("Source", None)
if source is None:
return (self.control["Package"], self.control["Version"])
match = re_field_source.match(source)
if not match:
raise InvalidBinaryException('{0}: Invalid Source field.'.format(self.hashed_file.filename))
version = match.group('version')
if version is None:
version = self.control['Version']
return (match.group('package'), version)
@property
def name(self) -> str:
return self.control['Package']
@property
def type(self) -> str:
"""package type ('deb' or 'udeb')"""
match = re_file_binary.match(self.hashed_file.filename)
if not match:
raise InvalidBinaryException('{0}: Does not match re_file_binary'.format(self.hashed_file.filename))
return match.group('type')
@property
def component(self) -> str:
"""component name"""
fields = self.control['Section'].split('/')
if len(fields) > 1:
return fields[0]
return "main"
[docs]class Source:
"""Representation of a source package
"""
def __init__(self, directory: str, hashed_files: list[HashedFile], keyrings, require_signature=True):
self.hashed_files: list[HashedFile] = hashed_files
"""list of source files (including the .dsc itself)"""
dsc_file = None
for f in hashed_files:
if re_file_dsc.match(f.filename):
if dsc_file is not None:
raise InvalidSourceException("Multiple .dsc found ({0} and {1})".format(self._dsc_file.filename, f.filename))
else:
dsc_file = f
if dsc_file is None:
raise InvalidSourceException("No .dsc included in source files")
self._dsc_file: HashedFile = dsc_file
# make sure the hash for the dsc is valid before we use it
self._dsc_file.check(directory)
dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
with open(dsc_file_path, 'rb') as fd:
data = fd.read()
self.signature = SignedFile(data, keyrings, require_signature)
self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
"""dict to access fields in the .dsc file"""
self.package_list: daklib.packagelist.PackageList = daklib.packagelist.PackageList(self.dsc)
"""Information about packages built by the source."""
self._files: Optional[dict[str, HashedFile]] = None
[docs] @classmethod
def from_file(cls, directory, filename, keyrings, require_signature=True) -> 'Source':
hashed_file = HashedFile.from_file(directory, filename)
return cls(directory, [hashed_file], keyrings, require_signature)
@property
def files(self) -> dict[str, HashedFile]:
"""dict mapping filenames to :class:`HashedFile` objects for additional source files
This list does not include the .dsc itself.
"""
if self._files is None:
self._files = parse_file_list(self.dsc, False)
return self._files
@property
def primary_fingerprint(self) -> str:
"""fingerprint of the key used to sign the .dsc"""
return self.signature.primary_fingerprint
@property
def valid_signature(self) -> bool:
""":const:`True` if the .dsc has a valid signature"""
return self.signature.valid
@property
def weak_signature(self) -> bool:
""":const:`True` if the .dsc was signed using a weak algorithm"""
return self.signature.weak_signature
@property
def component(self) -> str:
"""guessed component name
Might be wrong. Don't rely on this.
"""
if 'Section' not in self.dsc:
return 'main'
fields = self.dsc['Section'].split('/')
if len(fields) > 1:
return fields[0]
return "main"
@property
def filename(self) -> str:
"""filename of .dsc file"""
return self._dsc_file.filename