# Source code for daklib.command

"""module to handle command files

@contact: Debian FTP Master <>
@copyright: 2012, Ansgar Burchardt <>
@copyright: 2023 Emilio Pozuelo Monfort <>
@license: GPL-2+
"""

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import apt_pkg
import os
import tempfile

from daklib.config import Config
from daklib.dak_exceptions import *
from daklib.dbconn import *
from daklib.gpg import SignedFile
from daklib.regexes import re_field_package
from daklib.textutils import fix_maintainer
from daklib.utils import gpg_get_key_addresses, send_mail, TemplateSubst

class CommandError(Exception):
    """Raised when a commands file, or one of its sections, is rejected.

    The message is meant to be shown to the person who sent the commands
    file: :meth:`CommandFile.evaluate` copies it into the report mail.
    """
class CommandFile:
    """A signed commands file and the state needed to evaluate it.

    The signed payload is parsed with :class:`apt_pkg.TagFile`.  The first
    section is the envelope: it must contain an ``Archive`` field and may
    contain ``Uploader`` and ``Cc``.  Every following section must contain an
    ``Action`` field naming the command to run.  :meth:`evaluate` verifies
    the signature, runs the actions and mails a report.
    """

    def __init__(self, filename: str, data: bytes, log=None):
        """
        :param filename: name of the commands file; used in log entries and
                         in the report mail
        :param data: raw, still signed, contents of the commands file
        :param log: object with a ``log(list)`` method; a
                    :class:`daklib.daklog.Logger` is created when omitted
        """
        if log is None:
            # Only import the logger module when the caller gave us no logger.
            from daklib.daklog import Logger
            log = Logger()
        # Extra addresses that receive a copy of the report mail.
        self.cc: list[str] = []
        # Lines of the report mail, in order.
        self.result: list[str] = []
        self.log = log
        self.filename: str = filename
        self.data = data
        # Value of the envelope's ``Uploader`` field, if any.
        self.uploader = None
        # Database record of the signing key; set by evaluate().
        self.fingerprint = None

    def _check_replay(self, signed_file: "SignedFile", session) -> None:
        """check for replays

        .. note::

           Will commit changes to the database.

        :param signed_file: the verified commands file
        :param session: database session
        """
        # Mark commands file as seen to prevent replays.
        # NOTE(review): nothing here compares against earlier entries, so
        # presumably the commit fails (unique constraint) when the same
        # signature was recorded before -- confirm against the schema.
        signature_history = SignatureHistory.from_signed_file(signed_file)
        session.add(signature_history)
        session.commit()

    def _quote_section(self, section) -> str:
        """Return ``str(section)`` with every line prefixed by ``"> "``.

        Used to echo each section back in the report mail.
        """
        return "\n".join("> {0}".format(line) for line in str(section).splitlines())

    @staticmethod
    def _required_field(section, name: str) -> str:
        """Return the value of field *name* from *section*.

        :raises CommandError: if the field is missing, so that the report
                              mail names the field instead of showing a bare
                              ``KeyError``
        """
        value = section.get(name)
        if value is None:
            raise CommandError('Section is missing required field: {0}'.format(name))
        return value

    def _evaluate_sections(self, sections, session) -> None:
        """Run the action of every remaining section of the commands file.

        Each section is quoted into :attr:`result`, then dispatched on its
        ``Action`` field.  The session is rolled back before the first and
        after the last section (also on error); changes only persist where
        an action commits them itself.

        :param sections: tag file positioned after the envelope section;
                         advanced with ``next()`` and read via ``.section``
        :param session: database session
        :raises CommandError: if a section has no ``Action`` field or names
                              an unknown action
        """
        handlers = {
            'dm': self.action_dm,
            'dm-remove': self.action_dm_remove,
            'dm-migrate': self.action_dm_migrate,
            'break-the-archive': self.action_break_the_archive,
            'process-upload': self.action_process_upload,
        }

        session.rollback()
        try:
            while True:
                # Only the end of the tag file ends the loop; a StopIteration
                # escaping from an action handler must not be mistaken for it.
                try:
                    next(sections)
                except StopIteration:
                    break
                section = sections.section
                self.result.append(self._quote_section(section))

                action = section.get('Action', None)
                if action is None:
                    raise CommandError('Encountered section without Action field')

                handler = handlers.get(action)
                if handler is None:
                    raise CommandError('Unknown action: {0}'.format(action))
                handler(self.fingerprint, section, session)

                self.result.append('')
        finally:
            session.rollback()

    def _notify_uploader(self) -> None:
        """Mail the collected results to the sender of the commands file.

        The recipient is the first address of the signing key, overridden by
        the ``Uploader`` field when that parses.  Addresses collected in
        :attr:`cc` are copied; malformed ones are logged and skipped.
        """
        cnf = Config()

        bcc = 'X-DAK: dak process-command'
        if 'Dinstall::Bcc' in cnf:
            bcc = '{0}\nBcc: {1}'.format(bcc, cnf['Dinstall::Bcc'])

        maint_to = None
        addresses = gpg_get_key_addresses(self.fingerprint.fingerprint)
        if addresses:
            maint_to = addresses[0]

        if self.uploader:
            try:
                maint_to = fix_maintainer(self.uploader)[1]
            except ParseMaintError:
                # The logger takes a single list, as everywhere else here.
                self.log.log(['ignoring malformed uploader', self.filename])

        cc = set()
        for address in self.cc:
            try:
                cc.add(fix_maintainer(address)[1])
            except ParseMaintError:
                self.log.log(['ignoring malformed cc', self.filename])

        subst = {
            '__DAK_ADDRESS__': cnf['Dinstall::MyEmailAddress'],
            '__MAINTAINER_TO__': maint_to,
            # Sorted so the header does not depend on set iteration order.
            '__CC__': ", ".join(sorted(cc)),
            '__BCC__': bcc,
            '__RESULTS__': "\n".join(self.result),
            '__FILENAME__': self.filename,
        }

        message = TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-command.processed'))

        send_mail(message)

    def evaluate(self) -> bool:
        """evaluate commands file

        Verifies the signature against the active keyrings, records the file
        to prevent replays, runs every action and mails the result.

        :return: :const:`True` if the file was processed successfully,
                 :const:`False` otherwise
        """
        result = True

        session = DBConn().session()
        # The finally below releases the session on every exit path,
        # including the early returns.
        try:
            keyrings = session.query(Keyring).filter_by(active=True).order_by(Keyring.priority)
            keyring_files = [k.keyring_name for k in keyrings]

            signed_file = SignedFile(self.data, keyring_files)
            if not signed_file.valid:
                self.log.log(['invalid signature', self.filename])
                return False

            self.fingerprint = session.query(Fingerprint).filter_by(fingerprint=signed_file.primary_fingerprint).one()
            if self.fingerprint.keyring is None:
                self.log.log(['singed by key in unknown keyring', self.filename])
                return False
            # Explicit check rather than ``assert``: asserts vanish under
            # ``python -O`` and this guards who may issue commands.
            if not self.fingerprint.keyring.active:
                self.log.log(['signed by key in inactive keyring', self.filename])
                return False

            self.log.log(['processing', self.filename, 'signed-by={0}'.format(self.fingerprint.fingerprint)])

            with tempfile.TemporaryFile() as fh:
                fh.write(signed_file.contents)
                # Rewind so the parser starts at the beginning of the payload.
                fh.seek(0)
                sections = apt_pkg.TagFile(fh)

                # Parsing stays inside the ``with`` so the temporary file is
                # still open while sections are read.
                try:
                    try:
                        next(sections)
                    except StopIteration:
                        raise CommandError('Commands file contains no sections.') from None
                    section = sections.section
                    if 'Uploader' in section:
                        self.uploader = section['Uploader']
                    if 'Cc' in section:
                        self.cc.append(section['Cc'])
                    # TODO: Verify first section has valid Archive field
                    if 'Archive' not in section:
                        raise CommandError('No Archive field in first section.')

                    # TODO: send mail when we detected a replay.
                    self._check_replay(signed_file, session)

                    self._evaluate_sections(sections, session)
                    self.result.append('')
                except Exception as e:
                    # Top-level boundary: report any failure to the sender
                    # instead of crashing the caller.
                    self.log.log(['ERROR', e])
                    self.result.append("There was an error processing this section. No changes were committed.\nDetails:\n{0}".format(e))
                    result = False

            self._notify_uploader()
        finally:
            session.close()

        return result

    def _split_packages(self, value: str) -> list[str]:
        """Split a whitespace-separated list of package names.

        :raises CommandError: if a name does not match ``re_field_package``
        """
        names = value.split()
        for name in names:
            if not re_field_package.match(name):
                raise CommandError('Invalid package name "{0}"'.format(name))
        return names

    def action_dm(self, fingerprint, section, session) -> None:
        """Grant and/or revoke per-source upload permissions for a DM key.

        Reads ``Fingerprint`` (required), ``Allow``, ``Deny`` and ``Reason``
        from *section*.  Commits on success.

        :param fingerprint: database record of the key that signed the file
        :param section: the section being processed
        :param session: database session
        :raises CommandError: if the command is not configured, the signer is
                              not in an admin keyring, the target key is
                              unknown or not in a DM keyring, a package name
                              is invalid or unknown, or a ``Deny`` names a
                              package without an existing permission
        """
        cnf = Config()

        required_options = ('Command::DM::AdminKeyrings', 'Command::DM::ACL', 'Command::DM::Keyrings')
        if any(option not in cnf for option in required_options):
            raise CommandError('DM command is not configured for this archive.')

        allowed_keyrings = cnf.value_list('Command::DM::AdminKeyrings')
        if fingerprint.keyring.keyring_name not in allowed_keyrings:
            raise CommandError('Key {0} is not allowed to set DM'.format(fingerprint.fingerprint))

        acl_name = cnf.get('Command::DM::ACL', 'dm')
        acl = session.query(ACL).filter_by(name=acl_name).one()

        fpr_hash = self._required_field(section, 'Fingerprint').replace(' ', '')
        fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
        if fpr is None:
            raise CommandError('Unknown fingerprint {0}'.format(fpr_hash))
        if fpr.keyring is None or fpr.keyring.keyring_name not in cnf.value_list('Command::DM::Keyrings'):
            raise CommandError('Key {0} is not in DM keyring.'.format(fpr.fingerprint))
        addresses = gpg_get_key_addresses(fpr.fingerprint)
        if addresses:
            # The affected key's first address gets a copy of the report.
            self.cc.append(addresses[0])

        self.log.log(['dm', 'fingerprint', fpr.fingerprint])
        self.result.append('Fingerprint: {0}'.format(fpr.fingerprint))
        if addresses:
            self.log.log(['dm', 'uid', addresses[0]])
            self.result.append('Uid: {0}'.format(addresses[0]))

        for source in self._split_packages(section.get('Allow', '')):
            # Check for existence of source package to catch typos
            if session.query(DBSource).filter_by(source=source).first() is None:
                raise CommandError('Tried to grant permissions for unknown source package: {0}'.format(source))

            if session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr, source=source).first() is None:
                aps = ACLPerSource()
                aps.acl = acl
                aps.fingerprint = fpr
                aps.source = source
                aps.created_by = fingerprint
                aps.reason = section.get('Reason')
                session.add(aps)
                self.log.log(['dm', 'allow', fpr.fingerprint, source])
                self.result.append('Allowed: {0}'.format(source))
            else:
                self.result.append('Already-Allowed: {0}'.format(source))

        # Make the new grants visible to the bulk deletes below.
        session.flush()

        for source in self._split_packages(section.get('Deny', '')):
            count = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr, source=source).delete()
            if count == 0:
                raise CommandError('Tried to remove upload permissions for package {0}, '
                                   'but no upload permissions were granted before.'.format(source))

            self.log.log(['dm', 'deny', fpr.fingerprint, source])
            self.result.append('Denied: {0}'.format(source))

        session.commit()

    def _action_dm_admin_common(self, fingerprint, section, session) -> None:
        """Check that the DM admin commands are configured and permitted.

        :raises CommandError: if the configuration is missing or the signing
                              key is not listed in
                              ``Command::DM-Admin::AdminFingerprints``
        """
        cnf = Config()

        if 'Command::DM-Admin::AdminFingerprints' not in cnf \
           or 'Command::DM::ACL' not in cnf:
            raise CommandError('DM admin command is not configured for this archive.')

        allowed_fingerprints = cnf.value_list('Command::DM-Admin::AdminFingerprints')
        if fingerprint.fingerprint not in allowed_fingerprints:
            raise CommandError('Key {0} is not allowed to admin DM'.format(fingerprint.fingerprint))

    def action_dm_remove(self, fingerprint, section, session) -> None:
        """Remove every per-source ACL entry of the key named in ``Fingerprint``.

        An unknown fingerprint is reported in the results and nothing is
        changed.  Commits on success.

        :raises CommandError: if the signer may not run DM admin commands or
                              the ``Fingerprint`` field is missing
        """
        self._action_dm_admin_common(fingerprint, section, session)

        cnf = Config()
        acl_name = cnf.get('Command::DM::ACL', 'dm')
        acl = session.query(ACL).filter_by(name=acl_name).one()

        fpr_hash = self._required_field(section, 'Fingerprint').replace(' ', '')
        fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
        if fpr is None:
            self.result.append('Unknown fingerprint: {0}\nNo action taken.'.format(fpr_hash))
            return

        self.log.log(['dm-remove', fpr.fingerprint])

        count = 0
        for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr):
            self.log.log(['dm-remove', fpr.fingerprint, 'source={0}'.format(entry.source)])
            count += 1
            session.delete(entry)

        self.result.append('Removed: {0}.\n{1} acl entries removed.'.format(fpr.fingerprint, count))

        session.commit()

    def action_dm_migrate(self, fingerprint, section, session) -> None:
        """Move every per-source ACL entry from key ``From`` to key ``To``.

        If either fingerprint is unknown, or ``To`` is not in a DM keyring,
        this is reported in the results and nothing is changed.  Commits on
        success.

        :raises CommandError: if the signer may not run DM admin commands or
                              a ``From``/``To`` field is missing
        """
        self._action_dm_admin_common(fingerprint, section, session)
        cnf = Config()
        acl_name = cnf.get('Command::DM::ACL', 'dm')
        acl = session.query(ACL).filter_by(name=acl_name).one()

        fpr_hash_from = self._required_field(section, 'From').replace(' ', '')
        fpr_from = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_from).first()
        if fpr_from is None:
            self.result.append('Unknown fingerprint (From): {0}\nNo action taken.'.format(fpr_hash_from))
            return

        fpr_hash_to = self._required_field(section, 'To').replace(' ', '')
        fpr_to = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_to).first()
        if fpr_to is None:
            self.result.append('Unknown fingerprint (To): {0}\nNo action taken.'.format(fpr_hash_to))
            return
        if fpr_to.keyring is None or fpr_to.keyring.keyring_name not in cnf.value_list('Command::DM::Keyrings'):
            self.result.append('Key (To) {0} is not in DM keyring.\nNo action taken.'.format(fpr_to.fingerprint))
            return

        self.log.log(['dm-migrate', 'from={0}'.format(fpr_hash_from), 'to={0}'.format(fpr_hash_to)])

        sources = []
        for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr_from):
            self.log.log(['dm-migrate', 'from={0}'.format(fpr_hash_from), 'to={0}'.format(fpr_hash_to), 'source={0}'.format(entry.source)])
            entry.fingerprint = fpr_to
            sources.append(entry.source)

        self.result.append('Migrated {0} to {1}.\n{2} acl entries changed: {3}'.format(fpr_hash_from, fpr_hash_to, len(sources), ", ".join(sources)))

        session.commit()

    def action_break_the_archive(self, fingerprint, section, session) -> None:
        """Politely refuse; only appends a message to the results.

        The signer is addressed by the first word of the uid name, or as
        ``Dave`` when there is no uid, no name, or the name is blank.
        """
        name = 'Dave'
        uid = fingerprint.uid
        if uid is not None and uid.name is not None:
            # A blank name yields no words; keep the default rather than
            # failing with IndexError.
            words = uid.name.split()
            if words:
                name = words[0]

        self.result.append("DAK9000: I'm sorry, {0}. I'm afraid I can't do that.".format(name))

    def _sourcename_from_dbchanges(self, changes: "DBChange") -> str:
        """Return the bare source package name recorded in *changes*."""
        source = changes.source
        # in case the Source contains spaces, e.g. in binNMU .changes
        source = source.split(' ')[0]

        return source

    def _process_upload_add_command_file(self, upload: "PolicyQueueUpload", command) -> None:
        """Write the decision for *upload* into the queue's COMMENTS directory.

        The file is named ``<command>.<source>_<version>`` and contains
        ``OK`` for ``ACCEPT`` and ``NOTOK`` for anything else.
        """
        source = self._sourcename_from_dbchanges(upload.changes)
        filename = f"{command}.{source}_{upload.changes.version}"
        content = "OK" if command == "ACCEPT" else "NOTOK"

        with open(os.path.join(upload.policy_queue.path, "COMMENTS", filename), "w") as f:
            f.write(content + "\n")

    def _action_process_upload_common(self, fingerprint, section, session) -> None:
        """Check that the process-upload command is configured.

        :raises CommandError: if ``Command::ProcessUpload::ACL`` is not set
        """
        cnf = Config()

        if 'Command::ProcessUpload::ACL' not in cnf:
            raise CommandError('Process Upload command is not configured for this archive.')

    def action_process_upload(self, fingerprint, section, session) -> None:
        """Accept or reject an upload waiting in a policy queue.

        Reads ``Source``, ``Version`` and ``Command`` (``ACCEPT`` or
        ``REJECT``) from *section*.  The signer needs a per-source ACL entry
        for the source or a per-suite ACL entry for the target suite.  When
        permitted, a command file is written for the upload; otherwise the
        refusal is noted in the results.

        :raises CommandError: if the command is not configured, a required
                              field is missing, ``Command`` is invalid, no
                              matching upload exists, or the upload is in the
                              ``new`` queue
        """
        self._action_process_upload_common(fingerprint, section, session)

        cnf = Config()

        acl_name = cnf.get('Command::ProcessUpload::ACL', 'process-upload')
        acl = session.query(ACL).filter_by(name=acl_name).one()

        source = self._required_field(section, 'Source').replace(' ', '')
        version = self._required_field(section, 'Version').replace(' ', '')
        command = self._required_field(section, 'Command').replace(' ', '')

        if command not in ('ACCEPT', 'REJECT'):
            raise CommandError('Invalid ProcessUpload command: {0}'.format(command))

        uploads = session.query(PolicyQueueUpload).join(PolicyQueueUpload.changes) \
            .filter_by(version=version) \
            .all()
        # we don't filter_by(source=source) because a source in a DBChange can
        # contain more than the source, e.g. 'source (version)' for binNMUs
        uploads = [candidate for candidate in uploads
                   if self._sourcename_from_dbchanges(candidate.changes) == source]
        if not uploads:
            raise CommandError('Could not find upload for {0} {1}'.format(source, version))

        upload = uploads[0]

        # we consider all uploads except those for NEW, and take into account the
        # target suite when checking for permissions
        if upload.policy_queue.queue_name == 'new':
            raise CommandError('Processing uploads from NEW not allowed ({0} {1})'.format(source, version))

        suite = upload.target_suite

        self.log.log(['process-upload', fingerprint.fingerprint, source, version, upload.policy_queue.queue_name, suite.suite_name])

        # A per-source entry is enough; fall back to a per-suite entry.
        allowed = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fingerprint, source=source).first() is not None
        if not allowed:
            allowed = session.query(ACLPerSuite).filter_by(acl=acl, fingerprint=fingerprint, suite=suite).first() is not None

        self.log.log(['process-upload', fingerprint.fingerprint, source, version, upload.policy_queue.queue_name, suite.suite_name, allowed])

        if not allowed:
            # Do not claim the upload was processed when nothing was written.
            self.result.append('ProcessUpload: fp {0} is not allowed to process {1}_{2}/{3}'.format(fingerprint.fingerprint, source, version, suite.codename))
            return

        self._process_upload_add_command_file(upload, command)

        self.result.append('ProcessUpload: processed fp {0}: {1}_{2}/{3}'.format(fingerprint.fingerprint, source, version, suite.codename))