# Source code for dak.process_policy
#! /usr/bin/env python3
# vim:set et ts=4 sw=4:
""" Handles packages from policy queues
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2001, 2002, 2003, 2004, 2005, 2006 James Troup <james@nocrew.org>
@copyright: 2009 Joerg Jaspert <joerg@debian.org>
@copyright: 2009 Frank Lichtenheld <djpig@debian.org>
@copyright: 2009 Mark Hymers <mhy@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
################################################################################
# <mhy> So how do we handle that at the moment?
# <stew> Probably incorrectly.
################################################################################
import os
import datetime
import functools
import re
import sys
import traceback
import apt_pkg
from sqlalchemy.orm.exc import NoResultFound
import sqlalchemy.sql as sql
from collections.abc import Callable, Iterable
from typing import NoReturn
from daklib.dbconn import *
from daklib import daklog
from daklib import utils
from daklib.externalsignature import check_upload_for_external_signature_request
from daklib.config import Config
from daklib.archive import ArchiveTransaction, source_component_from_package_list
from daklib.urgencylog import UrgencyLog
from daklib.packagelist import PackageList
import daklib.announce
import daklib.upload
import daklib.utils
# Globals
# Both are placeholders at import time and are assigned by main():
# Options becomes the "Process-Policy::Options" configuration subtree,
# Logger becomes the daklog.Logger for this run.
Options = None
Logger = None
################################################################################
# Signature shared by the per-upload handlers that do_comments() dispatches to:
# (upload, source policy queue, comment text, transaction) -> None.
ProcessingCallable = Callable[[PolicyQueueUpload, PolicyQueue, str, ArchiveTransaction], None]
def do_comments(
    dir: str,
    srcqueue: PolicyQueue,
    opref: str,
    npref: str,
    line: str,
    fn: ProcessingCallable,
    transaction: ArchiveTransaction
) -> None:
    """Apply the decisions stored in a comments directory to queued uploads.

    Every file in *dir* whose name starts with *opref* and whose first line is
    exactly *line* is treated as a decision.  The rest of the file name selects
    the ``.changes`` files it applies to, and the remaining lines of the file
    are passed to *fn* as the comment text.

    :param dir: directory containing the comment files
    :param srcqueue: policy queue whose uploads are being processed
    :param opref: file name prefix of a pending decision
    :param npref: prefix the comment file is renamed to after it was read;
        no rename takes place when it equals *opref*
    :param line: required content of the first line, without the newline
    :param fn: handler called once for every selected upload
    :param transaction: provides the database session and the filesystem
        operations used for renaming comment files
    """
    session = transaction.session
    actions: list[tuple[PolicyQueueUpload, str]] = []
    for comm in [x for x in os.listdir(dir) if x.startswith(opref)]:
        with open(os.path.join(dir, comm)) as fd:
            lines = fd.readlines()
        # Files that are empty or do not carry the expected marker line are
        # left alone.
        if len(lines) == 0 or lines[0] != line + "\n":
            continue

        # If the ACCEPT includes a _<arch> we only accept that .changes.
        # Otherwise we accept all .changes that start with the given prefix
        changes_prefix = comm[len(opref):]
        if changes_prefix.count('_') < 2:
            changes_prefix = changes_prefix + '_'
        else:
            changes_prefix = changes_prefix + '.changes'

        # The prefix is used with the LIKE operator (via the SQLA startswith)
        # below, so every character that is special in a LIKE pattern has to
        # be escaped: the escape character itself first, then both wildcards.
        # Escaping only "_" would let a "%" in a comment file name match
        # unrelated uploads.
        for special in ("\\", "%", "_"):
            changes_prefix = changes_prefix.replace(special, "\\" + special)

        uploads = session.query(PolicyQueueUpload).filter_by(policy_queue=srcqueue) \
            .join(PolicyQueueUpload.changes).filter(DBChange.changesname.startswith(changes_prefix)) \
            .order_by(PolicyQueueUpload.source_id)
        reason = "".join(lines[1:])
        actions.extend((u, reason) for u in uploads)

        if opref != npref:
            newcomm = npref + comm[len(opref):]
            newcomm = utils.find_next_free(os.path.join(dir, newcomm))
            transaction.fs.move(os.path.join(dir, comm), newcomm)

    # Sorts the (upload, reason) pairs; this relies on PolicyQueueUpload being
    # orderable (its definition is not part of this file).
    actions.sort()

    for u, reason in actions:
        print("Processing changes file: {0}".format(u.changes.changesname))
        fn(u, srcqueue, reason, transaction)
################################################################################
def try_or_reject(function: ProcessingCallable) -> ProcessingCallable:
    """Decorator that turns a failure of *function* into a rejection.

    The wrapped handler is called with the upload.  If it raises, the
    transaction is rolled back and the upload is rejected with the traceback
    prepended to the original comments.  If that rejection raises as well, the
    transaction is rolled back again and the rejection is retried once without
    sending a notification; an exception from this last attempt propagates.

    Afterwards the transaction is committed, or rolled back when the
    ``No-Action`` option is set.

    :param function: handler to protect
    :return: the wrapping handler with the same signature
    """
    @functools.wraps(function)
    def wrapper(upload: PolicyQueueUpload, srcqueue: PolicyQueue, comments: str, transaction: ArchiveTransaction) -> None:
        try:
            function(upload, srcqueue, comments, transaction)
        except Exception:
            comments = 'An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}'.format(traceback.format_exc(), comments)
            try:
                # Discard whatever the failed handler did before rejecting.
                transaction.rollback()
                real_comment_reject(upload, srcqueue, comments, transaction)
            except Exception:
                comments = 'In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}'.format(traceback.format_exc(), comments)
                transaction.rollback()
                # Second attempt without the mail notification.
                real_comment_reject(upload, srcqueue, comments, transaction, notify=False)
        if not Options['No-Action']:
            transaction.commit()
        else:
            transaction.rollback()
    return wrapper
################################################################################
@try_or_reject
def comment_accept(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction
) -> None:
    """Accept *upload* from the policy queue *srcqueue*.

    The upload is installed into its target suite, or, when the target suite
    has a policy queue different from *srcqueue*, forwarded to that queue.
    Source and binary packages are copied to every affected suite (debug
    binaries go to the suite's debug suite when there is one), the
    ``.changes`` and ``.buildinfo`` files are copied where configured, and the
    upload is finally removed from *srcqueue*.

    Because of the ``try_or_reject`` decorator, any exception raised here
    results in the upload being rejected instead.

    :param upload: upload to accept
    :param srcqueue: policy queue the upload currently sits in
    :param comments: comment text from the decision file; not used by this
        handler
    :param transaction: provides the database session and filesystem
        operations
    :raises Exception: if a byhand file of the upload is still present in the
        queue directory
    """
    for byhand in upload.byhand:
        path = os.path.join(srcqueue.path, byhand.filename)
        if os.path.exists(path):
            raise Exception('E: cannot ACCEPT upload with unprocessed byhand file {0}'.format(byhand.filename))

    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    allow_tainted = srcqueue.suite.archive.tainted

    # We need overrides to get the target component
    overridesuite = upload.target_suite
    if overridesuite.overridesuite is not None:
        overridesuite = session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one()

    def binary_component_func(db_binary: DBBinary) -> Component:
        # The component is the part of Section before the first "/";
        # sections without a "/" belong to 'main'.
        section = db_binary.proxy['Section']
        component_name = 'main'
        if '/' in section:
            component_name = section.split('/', 1)[0]
        return get_mapped_component(component_name, session=session)

    def is_debug_binary(db_binary: DBBinary) -> bool:
        return daklib.utils.is_in_debug_section(db_binary.proxy)

    def has_debug_binaries(upload: PolicyQueueUpload) -> bool:
        return any(is_debug_binary(x) for x in upload.binaries)

    def source_component_func(db_source: DBSource) -> Component:
        package_list = PackageList(db_source.proxy)
        component = source_component_from_package_list(package_list, upload.target_suite)
        if component is not None:
            return get_mapped_component(component.component_name, session=session)

        # Fallback for packages without Package-List field
        query = session.query(Override).filter_by(suite=overridesuite, package=db_source.source) \
            .join(OverrideType).filter(OverrideType.overridetype == 'dsc') \
            .join(Component)
        return query.one().component

    # `policy_queue` is the queue the upload gets forwarded to, or None when
    # it is installed into the target suite directly.
    policy_queue = upload.target_suite.policy_queue
    if policy_queue == srcqueue:
        policy_queue = None

    all_target_suites = [upload.target_suite if policy_queue is None else policy_queue.suite]
    if policy_queue is None or policy_queue.send_to_build_queues:
        all_target_suites.extend([q.suite for q in upload.target_suite.copy_queues])

    throw_away_binaries = False
    if upload.source is not None:
        source_component = source_component_func(upload.source)
        if upload.target_suite.suite_name in cnf.value_list('Dinstall::ThrowAwayNewBinarySuites') and \
           source_component.component_name in cnf.value_list('Dinstall::ThrowAwayNewBinaryComponents'):
            throw_away_binaries = True

    for suite in all_target_suites:
        debug_suite = suite.debug_suite

        if upload.source is not None:
            # If we have Source in this upload, let's include it into
            # upload suite.
            transaction.copy_source(
                upload.source,
                suite,
                source_component,
                allow_tainted=allow_tainted,
            )

            if not throw_away_binaries:
                if debug_suite is not None and has_debug_binaries(upload):
                    # If we're handling a debug package, we also need to include the
                    # source in the debug suite as well.  The component was
                    # already determined above for the very same source.
                    transaction.copy_source(
                        upload.source,
                        debug_suite,
                        source_component,
                        allow_tainted=allow_tainted,
                    )

        if not throw_away_binaries:
            for db_binary in upload.binaries:
                # Now, let's work out where to copy this guy to -- if it's
                # a debug binary, and the suite has a debug suite, let's go
                # ahead and target the debug suite rather than the stock
                # suite.
                copy_to_suite = suite
                if debug_suite is not None and is_debug_binary(db_binary):
                    copy_to_suite = debug_suite

                # build queues and debug suites may miss the source package
                # if this is a binary-only upload.
                if copy_to_suite != upload.target_suite:
                    transaction.copy_source(
                        db_binary.source,
                        copy_to_suite,
                        source_component_func(db_binary.source),
                        allow_tainted=allow_tainted,
                    )

                transaction.copy_binary(
                    db_binary,
                    copy_to_suite,
                    binary_component_func(db_binary),
                    allow_tainted=allow_tainted,
                    extra_archives=[upload.target_suite.archive],
                )

                check_upload_for_external_signature_request(session, suite, copy_to_suite, db_binary)

        suite.update_last_changed()

    # Copy .changes if needed
    if policy_queue is None and upload.target_suite.copychanges:
        src = os.path.join(upload.policy_queue.path, upload.changes.changesname)
        dst = os.path.join(upload.target_suite.path, upload.changes.changesname)
        fs.copy(src, dst, mode=upload.target_suite.archive.mode)

    # List of files in the queue directory
    queue_files = [changesname]
    chg = daklib.upload.Changes(upload.policy_queue.path, changesname, keyrings=[], require_signature=False)
    queue_files.extend(f.filename for f in chg.buildinfo_files)

    # TODO: similar code exists in archive.py's `ArchiveUpload._install_policy`
    if policy_queue is not None:
        # register upload in policy queue
        new_upload = PolicyQueueUpload()
        new_upload.policy_queue = policy_queue
        new_upload.target_suite = upload.target_suite
        new_upload.changes = upload.changes
        new_upload.source = upload.source
        new_upload.binaries = upload.binaries
        session.add(new_upload)
        session.flush()

        # copy .changes & similar to policy queue
        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            dst = os.path.join(policy_queue.path, fn)
            fs.copy(src, dst, mode=policy_queue.change_perms)

    # Copy upload to Process-Policy::CopyDir
    # Used on security.d.o to sync accepted packages to ftp-master, but this
    # should eventually be replaced by something else.
    copydir = cnf.get('Process-Policy::CopyDir') or None
    if policy_queue is None and copydir is not None:
        mode = upload.target_suite.archive.mode
        if upload.source is not None:
            for f in [df.poolfile for df in upload.source.srcfiles]:
                dst = os.path.join(copydir, f.basename)
                if not os.path.exists(dst):
                    fs.copy(f.fullpath, dst, mode=mode)

        for db_binary in upload.binaries:
            f = db_binary.poolfile
            dst = os.path.join(copydir, f.basename)
            if not os.path.exists(dst):
                fs.copy(f.fullpath, dst, mode=mode)

        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            dst = os.path.join(copydir, fn)
            # We check for `src` to exist as old uploads in policy queues
            # might still miss the `.buildinfo` files.
            if os.path.exists(src) and not os.path.exists(dst):
                fs.copy(src, dst, mode=mode)

    if policy_queue is None:
        utils.process_buildinfos(upload.policy_queue.path, chg.buildinfo_files,
                                 fs, Logger)

    if policy_queue is None and upload.source is not None and not Options['No-Action']:
        # As per policy 5.6.17, the urgency can be followed by a space and a
        # comment. Extract only the urgency from the string.
        urgency = upload.changes.urgency.split(' ', 1)[0]
        if urgency not in cnf.value_list('Urgency::Valid'):
            urgency = cnf['Urgency::Default']
        UrgencyLog().log(upload.source.source, upload.source.version, urgency)

    if policy_queue is None:
        print(" ACCEPT")
    else:
        print(" ACCEPT-TO-QUEUE")
    if not Options['No-Action']:
        Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname])

    if policy_queue is None:
        pu = get_processed_upload(upload)
        daklib.announce.announce_accept(pu)

    # TODO: code duplication. Similar code is in process-upload.
    # Move .changes to done
    now = datetime.datetime.now()
    donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d'))
    if policy_queue is None:
        for fn in queue_files:
            src = os.path.join(upload.policy_queue.path, fn)
            if os.path.exists(src):
                dst = os.path.join(donedir, fn)
                dst = utils.find_next_free(dst)
                fs.copy(src, dst, mode=0o644)

    if throw_away_binaries and upload.target_suite.archive.use_morgue:
        morguesubdir = cnf.get("New::MorgueSubDir", 'new')

        utils.move_to_morgue(morguesubdir,
                             [db_binary.poolfile.fullpath for db_binary in upload.binaries],
                             fs, Logger)

    remove_upload(upload, transaction)
################################################################################
def real_comment_reject(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction,
    notify=True,
    manual=False
) -> None:
    """Reject *upload*: save its files and the reason, then drop it.

    All files belonging to the upload (pool files in the policy queue's
    archive, remaining byhand files, ``.buildinfo`` files and the
    ``.changes``) are copied into ``Dir::Reject`` together with a
    ``<changes>.reason`` file containing *comments*.  Afterwards the upload
    and its changes record are removed from the database.

    :param upload: upload to reject
    :param srcqueue: policy queue the upload sits in (used for logging)
    :param comments: rejection reason; a leading ``From: ...`` line followed
        by an empty line names the person rejecting the upload
    :param transaction: provides the database session and filesystem
        operations
    :param notify: send the rejection announcement when true
    :param manual: accepted for the benefit of callers; not used here
    """
    cnf = Config()

    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname
    queuedir = upload.policy_queue.path
    rejectdir = cnf['Dir::Reject']

    ### Copy files to reject/

    poolfiles = [b.poolfile for b in upload.binaries]
    if upload.source is not None:
        poolfiles.extend([df.poolfile for df in upload.source.srcfiles])
    # Not beautiful...
    files = [af.path for af in session.query(ArchiveFile)
             .filter_by(archive=upload.policy_queue.suite.archive)
             .join(ArchiveFile.file)
             .filter(PoolFile.file_id.in_([f.file_id for f in poolfiles]))]
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            files.append(path)
    chg = daklib.upload.Changes(queuedir, changesname, keyrings=[], require_signature=False)
    for f in chg.buildinfo_files:
        path = os.path.join(queuedir, f.filename)
        if os.path.exists(path):
            files.append(path)
    files.append(os.path.join(queuedir, changesname))

    for fn in files:
        dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn)))
        fs.copy(fn, dst, link=True)

    ### Write reason

    dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(changesname)))
    fh = fs.create(dst)
    try:
        fh.write(comments)
    finally:
        # Close the handle even when writing fails.
        fh.close()

    ### Send mail notification

    if notify:
        rejected_by = None
        reason = comments

        # Try to use From: from comment file if there is one.
        # This is not very elegant...
        match = re.match(r"\AFrom: ([^\n]+)\n\n", comments)
        if match:
            rejected_by = match.group(1)
            # Drop the From: line and the empty line following it.
            reason = '\n'.join(comments.splitlines()[2:])

        pu = get_processed_upload(upload)
        daklib.announce.announce_reject(pu, reason, rejected_by)

    print(" REJECT")
    if not Options["No-Action"]:
        Logger.log(["Policy Queue REJECT", srcqueue.queue_name, changesname])

    # Keep a reference to the changes record: the upload object is deleted by
    # remove_upload() before the record itself is deleted.
    changes = upload.changes
    remove_upload(upload, transaction)
    session.delete(changes)


################################################################################


@try_or_reject
def comment_reject(
    upload: PolicyQueueUpload,
    srcqueue: PolicyQueue,
    comments: str,
    transaction: ArchiveTransaction
) -> None:
    """Handler for REJECT decisions: reject *upload* as a manual rejection.

    This is the callable main() passes to do_comments() for ``REJECT.``
    comment files; it forwards to real_comment_reject() with ``manual=True``.
    """
    real_comment_reject(upload, srcqueue, comments, transaction, manual=True)
################################################################################
def remove_upload(upload: PolicyQueueUpload, transaction: ArchiveTransaction) -> None:
    """Remove *upload* from its policy queue.

    Deletes the byhand, ``.changes`` and ``.buildinfo`` files that still exist
    in the queue directory, deletes the byhand records and the upload record
    from the database and flushes the session.

    :param upload: upload to remove
    :param transaction: provides the database session and filesystem
        operations
    """
    fs = transaction.fs
    session = transaction.session
    changesname = upload.changes.changesname

    # Remove byhand and changes files. Binary and source packages will be
    # removed from {bin,src}_associations and eventually removed by clean-suites automatically.
    queuedir = upload.policy_queue.path
    for byhand in upload.byhand:
        path = os.path.join(queuedir, byhand.filename)
        if os.path.exists(path):
            fs.unlink(path)
        session.delete(byhand)

    # The .changes is parsed to learn the names of the .buildinfo files.
    chg = daklib.upload.Changes(queuedir, changesname, keyrings=[], require_signature=False)
    queue_files = [changesname]
    queue_files.extend(f.filename for f in chg.buildinfo_files)
    for fn in queue_files:
        # We check for `path` to exist as old uploads in policy queues
        # might still miss the `.buildinfo` files.
        path = os.path.join(queuedir, fn)
        if os.path.exists(path):
            fs.unlink(path)

    session.delete(upload)
    session.flush()
################################################################################
def get_processed_upload(upload: PolicyQueueUpload) -> daklib.announce.ProcessedUpload:
    """Build the announcement record for *upload*.

    The record is filled from the upload's changes entry and from the text of
    the ``.changes`` file in the upload's policy queue directory.

    :param upload: upload to describe
    :return: populated ``ProcessedUpload`` with ``program`` set to
        ``"process-policy"``
    """
    changes = upload.changes
    target = upload.target_suite

    processed = daklib.announce.ProcessedUpload()

    processed.maintainer = changes.maintainer
    processed.changed_by = changes.changedby
    processed.fingerprint = changes.fingerprint

    # The target suite is used for both suite lists.
    processed.suites = [target]
    processed.from_policy_suites = [target]

    with open(os.path.join(upload.policy_queue.path, changes.changesname), 'r') as changes_file:
        processed.changes = changes_file.read()
    processed.changes_filename = changes.changesname

    processed.sourceful = upload.source is not None
    processed.source = changes.source
    processed.version = changes.version
    processed.architecture = changes.architecture
    processed.bugs = changes.closes

    processed.program = "process-policy"

    return processed
################################################################################
def remove_unreferenced_binaries(policy_queue: PolicyQueue, transaction: ArchiveTransaction) -> None:
    """Remove binaries that are no longer referenced by an upload

    Selects every binary associated with the queue's suite that no upload in
    *policy_queue* refers to, logs it and removes it from that suite.
    """
    queue_suite = policy_queue.suite
    bind_values = {
        'suite_id': policy_queue.suite_id,
        'policy_queue_id': policy_queue.policy_queue_id,
    }

    orphan_statement = sql.text("""
SELECT b.*
FROM binaries b
JOIN bin_associations ba ON b.id = ba.bin
WHERE ba.suite = :suite_id
AND NOT EXISTS (SELECT 1 FROM policy_queue_upload_binaries_map pqubm
JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id
WHERE pqu.policy_queue_id = :policy_queue_id
AND pqubm.binary_id = b.id)""")

    orphans = transaction.session.query(DBBinary) \
        .from_statement(orphan_statement) \
        .params(bind_values)

    for orphan in orphans:
        Logger.log(["removed binary from policy queue", policy_queue.queue_name, orphan.package, orphan.version])
        transaction.remove_binary(orphan, queue_suite)
def remove_unreferenced_sources(policy_queue: PolicyQueue, transaction: ArchiveTransaction) -> None:
    """Remove sources that are no longer referenced by an upload or a binary

    Selects every source associated with the queue's suite that neither an
    upload in *policy_queue* nor a binary in the same suite refers to, logs it
    and removes it from that suite.
    """
    queue_suite = policy_queue.suite
    bind_values = {
        'suite_id': policy_queue.suite_id,
        'policy_queue_id': policy_queue.policy_queue_id,
    }

    orphan_statement = sql.text("""
SELECT s.*
FROM source s
JOIN src_associations sa ON s.id = sa.source
WHERE sa.suite = :suite_id
AND NOT EXISTS (SELECT 1 FROM policy_queue_upload pqu
WHERE pqu.policy_queue_id = :policy_queue_id
AND pqu.source_id = s.id)
AND NOT EXISTS (SELECT 1 FROM binaries b
JOIN bin_associations ba ON b.id = ba.bin
WHERE b.source = s.id
AND ba.suite = :suite_id)""")

    orphans = transaction.session.query(DBSource) \
        .from_statement(orphan_statement) \
        .params(bind_values)

    for orphan in orphans:
        Logger.log(["removed source from policy queue", policy_queue.queue_name, orphan.source, orphan.version])
        transaction.remove_source(orphan, queue_suite)
################################################################################
def usage(status=0) -> NoReturn:
    """Print the usage summary and exit the process with *status*."""
    summary = """Usage: dak process-policy QUEUE"""
    print(summary)
    sys.exit(status)
################################################################################
def main():
    """Entry point of ``dak process-policy QUEUE``.

    Parses the command line, looks up the named policy queue and processes
    the decision files in its ``COMMENTS`` directory (rejections first, then
    acceptances).  Afterwards binaries and sources that are no longer
    referenced are removed from the queue.  Exits with status 1 when not
    exactly one queue is named or the queue does not exist.
    """
    global Options, Logger

    cnf = Config()
    # NOTE(review): this session is replaced by the transaction's session
    # below and never used; kept to preserve the existing behaviour.
    session = DBConn().session()

    option_table = [('h', "help", "Process-Policy::Options::Help"),
                    ('n', "no-action", "Process-Policy::Options::No-Action")]

    # Give every option a default so that it can be looked up unconditionally.
    for option in ["help", "no-action"]:
        option_key = "Process-Policy::Options::%s" % option
        if key_missing := option_key not in cnf:
            cnf[option_key] = ""

    positional = apt_pkg.parse_commandline(cnf.Cnf, option_table, sys.argv)

    Options = cnf.subtree("Process-Policy::Options")
    if Options["Help"]:
        usage()

    if len(positional) != 1:
        print("E: Specify exactly one policy queue")
        sys.exit(1)
    queue_name = positional[0]

    Logger = daklog.Logger("process-policy")
    dry_run = Options["No-Action"]
    if not dry_run:
        urgencylog = UrgencyLog()

    with ArchiveTransaction() as transaction:
        session = transaction.session
        try:
            pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
        except NoResultFound:
            print("E: Cannot find policy queue %s" % queue_name)
            sys.exit(1)

        commentsdir = os.path.join(pq.path, 'COMMENTS')
        # The comments stuff relies on being in the right directory
        os.chdir(pq.path)

        # (old prefix, new prefix, marker line, handler), in processing order.
        do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, transaction)
        for pending_prefix in ("ACCEPT.", "ACCEPTED."):
            do_comments(commentsdir, pq, pending_prefix, "ACCEPTED.", "OK", comment_accept, transaction)

        remove_unreferenced_binaries(pq, transaction)
        remove_unreferenced_sources(pq, transaction)

    if not Options['No-Action']:
        urgencylog.close()
################################################################################
# Run the command only when executed as a script, not when imported.
if __name__ == '__main__':
    main()