"""module to handle command files
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2012, Ansgar Burchardt <ansgar@debian.org>
@copyright: 2023 Emilio Pozuelo Monfort <pochu@debian.org>
@license: GPL-2+
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import apt_pkg
import os
import tempfile
from daklib.config import Config
from daklib.dak_exceptions import *
from daklib.dbconn import *
from daklib.gpg import SignedFile
from daklib.regexes import re_field_package
from daklib.textutils import fix_maintainer
from daklib.utils import gpg_get_key_addresses, send_mail, TemplateSubst
[docs]class CommandError(Exception):
pass
[docs]class CommandFile:
def __init__(self, filename: str, data: bytes, log=None):
if log is None:
from daklib.daklog import Logger
log = Logger()
self.cc: list[str] = []
self.result = []
self.log = log
self.filename: str = filename
self.data = data
self.uploader = None
[docs] def _check_replay(self, signed_file: SignedFile, session):
"""check for replays
.. note::
Will commit changes to the database.
:param session: database session
"""
# Mark commands file as seen to prevent replays.
signature_history = SignatureHistory.from_signed_file(signed_file)
session.add(signature_history)
session.commit()
[docs] def _quote_section(self, section) -> str:
lines = []
for l in str(section).splitlines():
lines.append("> {0}".format(l))
return "\n".join(lines)
[docs] def _evaluate_sections(self, sections, session):
session.rollback()
try:
while True:
next(sections)
section = sections.section
self.result.append(self._quote_section(section))
action = section.get('Action', None)
if action is None:
raise CommandError('Encountered section without Action field')
if action == 'dm':
self.action_dm(self.fingerprint, section, session)
elif action == 'dm-remove':
self.action_dm_remove(self.fingerprint, section, session)
elif action == 'dm-migrate':
self.action_dm_migrate(self.fingerprint, section, session)
elif action == 'break-the-archive':
self.action_break_the_archive(self.fingerprint, section, session)
elif action == 'process-upload':
self.action_process_upload(self.fingerprint, section, session)
else:
raise CommandError('Unknown action: {0}'.format(action))
self.result.append('')
except StopIteration:
pass
finally:
session.rollback()
[docs] def _notify_uploader(self):
cnf = Config()
bcc = 'X-DAK: dak process-command'
if 'Dinstall::Bcc' in cnf:
bcc = '{0}\nBcc: {1}'.format(bcc, cnf['Dinstall::Bcc'])
maint_to = None
addresses = gpg_get_key_addresses(self.fingerprint.fingerprint)
if len(addresses) > 0:
maint_to = addresses[0]
if self.uploader:
try:
maint_to = fix_maintainer(self.uploader)[1]
except ParseMaintError:
self.log.log('ignoring malformed uploader', self.filename)
cc = set()
for address in self.cc:
try:
cc.add(fix_maintainer(address)[1])
except ParseMaintError:
self.log.log('ignoring malformed cc', self.filename)
subst = {
'__DAK_ADDRESS__': cnf['Dinstall::MyEmailAddress'],
'__MAINTAINER_TO__': maint_to,
'__CC__': ", ".join(cc),
'__BCC__': bcc,
'__RESULTS__': "\n".join(self.result),
'__FILENAME__': self.filename,
}
message = TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-command.processed'))
send_mail(message)
[docs] def evaluate(self) -> bool:
"""evaluate commands file
:return: :const:`True` if the file was processed sucessfully,
:const:`False` otherwise
"""
result = True
session = DBConn().session()
keyrings = session.query(Keyring).filter_by(active=True).order_by(Keyring.priority)
keyring_files = [k.keyring_name for k in keyrings]
signed_file = SignedFile(self.data, keyring_files)
if not signed_file.valid:
self.log.log(['invalid signature', self.filename])
return False
self.fingerprint = session.query(Fingerprint).filter_by(fingerprint=signed_file.primary_fingerprint).one()
if self.fingerprint.keyring is None:
self.log.log(['signed by key in unknown keyring', self.filename])
return False
assert self.fingerprint.keyring.active
self.log.log(['processing', self.filename, 'signed-by={0}'.format(self.fingerprint.fingerprint)])
with tempfile.TemporaryFile() as fh:
fh.write(signed_file.contents)
fh.seek(0)
sections = apt_pkg.TagFile(fh)
try:
next(sections)
section = sections.section
if 'Uploader' in section:
self.uploader = section['Uploader']
if 'Cc' in section:
self.cc.append(section['Cc'])
# TODO: Verify first section has valid Archive field
if 'Archive' not in section:
raise CommandError('No Archive field in first section.')
# TODO: send mail when we detected a replay.
self._check_replay(signed_file, session)
self._evaluate_sections(sections, session)
self.result.append('')
except Exception as e:
self.log.log(['ERROR', e])
self.result.append("There was an error processing this section. No changes were committed.\nDetails:\n{0}".format(e))
result = False
self._notify_uploader()
session.close()
return result
[docs] def _split_packages(self, value: str) -> list[str]:
names = value.split()
for name in names:
if not re_field_package.match(name):
raise CommandError('Invalid package name "{0}"'.format(name))
return names
[docs] def action_dm(self, fingerprint, section, session) -> None:
cnf = Config()
if 'Command::DM::AdminKeyrings' not in cnf \
or 'Command::DM::ACL' not in cnf \
or 'Command::DM::Keyrings' not in cnf:
raise CommandError('DM command is not configured for this archive.')
allowed_keyrings = cnf.value_list('Command::DM::AdminKeyrings')
if fingerprint.keyring.keyring_name not in allowed_keyrings:
raise CommandError('Key {0} is not allowed to set DM'.format(fingerprint.fingerprint))
acl_name = cnf.get('Command::DM::ACL', 'dm')
acl = session.query(ACL).filter_by(name=acl_name).one()
fpr_hash = section['Fingerprint'].replace(' ', '')
fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
if fpr is None:
raise CommandError('Unknown fingerprint {0}'.format(fpr_hash))
if fpr.keyring is None or fpr.keyring.keyring_name not in cnf.value_list('Command::DM::Keyrings'):
raise CommandError('Key {0} is not in DM keyring.'.format(fpr.fingerprint))
addresses = gpg_get_key_addresses(fpr.fingerprint)
if len(addresses) > 0:
self.cc.append(addresses[0])
self.log.log(['dm', 'fingerprint', fpr.fingerprint])
self.result.append('Fingerprint: {0}'.format(fpr.fingerprint))
if len(addresses) > 0:
self.log.log(['dm', 'uid', addresses[0]])
self.result.append('Uid: {0}'.format(addresses[0]))
for source in self._split_packages(section.get('Allow', '')):
# Check for existance of source package to catch typos
if session.query(DBSource).filter_by(source=source).first() is None:
raise CommandError('Tried to grant permissions for unknown source package: {0}'.format(source))
if session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr, source=source).first() is None:
aps = ACLPerSource()
aps.acl = acl
aps.fingerprint = fpr
aps.source = source
aps.created_by = fingerprint
aps.reason = section.get('Reason')
session.add(aps)
self.log.log(['dm', 'allow', fpr.fingerprint, source])
self.result.append('Allowed: {0}'.format(source))
else:
self.result.append('Already-Allowed: {0}'.format(source))
session.flush()
for source in self._split_packages(section.get('Deny', '')):
count = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr, source=source).delete()
if count == 0:
raise CommandError('Tried to remove upload permissions for package {0}, '
'but no upload permissions were granted before.'.format(source))
self.log.log(['dm', 'deny', fpr.fingerprint, source])
self.result.append('Denied: {0}'.format(source))
session.commit()
[docs] def _action_dm_admin_common(self, fingerprint, section, session) -> None:
cnf = Config()
if 'Command::DM-Admin::AdminFingerprints' not in cnf \
or 'Command::DM::ACL' not in cnf:
raise CommandError('DM admin command is not configured for this archive.')
allowed_fingerprints = cnf.value_list('Command::DM-Admin::AdminFingerprints')
if fingerprint.fingerprint not in allowed_fingerprints:
raise CommandError('Key {0} is not allowed to admin DM'.format(fingerprint.fingerprint))
[docs] def action_dm_remove(self, fingerprint, section, session) -> None:
self._action_dm_admin_common(fingerprint, section, session)
cnf = Config()
acl_name = cnf.get('Command::DM::ACL', 'dm')
acl = session.query(ACL).filter_by(name=acl_name).one()
fpr_hash = section['Fingerprint'].replace(' ', '')
fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
if fpr is None:
self.result.append('Unknown fingerprint: {0}\nNo action taken.'.format(fpr_hash))
return
self.log.log(['dm-remove', fpr.fingerprint])
count = 0
for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr):
self.log.log(['dm-remove', fpr.fingerprint, 'source={0}'.format(entry.source)])
count += 1
session.delete(entry)
self.result.append('Removed: {0}.\n{1} acl entries removed.'.format(fpr.fingerprint, count))
session.commit()
[docs] def action_dm_migrate(self, fingerprint, section, session) -> None:
self._action_dm_admin_common(fingerprint, section, session)
cnf = Config()
acl_name = cnf.get('Command::DM::ACL', 'dm')
acl = session.query(ACL).filter_by(name=acl_name).one()
fpr_hash_from = section['From'].replace(' ', '')
fpr_from = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_from).first()
if fpr_from is None:
self.result.append('Unknown fingerprint (From): {0}\nNo action taken.'.format(fpr_hash_from))
return
fpr_hash_to = section['To'].replace(' ', '')
fpr_to = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_to).first()
if fpr_to is None:
self.result.append('Unknown fingerprint (To): {0}\nNo action taken.'.format(fpr_hash_to))
return
if fpr_to.keyring is None or fpr_to.keyring.keyring_name not in cnf.value_list('Command::DM::Keyrings'):
self.result.append('Key (To) {0} is not in DM keyring.\nNo action taken.'.format(fpr_to.fingerprint))
return
self.log.log(['dm-migrate', 'from={0}'.format(fpr_hash_from), 'to={0}'.format(fpr_hash_to)])
sources = []
for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr_from):
self.log.log(['dm-migrate', 'from={0}'.format(fpr_hash_from), 'to={0}'.format(fpr_hash_to), 'source={0}'.format(entry.source)])
entry.fingerprint = fpr_to
sources.append(entry.source)
self.result.append('Migrated {0} to {1}.\n{2} acl entries changed: {3}'.format(fpr_hash_from, fpr_hash_to, len(sources), ", ".join(sources)))
session.commit()
[docs] def action_break_the_archive(self, fingerprint, section, session) -> None:
name = 'Dave'
uid = fingerprint.uid
if uid is not None and uid.name is not None:
name = uid.name.split()[0]
self.result.append("DAK9000: I'm sorry, {0}. I'm afraid I can't do that.".format(name))
[docs] def _sourcename_from_dbchanges(self, changes: DBChange) -> str:
source = changes.source
# in case the Source contains spaces, e.g. in binNMU .changes
source = source.split(' ')[0]
return source
[docs] def _process_upload_add_command_file(self, upload: PolicyQueueUpload, command) -> None:
source = self._sourcename_from_dbchanges(upload.changes)
filename = f"{command}.{source}_{upload.changes.version}"
content = "OK" if command == "ACCEPT" else "NOTOK"
with open(os.path.join(upload.policy_queue.path, "COMMENTS", filename), "x") as f:
f.write(content + "\n")
[docs] def _action_process_upload_common(self, fingerprint, section, session) -> None:
cnf = Config()
if 'Command::ProcessUpload::ACL' not in cnf:
raise CommandError('Process Upload command is not configured for this archive.')
[docs] def action_process_upload(self, fingerprint, section, session) -> None:
self._action_process_upload_common(fingerprint, section, session)
cnf = Config()
acl_name = cnf.get('Command::ProcessUpload::ACL', 'process-upload')
acl = session.query(ACL).filter_by(name=acl_name).one()
source = section['Source'].replace(' ', '')
version = section['Version'].replace(' ', '')
command = section['Command'].replace(' ', '')
if command not in ('ACCEPT', 'REJECT'):
raise CommandError('Invalid ProcessUpload command: {0}'.format(command))
dbsource = session.query(DBSource).filter_by(source=source, version=version)
uploads = session.query(PolicyQueueUpload).join(PolicyQueueUpload.changes) \
.filter_by(version=version) \
.all()
# we don't filter_by(source=source) because a source in a DBChange can
# contain more than the source, e.g. 'source (version)' for binNMUs
uploads = [upload for upload in uploads if self._sourcename_from_dbchanges(upload.changes) == source]
if not uploads:
raise CommandError('Could not find upload for {0} {1}'.format(source, version))
upload = uploads[0]
# we consider all uploads except those for NEW, and take into account the
# target suite when checking for permissions
if upload.policy_queue.queue_name == 'new':
raise CommandError('Processing uploads from NEW not allowed ({0} {1})'.format(source, version))
suite = upload.target_suite
self.log.log(['process-upload', fingerprint.fingerprint, source, version, upload.policy_queue.queue_name, suite.suite_name])
allowed = False
for entry in session.query(ACLPerSource).filter_by(acl=acl,
fingerprint=fingerprint,
source=source):
allowed = True
if not allowed:
for entry in session.query(ACLPerSuite).filter_by(acl=acl,
fingerprint=fingerprint,
suite=suite):
allowed = True
self.log.log(['process-upload', fingerprint.fingerprint, source, version, upload.policy_queue.queue_name, suite.suite_name, allowed])
if allowed:
self._process_upload_add_command_file(upload, command)
self.result.append('ProcessUpload: processed fp {0}: {1}_{2}/{3}'.format(fingerprint.fingerprint, source, version, suite.codename))