# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module to handle uploads not yet installed to the archive
This module provides classes to handle uploads not yet installed to the
archive.  Central is the :class:`Changes` class which represents a changes file.
It provides methods to access the included binary and source packages.
"""
import errno
import functools
import os
from collections.abc import Mapping
from typing import TYPE_CHECKING, Optional
import apt_inst
import apt_pkg
import daklib.dakapt
import daklib.packagelist
from daklib.aptversion import AptVersion
from daklib.gpg import SignedFile
from daklib.regexes import (
    re_field_source,
    re_file_binary,
    re_file_buildinfo,
    re_file_dsc,
    re_file_safe,
    re_file_source,
    re_file_source_tag2upload,
)
if TYPE_CHECKING:
    import datetime
    import re
[docs]class UploadException(Exception):
    pass 
[docs]class InvalidChangesException(UploadException):
    pass 
[docs]class InvalidBinaryException(UploadException):
    pass 
[docs]class InvalidSourceException(UploadException):
    pass 
[docs]class InvalidHashException(UploadException):
    def __init__(self, filename: str, hash_name: str, expected, actual):
        self.filename = filename
        self.hash_name = hash_name
        self.expected = expected
        self.actual = actual
    def __str__(self):
        return (
            "Invalid {0} hash for {1}:\n"
            "According to the control file the {0} hash should be {2},\n"
            "but {1} has {3}.\n"
            "\n"
            "If you did not include {1} in your upload, a different version\n"
            "might already be known to the archive software."
        ).format(self.hash_name, self.filename, self.expected, self.actual) 
[docs]class InvalidFilenameException(UploadException):
    def __init__(self, filename: str):
        self.filename: str = filename
    def __str__(self):
        return "Invalid filename '{0}'.".format(self.filename) 
[docs]class FileDoesNotExist(UploadException):
    def __init__(self, filename: str):
        self.filename = filename
    def __str__(self):
        return "Refers to non-existing file '{0}'".format(self.filename) 
[docs]class HashedFile:
    """file with checksums"""
    def __init__(
        self,
        filename: str,
        size: int,
        md5sum: str,
        sha1sum: str,
        sha256sum: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
        input_filename: Optional[str] = None,
    ):
        self.filename: str = filename
        """name of the file"""
        if input_filename is None:
            input_filename = filename
        self.input_filename: str = input_filename
        """name of the file on disk
        Used for temporary files that should not be installed using their on-disk name.
        """
        self.size: int = size
        """size in bytes"""
        self.md5sum: str = md5sum
        """MD5 hash in hexdigits"""
        self.sha1sum: str = sha1sum
        """SHA1 hash in hexdigits"""
        self.sha256sum: str = sha256sum
        """SHA256 hash in hexdigits"""
        self.section: Optional[str] = section
        """section or :const:`None`"""
        self.priority: Optional[str] = priority
        """priority or :const:`None`"""
[docs]    @classmethod
    def from_file(
        cls,
        directory: str,
        filename: str,
        section: Optional[str] = None,
        priority: Optional[str] = None,
    ) -> "HashedFile":
        """create with values for an existing file
        Create a :class:`HashedFile` object that refers to an already existing file.
        :param directory: directory the file is located in
        :param filename: filename
        :param section: optional section as given in .changes files
        :param priority: optional priority as given in .changes files
        :return: :class:`HashedFile` object for the given file
        """
        path = os.path.join(directory, filename)
        with open(path, "r") as fh:
            size = os.fstat(fh.fileno()).st_size
            hashes = daklib.dakapt.DakHashes(fh)
        return cls(
            filename, size, hashes.md5, hashes.sha1, hashes.sha256, section, priority
        ) 
[docs]    def check(self, directory: str) -> None:
        """Validate hashes
        Check if size and hashes match the expected value.
        :param directory: directory the file is located in
        :raises InvalidHashException: if there is a hash mismatch
        """
        path = os.path.join(directory, self.input_filename)
        try:
            with open(path) as fh:
                self.check_fh(fh)
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise FileDoesNotExist(self.input_filename)
            raise 
[docs]    def check_fh(self, fh) -> None:
        size = os.fstat(fh.fileno()).st_size
        fh.seek(0)
        hashes = daklib.dakapt.DakHashes(fh)
        if size != self.size:
            raise InvalidHashException(self.filename, "size", self.size, size)
        if hashes.md5 != self.md5sum:
            raise InvalidHashException(self.filename, "md5sum", self.md5sum, hashes.md5)
        if hashes.sha1 != self.sha1sum:
            raise InvalidHashException(
                self.filename, "sha1sum", self.sha1sum, hashes.sha1
            )
        if hashes.sha256 != self.sha256sum:
            raise InvalidHashException(
                self.filename, "sha256sum", self.sha256sum, hashes.sha256
            )  
[docs]def parse_file_list(
    control: Mapping[str, str],
    has_priority_and_section: bool,
    safe_file_regexp: "re.Pattern" = re_file_safe,
    fields=("Files", "Checksums-Sha1", "Checksums-Sha256"),
) -> dict[str, HashedFile]:
    """Parse Files and Checksums-* fields
    :param control: control file to take fields from
    :param has_priority_and_section: Files field include section and priority
                                     (as in .changes)
    :return: dict mapping filenames to :class:`HashedFile` objects
    :raises InvalidChangesException: missing fields or other grave errors
    """
    entries = {}
    for line in control.get(fields[0], "").split("\n"):
        if len(line) == 0:
            continue
        if has_priority_and_section:
            (md5sum, size, section, priority, filename) = line.split()
            entry = dict(
                md5sum=md5sum,
                size=int(size),
                section=section,
                priority=priority,
                filename=filename,
            )
        else:
            (md5sum, size, filename) = line.split()
            entry = dict(md5sum=md5sum, size=int(size), filename=filename)
        entries[filename] = entry
    for line in control.get(fields[1], "").split("\n"):
        if len(line) == 0:
            continue
        (sha1sum, size, filename) = line.split()
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, fields[1], fields[0]
                )
            )
        if entry is not None and entry.get("size", None) != int(size):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, fields[0], fields[1]
                )
            )
        entry["sha1sum"] = sha1sum
    for line in control.get(fields[2], "").split("\n"):
        if len(line) == 0:
            continue
        (sha256sum, size, filename) = line.split()
        entry = entries.get(filename)
        if entry is None:
            raise InvalidChangesException(
                "{0} is listed in {1}, but not in {2}.".format(
                    filename, fields[2], fields[0]
                )
            )
        if entry is not None and entry.get("size", None) != int(size):
            raise InvalidChangesException(
                "Size for {0} in {1} and {2} fields differ.".format(
                    filename, fields[0], fields[2]
                )
            )
        entry["sha256sum"] = sha256sum
    files = {}
    for entry in entries.values():
        filename = entry["filename"]
        if "size" not in entry:
            raise InvalidChangesException("No size for {0}.".format(filename))
        if "md5sum" not in entry:
            raise InvalidChangesException("No md5sum for {0}.".format(filename))
        if "sha1sum" not in entry:
            raise InvalidChangesException("No sha1sum for {0}.".format(filename))
        if "sha256sum" not in entry:
            raise InvalidChangesException("No sha256sum for {0}.".format(filename))
        if safe_file_regexp is not None and not safe_file_regexp.match(filename):
            raise InvalidChangesException(
                f"References file with unsafe filename '{filename}'."
            )
        files[filename] = HashedFile(**entry)
    return files 
[docs]@functools.total_ordering
class Changes:
    """Representation of a .changes file"""
    def __init__(
        self, directory: str, filename: str, keyrings, require_signature: bool = True
    ):
        if not re_file_safe.match(filename):
            raise InvalidChangesException("{0}: unsafe filename".format(filename))
        self.directory: str = directory
        """directory the .changes is located in"""
        self.filename: str = filename
        """name of the .changes file"""
        with open(self.path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.changes: apt_pkg.TagSection = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields of the .changes file"""
        self._binaries: "Optional[list[Binary]]" = None
        self._source: "Optional[Source]" = None
        self._files: Optional[dict[str, HashedFile]] = None
        self._keyrings = keyrings
        self._require_signature: bool = require_signature
    @property
    def path(self) -> str:
        """path to the .changes file"""
        return os.path.join(self.directory, self.filename)
    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used for signing the .changes file"""
        return self.signature.primary_fingerprint
    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .changes has a valid signature"""
        return self.signature.valid
    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .changes was signed using a weak algorithm"""
        return self.signature.weak_signature
    @property
    def signature_timestamp(self) -> "datetime.datetime":
        return self.signature.signature_timestamp
    @property
    def contents_sha1(self) -> str:
        return self.signature.contents_sha1
    @property
    def architectures(self) -> list[str]:
        """list of architectures included in the upload"""
        return self.changes.get("Architecture", "").split()
    @property
    def distributions(self) -> list[str]:
        """list of target distributions for the upload"""
        return self.changes["Distribution"].split()
    @property
    def source(self) -> "Optional[Source]":
        """included source or :const:`None`"""
        if self._source is None:
            source_files = []
            for f in self.files.values():
                if re_file_dsc.match(f.filename) or re_file_source.match(f.filename):
                    source_files.append(f)
            if len(source_files) > 0:
                self._source = Source(
                    self.directory,
                    source_files,
                    self._keyrings,
                    self._require_signature,
                )
        return self._source
    @property
    def source_tag2upload_files(self) -> list[HashedFile]:
        """
        extra source files
        """
        return [
            f
            for f in self.files.values()
            if re_file_source_tag2upload.match(f.filename)
        ]
    @property
    def sourceful(self) -> bool:
        """:const:`True` if the upload includes source"""
        return "source" in self.architectures
    @property
    def source_name(self) -> str:
        """source package name"""
        return re_field_source.match(self.changes["Source"]).group("package")
    @property
    def binaries(self) -> "list[Binary]":
        """included binary packages"""
        if self._binaries is None:
            self._binaries = [
                Binary(self.directory, f)
                for f in self.files.values()
                if re_file_binary.match(f.filename)
            ]
        return self._binaries
    @property
    def byhand_files(self) -> list[HashedFile]:
        """included byhand files"""
        byhand = []
        for f in self.files.values():
            if f.section == "byhand" or f.section[:4] == "raw-":
                byhand.append(f)
                continue
            if (
                re_file_dsc.match(f.filename)
                or re_file_source.match(f.filename)
                or re_file_binary.match(f.filename)
            ):
                continue
            if re_file_buildinfo.match(f.filename):
                continue
            raise InvalidChangesException(
                "{0}: {1} looks like a byhand package, but is in section {2}".format(
                    self.filename, f.filename, f.section
                )
            )
        return byhand
    @property
    def buildinfo_files(self) -> list[HashedFile]:
        """included buildinfo files"""
        return [f for f in self.files.values() if re_file_buildinfo.match(f.filename)]
    @property
    def binary_names(self) -> list[str]:
        """names of included binary packages"""
        return self.changes.get("Binary", "").split()
    @property
    def closed_bugs(self) -> list[str]:
        """bugs closed by this upload"""
        return self.changes.get("Closes", "").split()
    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects"""
        if self._files is None:
            self._files = parse_file_list(self.changes, True)
        return self._files
    @property
    def bytes(self) -> int:
        """total size of files included in this upload in bytes"""
        return sum(f.size for f in self.files.values())
[docs]    def _key(self) -> tuple[str, AptVersion, bool, str]:
        """tuple used to compare two changes files
        We sort by source name and version first.  If these are identical,
        we sort changes that include source before those without source (so
        that sourceful uploads get processed first), and finally fall back
        to the filename (this should really never happen).
        """
        return (
            self.changes.get("Source", ""),
            AptVersion(self.changes.get("Version", "")),
            not self.sourceful,
            self.filename,
        ) 
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Changes):
            return NotImplemented
        return self._key() == other._key()
    def __lt__(self, other: "Changes") -> bool:
        return self._key() < other._key() 
[docs]class Binary:
    """Representation of a binary package"""
    def __init__(self, directory: str, hashed_file: HashedFile):
        self.hashed_file: HashedFile = hashed_file
        """file object for the .deb"""
        path = os.path.join(directory, hashed_file.input_filename)
        data = apt_inst.DebFile(path).control.extractdata("control")
        self.control: apt_pkg.TagSection = apt_pkg.TagSection(data)
        """dict to access fields in DEBIAN/control"""
[docs]    @classmethod
    def from_file(cls, directory, filename) -> "Binary":
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, hashed_file) 
    @property
    def source(self) -> tuple[str, str]:
        """get tuple with source package name and version"""
        source = self.control.get("Source", None)
        if source is None:
            return (self.control["Package"], self.control["Version"])
        match = re_field_source.match(source)
        if not match:
            raise InvalidBinaryException(
                "{0}: Invalid Source field.".format(self.hashed_file.filename)
            )
        version = match.group("version")
        if version is None:
            version = self.control["Version"]
        return (match.group("package"), version)
    @property
    def name(self) -> str:
        return self.control["Package"]
    @property
    def type(self) -> str:
        """package type ('deb' or 'udeb')"""
        match = re_file_binary.match(self.hashed_file.filename)
        if not match:
            raise InvalidBinaryException(
                "{0}: Does not match re_file_binary".format(self.hashed_file.filename)
            )
        return match.group("type")
    @property
    def component(self) -> str:
        """component name"""
        fields = self.control["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main" 
[docs]class Source:
    """Representation of a source package"""
    def __init__(
        self,
        directory: str,
        hashed_files: list[HashedFile],
        keyrings,
        require_signature=True,
    ):
        self.hashed_files: list[HashedFile] = hashed_files
        """list of source files (including the .dsc itself)"""
        dsc_file = None
        for f in hashed_files:
            if re_file_dsc.match(f.filename):
                if dsc_file is not None:
                    raise InvalidSourceException(
                        "Multiple .dsc found ({0} and {1})".format(
                            self._dsc_file.filename, f.filename
                        )
                    )
                else:
                    dsc_file = f
        if dsc_file is None:
            raise InvalidSourceException("No .dsc included in source files")
        self._dsc_file: HashedFile = dsc_file
        # make sure the hash for the dsc is valid before we use it
        self._dsc_file.check(directory)
        dsc_file_path = os.path.join(directory, self._dsc_file.input_filename)
        with open(dsc_file_path, "rb") as fd:
            data = fd.read()
        self.signature = SignedFile(data, keyrings, require_signature)
        self.dsc: Mapping[str, str] = apt_pkg.TagSection(self.signature.contents)
        """dict to access fields in the .dsc file"""
        self.package_list: daklib.packagelist.PackageList = (
            daklib.packagelist.PackageList(self.dsc)
        )
        """Information about packages built by the source."""
        self._files: Optional[dict[str, HashedFile]] = None
[docs]    @classmethod
    def from_file(
        cls, directory, filename, keyrings, require_signature=True
    ) -> "Source":
        hashed_file = HashedFile.from_file(directory, filename)
        return cls(directory, [hashed_file], keyrings, require_signature) 
    @property
    def files(self) -> dict[str, HashedFile]:
        """dict mapping filenames to :class:`HashedFile` objects for additional source files
        This list does not include the .dsc itself.
        """
        if self._files is None:
            self._files = parse_file_list(self.dsc, False)
        return self._files
    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the key used to sign the .dsc"""
        return self.signature.primary_fingerprint
    @property
    def valid_signature(self) -> bool:
        """:const:`True` if the .dsc has a valid signature"""
        return self.signature.valid
    @property
    def weak_signature(self) -> bool:
        """:const:`True` if the .dsc was signed using a weak algorithm"""
        return self.signature.weak_signature
    @property
    def component(self) -> str:
        """guessed component name
        Might be wrong. Don't rely on this.
        """
        if "Section" not in self.dsc:
            return "main"
        fields = self.dsc["Section"].split("/")
        if len(fields) > 1:
            return fields[0]
        return "main"
    @property
    def filename(self) -> str:
        """filename of .dsc file"""
        return self._dsc_file.filename