Source code for daklib.policy

# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""module to process policy queue uploads"""

import errno
import os
import shutil
from typing import Optional

import daklib.utils as utils

from .config import Config
from .dbconn import (
    Component,
    Override,
    OverrideType,
    PolicyQueueUpload,
    Priority,
    Section,
    Suite,
    get_mapped_component,
    get_mapped_component_name,
)
from .fstransactions import FilesystemTransaction
from .packagelist import PackageList
from .regexes import re_file_changes, re_file_safe


[docs]class UploadCopy: """export a policy queue upload This class can be used in a with-statement:: with UploadCopy(...) as copy: ... Doing so will provide a temporary copy of the upload in the directory given by the :attr:`directory` attribute. The copy will be removed on leaving the with-block. """ def __init__(self, upload: PolicyQueueUpload, group: Optional[str] = None): """initializer :param upload: upload to handle """ self.directory: Optional[str] = None self.upload = upload self.group = group
[docs] def export( self, directory: str, mode: Optional[int] = None, symlink: bool = True, ignore_existing: bool = False, ) -> None: """export a copy of the upload :param directory: directory to export to :param mode: permissions to use for the copied files :param symlink: use symlinks instead of copying the files :param ignore_existing: ignore already existing files """ with FilesystemTransaction() as fs: source = self.upload.source queue = self.upload.policy_queue if source is not None: for dsc_file in source.srcfiles: f = dsc_file.poolfile dst = os.path.join(directory, os.path.basename(f.filename)) if not os.path.exists(dst) or not ignore_existing: fs.copy(f.fullpath, dst, mode=mode, symlink=symlink) for binary in self.upload.binaries: f = binary.poolfile dst = os.path.join(directory, os.path.basename(f.filename)) if not os.path.exists(dst) or not ignore_existing: fs.copy(f.fullpath, dst, mode=mode, symlink=symlink) # copy byhand files for byhand in self.upload.byhand: src = os.path.join(queue.path, byhand.filename) dst = os.path.join(directory, byhand.filename) if os.path.exists(src) and ( not os.path.exists(dst) or not ignore_existing ): fs.copy(src, dst, mode=mode, symlink=symlink) # copy .changes src = os.path.join(queue.path, self.upload.changes.changesname) dst = os.path.join(directory, self.upload.changes.changesname) if not os.path.exists(dst) or not ignore_existing: fs.copy(src, dst, mode=mode, symlink=symlink)
def __enter__(self): assert self.directory is None mode = 0o0700 symlink = True if self.group is not None: mode = 0o2750 symlink = False cnf = Config() self.directory = utils.temp_dirname( parent=cnf.get("Dir::TempPath"), mode=mode, group=self.group ) self.export(self.directory, symlink=symlink) return self def __exit__(self, *args): if self.directory is not None: shutil.rmtree(self.directory) self.directory = None return None
[docs]class PolicyQueueUploadHandler: """process uploads to policy queues This class allows to accept or reject uploads and to get a list of missing overrides (for NEW processing). """ def __init__(self, upload: PolicyQueueUpload, session): """initializer :param upload: upload to process :param session: database session """ self.upload = upload self.session = session @property def _overridesuite(self) -> Suite: overridesuite = self.upload.target_suite if overridesuite.overridesuite is not None: overridesuite = ( self.session.query(Suite) .filter_by(suite_name=overridesuite.overridesuite) .one() ) return overridesuite
[docs] def _source_override(self, component_name: str) -> Override: package = self.upload.source.source suite = self._overridesuite component = get_mapped_component(component_name, self.session) query = ( self.session.query(Override) .filter_by(package=package, suite=suite) .join(OverrideType) .filter(OverrideType.overridetype == "dsc") .filter(Override.component == component) ) return query.first()
[docs] def _binary_override(self, name: str, binarytype, component_name: str) -> Override: suite = self._overridesuite component = get_mapped_component(component_name, self.session) query = ( self.session.query(Override) .filter_by(package=name, suite=suite) .join(OverrideType) .filter(OverrideType.overridetype == binarytype) .filter(Override.component == component) ) return query.first()
@property def _changes_prefix(self) -> str: changesname = self.upload.changes.changesname assert changesname.endswith(".changes") assert re_file_changes.match(changesname) return changesname[0:-8]
[docs] def accept(self) -> None: """mark upload as accepted""" assert len(self.missing_overrides()) == 0 fn1 = "ACCEPT.{0}".format(self._changes_prefix) fn = os.path.join(self.upload.policy_queue.path, "COMMENTS", fn1) try: fh = os.open(fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644) with os.fdopen(fh, "wt") as f: f.write("OK\n") except OSError as e: if e.errno == errno.EEXIST: pass else: raise
[docs] def reject(self, reason: str) -> None: """mark upload as rejected :param reason: reason for the rejection """ cnf = Config() fn1 = "REJECT.{0}".format(self._changes_prefix) assert re_file_safe.match(fn1) fn = os.path.join(self.upload.policy_queue.path, "COMMENTS", fn1) try: fh = os.open(fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644) with os.fdopen(fh, "wt") as f: f.write("NOTOK\n") f.write( "From: {0} <{1}>\n\n".format( utils.whoami(), cnf["Dinstall::MyAdminAddress"] ) ) f.write(reason) except OSError as e: if e.errno == errno.EEXIST: pass else: raise
[docs] def get_action(self) -> Optional[str]: """get current action :return: string giving the current action, one of 'ACCEPT', 'ACCEPTED', 'REJECT' """ changes_prefix = self._changes_prefix for action in ("ACCEPT", "ACCEPTED", "REJECT"): fn1 = "{0}.{1}".format(action, changes_prefix) fn = os.path.join(self.upload.policy_queue.path, "COMMENTS", fn1) if os.path.exists(fn): return action return None
[docs] def missing_overrides(self, hints: Optional[list[dict]] = None) -> list[dict]: """get missing override entries for the upload :param hints: suggested hints for new overrides in the same format as the return value :return: list of dicts with the following keys: - package: package name - priority: default priority (from upload) - section: default section (from upload) - component: default component (from upload) - type: type of required override ('dsc', 'deb' or 'udeb') All values are strings. """ # TODO: use Package-List field missing = [] components = set() source = self.upload.source if hints is None: hints = [] hints_map = dict([((o["type"], o["package"]), o) for o in hints]) def check_override(name, type, priority, section, included): component = "main" if section.find("/") != -1: component = section.split("/", 1)[0] override = self._binary_override(name, type, component) if override is None and not any( o["package"] == name and o["type"] == type for o in missing ): hint = hints_map.get((type, name)) if hint is not None: missing.append(hint) component = hint["component"] else: missing.append( dict( package=name, priority=priority, section=section, component=component, type=type, included=included, ) ) components.add(component) for binary in self.upload.binaries: binary_proxy = binary.proxy priority = binary_proxy.get("Priority", "optional") section = binary_proxy["Section"] check_override( binary.package, binary.binarytype, priority, section, included=True ) if source is not None: source_proxy = source.proxy package_list = PackageList(source_proxy) if not package_list.fallback: packages = package_list.packages_for_suite(self.upload.target_suite) for p in packages: check_override( p.name, p.type, p.priority, p.section, included=False ) # see daklib.archive.source_component_from_package_list # which we cannot use here as we might not have a Package-List # field for old packages mapped_components = [get_mapped_component_name(c) for c in components] query = ( self.session.query(Component) .order_by(Component.ordering) .filter(Component.component_name.in_(mapped_components)) ) source_component = query.first().component_name override = self._source_override(source_component) if override is None: hint = hints_map.get(("dsc", source.source)) if hint is not None: missing.append(hint) else: section = "misc" if source_component != "main": section = "{0}/{1}".format(source_component, section) missing.append( dict( package=source.source, priority="optional", section=section, component=source_component, type="dsc", included=True, ) ) return missing
[docs] def add_overrides(self, new_overrides, suite: Suite) -> None: if suite.overridesuite is not None: suite = ( self.session.query(Suite) .filter_by(suite_name=suite.overridesuite) .one() ) for override in new_overrides: package = override["package"] priority = ( self.session.query(Priority) .filter_by(priority=override["priority"]) .first() ) section = ( self.session.query(Section) .filter_by(section=override["section"]) .first() ) component = get_mapped_component(override["component"], self.session) overridetype = ( self.session.query(OverrideType) .filter_by(overridetype=override["type"]) .one() ) if priority is None: raise Exception( "Invalid priority {0} for package {1}".format(priority, package) ) if section is None: raise Exception( "Invalid section {0} for package {1}".format(section, package) ) if component is None: raise Exception( "Invalid component {0} for package {1}".format(component, package) ) o = Override( package=package, suite=suite, component=component, priority=priority, section=section, overridetype=overridetype, ) self.session.add(o) self.session.commit()