"""module to handle command files
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2012, Ansgar Burchardt <ansgar@debian.org>
@copyright: 2023 Emilio Pozuelo Monfort <pochu@debian.org>
@license: GPL-2+
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import tempfile
import apt_pkg
from daklib.config import Config
from daklib.dak_exceptions import ParseMaintError
from daklib.dbconn import (
    ACL,
    ACLPerSource,
    ACLPerSuite,
    DBChange,
    DBConn,
    DBSource,
    Fingerprint,
    Keyring,
    PolicyQueueUpload,
    SignatureHistory,
)
from daklib.gpg import SignedFile
from daklib.regexes import re_field_package
from daklib.textutils import fix_maintainer
from daklib.utils import TemplateSubst, gpg_get_key_addresses, send_mail
class CommandError(Exception):
    """Error raised when a section of a commands file cannot be processed."""
class CommandFile:
    """A signed commands file together with the state collected while
    evaluating it."""

    def __init__(self, filename: str, data: bytes, log=None):
        """Remember the raw commands file and set up empty result state.

        :param filename: name of the commands file; used in log entries and
                         in the notification mail
        :param data: raw content of the commands file, including signature
        :param log: logger to use; a new :class:`daklib.daklog.Logger` is
                    created when omitted
        """
        if log is None:
            # Imported lazily: only needed when the caller brings no logger.
            from daklib.daklog import Logger

            log = Logger()

        self.filename: str = filename
        self.data = data
        self.log = log

        # Filled in while the file is evaluated.
        self.uploader = None
        self.cc: list[str] = []
        self.result = []
[docs]    def _check_replay(self, signed_file: SignedFile, session):
        """check for replays
        .. note::
           Will commit changes to the database.
        :param session: database session
        """
        # Mark commands file as seen to prevent replays.
        signature_history = SignatureHistory.from_signed_file(signed_file)
        session.add(signature_history)
        session.commit() 
[docs]    def _quote_section(self, section) -> str:
        lines = [f"> {line}" for line in str(section).splitlines()]
        return "\n".join(lines) 
[docs]    def _evaluate_sections(self, sections, session):
        session.rollback()
        try:
            while True:
                next(sections)
                section = sections.section
                self.result.append(self._quote_section(section))
                action = section.get("Action", None)
                if action is None:
                    raise CommandError("Encountered section without Action field")
                if action == "dm":
                    self.action_dm(self.fingerprint, section, session)
                elif action == "dm-remove":
                    self.action_dm_remove(self.fingerprint, section, session)
                elif action == "dm-migrate":
                    self.action_dm_migrate(self.fingerprint, section, session)
                elif action == "break-the-archive":
                    self.action_break_the_archive(self.fingerprint, section, session)
                elif action == "process-upload":
                    self.action_process_upload(self.fingerprint, section, session)
                else:
                    raise CommandError("Unknown action: {0}".format(action))
                self.result.append("")
        except StopIteration:
            pass
        finally:
            session.rollback() 
[docs]    def _notify_uploader(self):
        cnf = Config()
        bcc = "X-DAK: dak process-command"
        if "Dinstall::Bcc" in cnf:
            bcc = "{0}\nBcc: {1}".format(bcc, cnf["Dinstall::Bcc"])
        maint_to = None
        addresses = gpg_get_key_addresses(self.fingerprint.fingerprint)
        if len(addresses) > 0:
            maint_to = addresses[0]
        if self.uploader:
            try:
                maint_to = fix_maintainer(self.uploader)[1]
            except ParseMaintError:
                self.log.log("ignoring malformed uploader", self.filename)
        cc = set()
        for address in self.cc:
            try:
                cc.add(fix_maintainer(address)[1])
            except ParseMaintError:
                self.log.log("ignoring malformed cc", self.filename)
        subst = {
            "__DAK_ADDRESS__": cnf["Dinstall::MyEmailAddress"],
            "__MAINTAINER_TO__": maint_to,
            "__CC__": ", ".join(cc),
            "__BCC__": bcc,
            "__RESULTS__": "\n".join(self.result),
            "__FILENAME__": self.filename,
        }
        message = TemplateSubst(
            subst, os.path.join(cnf["Dir::Templates"], "process-command.processed")
        )
        send_mail(message) 
[docs]    def evaluate(self) -> bool:
        """evaluate commands file
        :return: :const:`True` if the file was processed sucessfully,
                 :const:`False` otherwise
        """
        result = True
        session = DBConn().session()
        keyrings = (
            session.query(Keyring).filter_by(active=True).order_by(Keyring.priority)
        )
        keyring_files = [k.keyring_name for k in keyrings]
        signed_file = SignedFile(self.data, keyring_files)
        if not signed_file.valid:
            self.log.log(["invalid signature", self.filename])
            return False
        self.fingerprint = (
            session.query(Fingerprint)
            .filter_by(fingerprint=signed_file.primary_fingerprint)
            .one()
        )
        if self.fingerprint.keyring is None:
            self.log.log(["signed by key in unknown keyring", self.filename])
            return False
        assert self.fingerprint.keyring.active
        self.log.log(
            [
                "processing",
                self.filename,
                "signed-by={0}".format(self.fingerprint.fingerprint),
            ]
        )
        with tempfile.TemporaryFile() as fh:
            fh.write(signed_file.contents)
            fh.seek(0)
            sections = apt_pkg.TagFile(fh)
        try:
            next(sections)
            section = sections.section
            if "Uploader" in section:
                self.uploader = section["Uploader"]
            if "Cc" in section:
                self.cc.append(section["Cc"])
            # TODO: Verify first section has valid Archive field
            if "Archive" not in section:
                raise CommandError("No Archive field in first section.")
            # TODO: send mail when we detected a replay.
            self._check_replay(signed_file, session)
            self._evaluate_sections(sections, session)
            self.result.append("")
        except Exception as e:
            self.log.log(["ERROR", e])
            self.result.append(
                "There was an error processing this section. No changes were committed.\nDetails:\n{0}".format(
                    e
                )
            )
            result = False
        self._notify_uploader()
        session.close()
        return result 
[docs]    def _split_packages(self, value: str) -> list[str]:
        names = value.split()
        for name in names:
            if not re_field_package.match(name):
                raise CommandError('Invalid package name "{0}"'.format(name))
        return names 
[docs]    def action_dm(self, fingerprint, section, session) -> None:
        cnf = Config()
        if (
            "Command::DM::AdminKeyrings" not in cnf
            or "Command::DM::ACL" not in cnf
            or "Command::DM::Keyrings" not in cnf
        ):
            raise CommandError("DM command is not configured for this archive.")
        allowed_keyrings = cnf.value_list("Command::DM::AdminKeyrings")
        if fingerprint.keyring.keyring_name not in allowed_keyrings:
            raise CommandError(
                "Key {0} is not allowed to set DM".format(fingerprint.fingerprint)
            )
        acl_name = cnf.get("Command::DM::ACL", "dm")
        acl = session.query(ACL).filter_by(name=acl_name).one()
        fpr_hash = section["Fingerprint"].replace(" ", "")
        fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
        if fpr is None:
            raise CommandError("Unknown fingerprint {0}".format(fpr_hash))
        if fpr.keyring is None or fpr.keyring.keyring_name not in cnf.value_list(
            "Command::DM::Keyrings"
        ):
            raise CommandError("Key {0} is not in DM keyring.".format(fpr.fingerprint))
        addresses = gpg_get_key_addresses(fpr.fingerprint)
        if len(addresses) > 0:
            self.cc.append(addresses[0])
        self.log.log(["dm", "fingerprint", fpr.fingerprint])
        self.result.append("Fingerprint: {0}".format(fpr.fingerprint))
        if len(addresses) > 0:
            self.log.log(["dm", "uid", addresses[0]])
            self.result.append("Uid: {0}".format(addresses[0]))
        for source in self._split_packages(section.get("Allow", "")):
            # Check for existance of source package to catch typos
            if session.query(DBSource).filter_by(source=source).first() is None:
                raise CommandError(
                    "Tried to grant permissions for unknown source package: {0}".format(
                        source
                    )
                )
            if (
                session.query(ACLPerSource)
                .filter_by(acl=acl, fingerprint=fpr, source=source)
                .first()
                is None
            ):
                aps = ACLPerSource()
                aps.acl = acl
                aps.fingerprint = fpr
                aps.source = source
                aps.created_by = fingerprint
                aps.reason = section.get("Reason")
                session.add(aps)
                self.log.log(["dm", "allow", fpr.fingerprint, source])
                self.result.append("Allowed: {0}".format(source))
            else:
                self.result.append("Already-Allowed: {0}".format(source))
        session.flush()
        for source in self._split_packages(section.get("Deny", "")):
            count = (
                session.query(ACLPerSource)
                .filter_by(acl=acl, fingerprint=fpr, source=source)
                .delete()
            )
            if count == 0:
                raise CommandError(
                    "Tried to remove upload permissions for package {0}, "
                    "but no upload permissions were granted before.".format(source)
                )
            self.log.log(["dm", "deny", fpr.fingerprint, source])
            self.result.append("Denied: {0}".format(source))
        session.commit() 
[docs]    def _action_dm_admin_common(self, fingerprint, section, session) -> None:
        cnf = Config()
        if (
            "Command::DM-Admin::AdminFingerprints" not in cnf
            or "Command::DM::ACL" not in cnf
        ):
            raise CommandError("DM admin command is not configured for this archive.")
        allowed_fingerprints = cnf.value_list("Command::DM-Admin::AdminFingerprints")
        if fingerprint.fingerprint not in allowed_fingerprints:
            raise CommandError(
                "Key {0} is not allowed to admin DM".format(fingerprint.fingerprint)
            ) 
[docs]    def action_dm_remove(self, fingerprint, section, session) -> None:
        self._action_dm_admin_common(fingerprint, section, session)
        cnf = Config()
        acl_name = cnf.get("Command::DM::ACL", "dm")
        acl = session.query(ACL).filter_by(name=acl_name).one()
        fpr_hash = section["Fingerprint"].replace(" ", "")
        fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
        if fpr is None:
            self.result.append(
                "Unknown fingerprint: {0}\nNo action taken.".format(fpr_hash)
            )
            return
        self.log.log(["dm-remove", fpr.fingerprint])
        count = 0
        for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr):
            self.log.log(
                ["dm-remove", fpr.fingerprint, "source={0}".format(entry.source)]
            )
            count += 1
            session.delete(entry)
        self.result.append(
            "Removed: {0}.\n{1} acl entries removed.".format(fpr.fingerprint, count)
        )
        session.commit() 
[docs]    def action_dm_migrate(self, fingerprint, section, session) -> None:
        self._action_dm_admin_common(fingerprint, section, session)
        cnf = Config()
        acl_name = cnf.get("Command::DM::ACL", "dm")
        acl = session.query(ACL).filter_by(name=acl_name).one()
        fpr_hash_from = section["From"].replace(" ", "")
        fpr_from = (
            session.query(Fingerprint).filter_by(fingerprint=fpr_hash_from).first()
        )
        if fpr_from is None:
            self.result.append(
                "Unknown fingerprint (From): {0}\nNo action taken.".format(
                    fpr_hash_from
                )
            )
            return
        fpr_hash_to = section["To"].replace(" ", "")
        fpr_to = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_to).first()
        if fpr_to is None:
            self.result.append(
                "Unknown fingerprint (To): {0}\nNo action taken.".format(fpr_hash_to)
            )
            return
        if fpr_to.keyring is None or fpr_to.keyring.keyring_name not in cnf.value_list(
            "Command::DM::Keyrings"
        ):
            self.result.append(
                "Key (To) {0} is not in DM keyring.\nNo action taken.".format(
                    fpr_to.fingerprint
                )
            )
            return
        self.log.log(
            [
                "dm-migrate",
                "from={0}".format(fpr_hash_from),
                "to={0}".format(fpr_hash_to),
            ]
        )
        sources = []
        for entry in session.query(ACLPerSource).filter_by(
            acl=acl, fingerprint=fpr_from
        ):
            self.log.log(
                [
                    "dm-migrate",
                    "from={0}".format(fpr_hash_from),
                    "to={0}".format(fpr_hash_to),
                    "source={0}".format(entry.source),
                ]
            )
            entry.fingerprint = fpr_to
            sources.append(entry.source)
        self.result.append(
            "Migrated {0} to {1}.\n{2} acl entries changed: {3}".format(
                fpr_hash_from, fpr_hash_to, len(sources), ", ".join(sources)
            )
        )
        session.commit() 
[docs]    def action_break_the_archive(self, fingerprint, section, session) -> None:
        name = "Dave"
        uid = fingerprint.uid
        if uid is not None and uid.name is not None:
            name = uid.name.split()[0]
        self.result.append(
            "DAK9000: I'm sorry, {0}. I'm afraid I can't do that.".format(name)
        ) 
[docs]    def _sourcename_from_dbchanges(self, changes: DBChange) -> str:
        source = changes.source
        # in case the Source contains spaces, e.g. in binNMU .changes
        source = source.split(" ")[0]
        return source 
[docs]    def _process_upload_add_command_file(
        self, upload: PolicyQueueUpload, command
    ) -> None:
        source = self._sourcename_from_dbchanges(upload.changes)
        filename = f"{command}.{source}_{upload.changes.version}"
        content = "OK" if command == "ACCEPT" else "NOTOK"
        with open(
            os.path.join(upload.policy_queue.path, "COMMENTS", filename), "x"
        ) as f:
            f.write(content + "\n") 
[docs]    def _action_process_upload_common(self, fingerprint, section, session) -> None:
        cnf = Config()
        if "Command::ProcessUpload::ACL" not in cnf:
            raise CommandError(
                "Process Upload command is not configured for this archive."
            ) 
[docs]    def action_process_upload(self, fingerprint, section, session) -> None:
        self._action_process_upload_common(fingerprint, section, session)
        cnf = Config()
        acl_name = cnf.get("Command::ProcessUpload::ACL", "process-upload")
        acl = session.query(ACL).filter_by(name=acl_name).one()
        source = section["Source"].replace(" ", "")
        version = section["Version"].replace(" ", "")
        command = section["Command"].replace(" ", "")
        if command not in ("ACCEPT", "REJECT"):
            raise CommandError("Invalid ProcessUpload command: {0}".format(command))
        uploads = (
            session.query(PolicyQueueUpload)
            .join(PolicyQueueUpload.changes)
            .filter_by(version=version)
            .all()
        )
        # we don't filter_by(source=source) because a source in a DBChange can
        # contain more than the source, e.g. 'source (version)' for binNMUs
        uploads = [
            upload
            for upload in uploads
            if self._sourcename_from_dbchanges(upload.changes) == source
        ]
        if not uploads:
            raise CommandError(
                "Could not find upload for {0} {1}".format(source, version)
            )
        upload = uploads[0]
        # we consider all uploads except those for NEW, and take into account the
        # target suite when checking for permissions
        if upload.policy_queue.queue_name == "new":
            raise CommandError(
                "Processing uploads from NEW not allowed ({0} {1})".format(
                    source, version
                )
            )
        suite = upload.target_suite
        self.log.log(
            [
                "process-upload",
                fingerprint.fingerprint,
                source,
                version,
                upload.policy_queue.queue_name,
                suite.suite_name,
            ]
        )
        allowed = False
        for entry in session.query(ACLPerSource).filter_by(
            acl=acl, fingerprint=fingerprint, source=source
        ):
            allowed = True
        if not allowed:
            for entry in session.query(ACLPerSuite).filter_by(
                acl=acl, fingerprint=fingerprint, suite=suite
            ):
                allowed = True
        self.log.log(
            [
                "process-upload",
                fingerprint.fingerprint,
                source,
                version,
                upload.policy_queue.queue_name,
                suite.suite_name,
                allowed,
            ]
        )
        if allowed:
            self._process_upload_add_command_file(upload, command)
        self.result.append(
            "ProcessUpload: processed fp {0}: {1}_{2}/{3}".format(
                fingerprint.fingerprint, source, version, suite.codename
            )
        )