
#! /usr/bin/env python3
# vim:set et ts=4 sw=4:

"""Handles packages from policy queues

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2001, 2002, 2003, 2004, 2005, 2006  James Troup <james@nocrew.org>
@copyright: 2009 Joerg Jaspert <joerg@debian.org>
@copyright: 2009 Frank Lichtenheld <djpig@debian.org>
@copyright: 2009 Mark Hymers <mhy@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

# <mhy> So how do we handle that at the moment?
# <stew> Probably incorrectly.

################################################################################

import datetime
import functools
import os
import re
import sys
import traceback
from collections.abc import Callable
from typing import NoReturn

import apt_pkg
import sqlalchemy.sql as sql
from sqlalchemy.orm.exc import NoResultFound

import daklib.announce
import daklib.upload
import daklib.utils
from daklib import daklog, utils
from daklib.archive import ArchiveTransaction, source_component_from_package_list
from daklib.config import Config
from daklib.dbconn import (
    ArchiveFile,
    Component,
    DBBinary,
    DBChange,
    DBConn,
    DBSource,
    Override,
    OverrideType,
    PolicyQueue,
    PolicyQueueUpload,
    PoolFile,
    Suite,
    get_mapped_component,
)
from daklib.externalsignature import check_upload_for_external_signature_request
from daklib.packagelist import PackageList
from daklib.urgencylog import UrgencyLog

# Globals
Options = None
Logger = None

################################################################################

ProcessingCallable = Callable[
    [PolicyQueueUpload, PolicyQueue, str, ArchiveTransaction], None
]
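
# A ProcessingCallable receives the upload to act on, the policy queue it is
# currently in, the comment text giving the reason, and the open archive
# transaction; `comment_accept` and `comment_reject` below have this shape.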


def do_comments(
    dir: str,
    srcqueue: PolicyQueue,
    opref: str,
    npref: str,
    line: str,
    fn: ProcessingCallable,
    transaction: ArchiveTransaction,
) -> None:
    """Process comment files in `dir` whose names start with `opref` and whose
    first line equals `line`, calling `fn` for each matching upload."""
    session = transaction.session

    actions: list[tuple[PolicyQueueUpload, str]] = []

    for comm in [x for x in os.listdir(dir) if x.startswith(opref)]:
        with open(os.path.join(dir, comm)) as fd:
            lines = fd.readlines()
        if len(lines) == 0 or lines[0] != line + "\n":
            continue

        # If the ACCEPT includes a _<arch> we only accept that .changes.
        # Otherwise we accept all .changes that start with the given prefix
        changes_prefix = comm[len(opref) :]
        if changes_prefix.count("_") < 2:
            changes_prefix = changes_prefix + "_"
        else:
            changes_prefix = changes_prefix + ".changes"

        # We need to escape "_" as we use it with the LIKE operator (via
        # SQLAlchemy's `startswith`) later.
        changes_prefix = changes_prefix.replace("_", r"\_")

        uploads = (
            session.query(PolicyQueueUpload)
            .filter_by(policy_queue=srcqueue)
            .join(PolicyQueueUpload.changes)
            .filter(DBChange.changesname.startswith(changes_prefix))
            .order_by(PolicyQueueUpload.source_id)
        )
        reason = "".join(lines[1:])
        actions.extend((u, reason) for u in uploads)

        if opref != npref:
            newcomm = npref + comm[len(opref) :]
            newcomm = utils.find_next_free(os.path.join(dir, newcomm))
            transaction.fs.move(os.path.join(dir, comm), newcomm)

    actions.sort()

    for u, reason in actions:
        print("Processing changes file: {0}".format(u.changes.changesname))
        fn(u, srcqueue, reason, transaction)
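
# For illustration, a hypothetical comment file that would make do_comments()
# accept all .changes files of a made-up hello_2.10-3 upload in this queue
# (the package name is an example only):
#
#     COMMENTS/ACCEPT.hello_2.10-3
#         OK
#         Reviewed; looks good.
#
# The first line must equal `line` ("OK" for ACCEPT, "NOTOK" for REJECT); the
# remaining lines become the reason passed to `fn`. Afterwards the file is
# renamed from the `opref` prefix to the `npref` prefix (ACCEPT. -> ACCEPTED.).
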
################################################################################


def try_or_reject(function: ProcessingCallable) -> ProcessingCallable:
    """Decorator: run `function`; if it raises, roll back and reject the
    upload instead, appending the traceback to the comments."""

    @functools.wraps(function)
    def wrapper(
        upload: PolicyQueueUpload,
        srcqueue: PolicyQueue,
        comments: str,
        transaction: ArchiveTransaction,
    ) -> None:
        try:
            function(upload, srcqueue, comments, transaction)
        except Exception:
            comments = "An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}".format(
                traceback.format_exc(), comments
            )
            try:
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction)
            except Exception:
                comments = "In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}".format(
                    traceback.format_exc(), comments
                )
                transaction.rollback()
                real_comment_reject(
                    upload, srcqueue, comments, transaction, notify=False
                )
        if not Options["No-Action"]:
            transaction.commit()
        else:
            transaction.rollback()

    return wrapper

################################################################################
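

# comment_accept() installs an accepted upload into its target suite (plus the
# matching debug and build-queue suites where applicable) or, if the target
# suite routes through a further policy queue, re-registers the upload there.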
@try_or_reject
def comment_accept(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction,
) -> None:
    for byhand in upload.byhand:
        path = os.path.join(srcqueue.path, byhand.filename)
        if os.path.exists(path):
            raise Exception(
                "E: cannot ACCEPT upload with unprocessed byhand file {0}".format(
                    byhand.filename
                )
            )

    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    allow_tainted = srcqueue.suite.archive.tainted

    # We need overrides to get the target component
    overridesuite = upload.target_suite
    if overridesuite.overridesuite is not None:
        overridesuite = (
            session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one()
        )

    def binary_component_func(db_binary: DBBinary) -> Component:
        section = db_binary.proxy["Section"]
        component_name = "main"
        if section.find("/") != -1:
            component_name = section.split("/", 1)[0]
        return get_mapped_component(component_name, session=session)

    def is_debug_binary(db_binary: DBBinary) -> bool:
        return daklib.utils.is_in_debug_section(db_binary.proxy)

    def has_debug_binaries(upload: PolicyQueueUpload) -> bool:
        return any((is_debug_binary(x) for x in upload.binaries))

    def source_component_func(db_source: DBSource) -> Component:
        package_list = PackageList(db_source.proxy)
        component = source_component_from_package_list(
            package_list, upload.target_suite
        )
        if component is not None:
            return get_mapped_component(component.component_name, session=session)

        # Fallback for packages without Package-List field
        query = (
            session.query(Override)
            .filter_by(suite=overridesuite, package=db_source.source)
            .join(OverrideType)
            .filter(OverrideType.overridetype == "dsc")
            .join(Component)
        )
        return query.one().component

    policy_queue = upload.target_suite.policy_queue
    if policy_queue == srcqueue:
        policy_queue = None

    all_target_suites = [
        upload.target_suite if policy_queue is None else policy_queue.suite
    ]
    if policy_queue is None or policy_queue.send_to_build_queues:
        all_target_suites.extend([q.suite for q in upload.target_suite.copy_queues])

    throw_away_binaries = False
    if upload.source is not None:
        source_component = source_component_func(upload.source)
        if upload.target_suite.suite_name in cnf.value_list(
            "Dinstall::ThrowAwayNewBinarySuites"
        ) and source_component.component_name in cnf.value_list(
            "Dinstall::ThrowAwayNewBinaryComponents"
        ):
            throw_away_binaries = True

    for suite in all_target_suites:
        debug_suite = suite.debug_suite

        if upload.source is not None:
            # If we have Source in this upload, let's include it in the
            # upload suite.
            transaction.copy_source(
                upload.source,
                suite,
                source_component,
                allow_tainted=allow_tainted,
            )

            if not throw_away_binaries:
                if debug_suite is not None and has_debug_binaries(upload):
                    # If we're handling a debug package, we also need to
                    # include the source in the debug suite.
                    transaction.copy_source(
                        upload.source,
                        debug_suite,
                        source_component_func(upload.source),
                        allow_tainted=allow_tainted,
                    )

        if not throw_away_binaries:
            for db_binary in upload.binaries:
                # Now, let's work out where to copy this guy to -- if it's
                # a debug binary, and the suite has a debug suite, let's go
                # ahead and target the debug suite rather than the stock
                # suite.
                copy_to_suite = suite
                if debug_suite is not None and is_debug_binary(db_binary):
                    copy_to_suite = debug_suite

                # build queues and debug suites may miss the source package
                # if this is a binary-only upload.
                if copy_to_suite != upload.target_suite:
                    transaction.copy_source(
                        db_binary.source,
                        copy_to_suite,
                        source_component_func(db_binary.source),
                        allow_tainted=allow_tainted,
                    )

                transaction.copy_binary(
                    db_binary,
                    copy_to_suite,
                    binary_component_func(db_binary),
                    allow_tainted=allow_tainted,
                    extra_archives=[upload.target_suite.archive],
                )

                check_upload_for_external_signature_request(
                    session, suite, copy_to_suite, db_binary
                )

        suite.update_last_changed()

    # Copy .changes if needed
    if policy_queue is None and upload.target_suite.copychanges:
        src = os.path.join(upload.policy_queue.path, upload.changes.changesname)
        dst = os.path.join(upload.target_suite.path, upload.changes.changesname)
        fs.copy(src, dst, mode=upload.target_suite.archive.mode)

    # List of files in the queue directory
    queue_files = [changesname]
    chg = daklib.upload.Changes(
        upload.policy_queue.path, changesname, keyrings=[], require_signature=False
    )
    queue_files.extend(f.filename for f in chg.buildinfo_files)

    # TODO: similar code exists in archive.py's `ArchiveUpload._install_policy`
    if policy_queue is not None:
        # register upload in policy queue
        new_upload = PolicyQueueUpload()
        new_upload.policy_queue = policy_queue
        new_upload.target_suite = upload.target_suite
        new_upload.changes = upload.changes
        new_upload.source = upload.source
        new_upload.binaries = upload.binaries
        session.add(new_upload)
        session.flush()

        # copy .changes & similar to policy queue
        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            dst = os.path.join(policy_queue.path, fn)
            transaction.fs.copy(src, dst, mode=policy_queue.change_perms)

    # Copy upload to Process-Policy::CopyDir
    # Used on security.d.o to sync accepted packages to ftp-master, but this
    # should eventually be replaced by something else.
    copydir = cnf.get("Process-Policy::CopyDir") or None
    if policy_queue is None and copydir is not None:
        mode = upload.target_suite.archive.mode
        if upload.source is not None:
            for f in [df.poolfile for df in upload.source.srcfiles]:
                dst = os.path.join(copydir, f.basename)
                if not os.path.exists(dst):
                    fs.copy(f.fullpath, dst, mode=mode)

        for db_binary in upload.binaries:
            f = db_binary.poolfile
            dst = os.path.join(copydir, f.basename)
            if not os.path.exists(dst):
                fs.copy(f.fullpath, dst, mode=mode)

        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            dst = os.path.join(copydir, fn)
            # We check for `src` to exist as old uploads in policy queues
            # might still miss the `.buildinfo` files.
            if os.path.exists(src) and not os.path.exists(dst):
                fs.copy(src, dst, mode=mode)

    if policy_queue is None:
        utils.process_buildinfos(
            upload.policy_queue.path, chg.buildinfo_files, fs, Logger
        )

    if policy_queue is None and upload.source is not None and not Options["No-Action"]:
        urgency = upload.changes.urgency
        # As per policy 5.6.17, the urgency can be followed by a space and a
        # comment. Extract only the urgency from the string.
        if " " in urgency:
            urgency, comment = urgency.split(" ", 1)
        if urgency not in cnf.value_list("Urgency::Valid"):
            urgency = cnf["Urgency::Default"]
        UrgencyLog().log(upload.source.source, upload.source.version, urgency)

    if policy_queue is None:
        print(" ACCEPT")
    else:
        print(" ACCEPT-TO-QUEUE")
    if not Options["No-Action"]:
        Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname])

    if policy_queue is None:
        pu = get_processed_upload(upload)
        daklib.announce.announce_accept(pu)

    # TODO: code duplication. Similar code is in process-upload.
    # Move .changes to done
    now = datetime.datetime.now()
    donedir = os.path.join(cnf["Dir::Done"], now.strftime("%Y/%m/%d"))
    if policy_queue is None:
        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            if os.path.exists(src):
                dst = os.path.join(donedir, fn)
                dst = utils.find_next_free(dst)
                fs.copy(src, dst, mode=0o644)

    if throw_away_binaries and upload.target_suite.archive.use_morgue:
        morguesubdir = cnf.get("New::MorgueSubDir", "new")

        utils.move_to_morgue(
            morguesubdir,
            [db_binary.poolfile.fullpath for db_binary in upload.binaries],
            fs,
            Logger,
        )

    remove_upload(upload, transaction)

################################################################################


@try_or_reject
def comment_reject(*args) -> None:
    real_comment_reject(*args, manual=True)


def real_comment_reject(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction,
    notify=True,
    manual=False,
) -> None:
    """Reject `upload`: copy its files and the rejection reason to the reject
    directory, optionally notify the uploader, and remove the upload."""
    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    queuedir = upload.policy_queue.path
    rejectdir = cnf["Dir::Reject"]

    ### Copy files to reject/

    poolfiles = [b.poolfile for b in upload.binaries]
    if upload.source is not None:
        poolfiles.extend([df.poolfile for df in upload.source.srcfiles])
    # Not beautiful...
    files = [
        af.path
        for af in session.query(ArchiveFile)
        .filter_by(archive=upload.policy_queue.suite.archive)
        .join(ArchiveFile.file)
        .filter(PoolFile.file_id.in_([f.file_id for f in poolfiles]))
    ]
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            files.append(path)
    chg = daklib.upload.Changes(
        queuedir, changesname, keyrings=[], require_signature=False
    )
    for f in chg.buildinfo_files:
        path = os.path.join(queuedir, f.filename)
        if os.path.exists(path):
            files.append(path)
    files.append(os.path.join(queuedir, changesname))

    for fn in files:
        dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn)))
        fs.copy(fn, dst, link=True)

    ### Write reason

    dst = utils.find_next_free(
        os.path.join(rejectdir, "{0}.reason".format(changesname))
    )
    fh = fs.create(dst)
    fh.write(comments)
    fh.close()

    ### Send mail notification

    if notify:
        rejected_by = None
        reason = comments

        # Try to use From: from comment file if there is one.
        # This is not very elegant...
        match = re.match(r"\AFrom: ([^\n]+)\n\n", comments)
        if match:
            rejected_by = match.group(1)
            reason = "\n".join(comments.splitlines()[2:])

        pu = get_processed_upload(upload)
        daklib.announce.announce_reject(pu, reason, rejected_by)

    print(" REJECT")
    if not Options["No-Action"]:
        Logger.log(
            ["Policy Queue REJECT", srcqueue.queue_name, upload.changes.changesname]
        )

    changes = upload.changes
    remove_upload(upload, transaction)
    session.delete(changes)
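
# For illustration, a hypothetical REJECT comment file; the optional "From:"
# header on the line after NOTOK is picked up by the regex above and recorded
# as the rejecting person in the announcement (all names are examples only):
#
#     COMMENTS/REJECT.hello_2.10-3
#         NOTOK
#         From: Jane Doe <jane@example.org>
#
#         Contains files that may not be distributed.
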
################################################################################


def remove_upload(upload: PolicyQueueUpload, transaction: ArchiveTransaction) -> None:
    fs = transaction.fs
    session = transaction.session

    # Remove byhand and changes files. Binary and source packages will be
    # removed from {bin,src}_associations and eventually cleaned up by
    # clean-suites automatically.
    queuedir = upload.policy_queue.path
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            fs.unlink(path)
        session.delete(byhand)

    chg = daklib.upload.Changes(
        queuedir, upload.changes.changesname, keyrings=[], require_signature=False
    )
    queue_files = [upload.changes.changesname]
    queue_files.extend(f.filename for f in chg.buildinfo_files)
    for fn in queue_files:
        # We check for `path` to exist as old uploads in policy queues
        # might still miss the `.buildinfo` files.
        path = os.path.join(queuedir, fn)
        if os.path.exists(path):
            fs.unlink(path)

    session.delete(upload)
    session.flush()

################################################################################


def get_processed_upload(upload: PolicyQueueUpload) -> daklib.announce.ProcessedUpload:
    pu = daklib.announce.ProcessedUpload()

    pu.maintainer = upload.changes.maintainer
    pu.changed_by = upload.changes.changedby
    pu.fingerprint = upload.changes.fingerprint
    pu.authorized_by_fingerprint = upload.changes.authorized_by_fingerprint

    pu.suites = [upload.target_suite]
    pu.from_policy_suites = [upload.target_suite]

    changes_path = os.path.join(upload.policy_queue.path, upload.changes.changesname)
    with open(changes_path, "r") as fd:
        pu.changes = fd.read()
    pu.changes_filename = upload.changes.changesname
    pu.sourceful = upload.source is not None
    pu.source = upload.changes.source
    pu.version = upload.changes.version
    pu.architecture = upload.changes.architecture
    pu.bugs = upload.changes.closes

    pu.program = "process-policy"

    return pu

################################################################################


def remove_unreferenced_binaries(
    policy_queue: PolicyQueue, transaction: ArchiveTransaction
) -> None:
    """Remove binaries that are no longer referenced by an upload"""
    session = transaction.session
    suite = policy_queue.suite

    query = sql.text(
        """
        SELECT b.*
          FROM binaries b
          JOIN bin_associations ba ON b.id = ba.bin
         WHERE ba.suite = :suite_id
           AND NOT EXISTS (SELECT 1
                             FROM policy_queue_upload_binaries_map pqubm
                             JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id
                            WHERE pqu.policy_queue_id = :policy_queue_id
                              AND pqubm.binary_id = b.id)"""
    )
    binaries = (
        session.query(DBBinary)
        .from_statement(query)
        .params(
            {
                "suite_id": policy_queue.suite_id,
                "policy_queue_id": policy_queue.policy_queue_id,
            }
        )
    )

    for binary in binaries:
        Logger.log(
            [
                "removed binary from policy queue",
                policy_queue.queue_name,
                binary.package,
                binary.version,
            ]
        )
        transaction.remove_binary(binary, suite)


def remove_unreferenced_sources(
    policy_queue: PolicyQueue, transaction: ArchiveTransaction
) -> None:
    """Remove sources that are no longer referenced by an upload or a binary"""
    session = transaction.session
    suite = policy_queue.suite

    query = sql.text(
        """
        SELECT s.*
          FROM source s
          JOIN src_associations sa ON s.id = sa.source
         WHERE sa.suite = :suite_id
           AND NOT EXISTS (SELECT 1
                             FROM policy_queue_upload pqu
                            WHERE pqu.policy_queue_id = :policy_queue_id
                              AND pqu.source_id = s.id)
           AND NOT EXISTS (SELECT 1
                             FROM binaries b
                             JOIN bin_associations ba ON b.id = ba.bin
                            WHERE b.source = s.id
                              AND ba.suite = :suite_id)"""
    )
    sources = (
        session.query(DBSource)
        .from_statement(query)
        .params(
            {
                "suite_id": policy_queue.suite_id,
                "policy_queue_id": policy_queue.policy_queue_id,
            }
        )
    )

    for source in sources:
        Logger.log(
            [
                "removed source from policy queue",
                policy_queue.queue_name,
                source.source,
                source.version,
            ]
        )
        transaction.remove_source(source, suite)

################################################################################


def usage(status=0) -> NoReturn:
    print("""Usage: dak process-policy QUEUE""")
    sys.exit(status)
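
# Typical invocations (queue names are deployment-specific; "new" is only an
# example):
#
#     dak process-policy new
#     dak process-policy --no-action new   # dry run: transaction is rolled back
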
################################################################################


def main():
    global Options, Logger

    cnf = Config()
    session = DBConn().session()

    Arguments = [
        ("h", "help", "Process-Policy::Options::Help"),
        ("n", "no-action", "Process-Policy::Options::No-Action"),
    ]

    for i in ["help", "no-action"]:
        key = "Process-Policy::Options::%s" % i
        if key not in cnf:
            cnf[key] = ""

    queue_name = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv)

    Options = cnf.subtree("Process-Policy::Options")
    if Options["Help"]:
        usage()

    if len(queue_name) != 1:
        print("E: Specify exactly one policy queue")
        sys.exit(1)

    queue_name = queue_name[0]

    Logger = daklog.Logger("process-policy")
    if not Options["No-Action"]:
        urgencylog = UrgencyLog()

    with ArchiveTransaction() as transaction:
        session = transaction.session
        try:
            pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
        except NoResultFound:
            print("E: Cannot find policy queue %s" % queue_name)
            sys.exit(1)

        commentsdir = os.path.join(pq.path, "COMMENTS")
        # The comments stuff relies on being in the right directory
        os.chdir(pq.path)

        do_comments(
            commentsdir,
            pq,
            "REJECT.",
            "REJECTED.",
            "NOTOK",
            comment_reject,
            transaction,
        )
        do_comments(
            commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, transaction
        )
        do_comments(
            commentsdir,
            pq,
            "ACCEPTED.",
            "ACCEPTED.",
            "OK",
            comment_accept,
            transaction,
        )

        remove_unreferenced_binaries(pq, transaction)
        remove_unreferenced_sources(pq, transaction)

    if not Options["No-Action"]:
        urgencylog.close()

################################################################################


if __name__ == "__main__":
    main()