Source code for daklib.contents

"""
Helper code for contents generation.

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2011 Torsten Werner <twerner@debian.org>
@license: GNU General Public License version 2 or later
"""

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

import os.path
import subprocess
from collections.abc import Collection, Iterable
from shutil import rmtree
from tempfile import mkdtemp
from typing import TYPE_CHECKING, ClassVar, Optional

import sqlalchemy.sql as sql

from daklib.config import Config
from daklib.dbconn import (
    Architecture,
    Archive,
    BinContents,
    Component,
    DBBinary,
    DBConn,
    DBSource,
    OverrideType,
    SrcContents,
    Suite,
    get_architecture,
    get_override_type,
    get_suite,
)
from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter

from .dakmultiprocessing import DakProcessPool

if TYPE_CHECKING:
    from sqlalchemy.engine import Result

    from daklib.daklog import Logger


class BinaryContentsWriter:
    """
    BinaryContentsWriter writes the Contents-$arch.gz files.
    """

    def __init__(
        self,
        suite: Suite,
        architecture: Architecture,
        overridetype: OverrideType,
        component: Component,
    ) -> None:
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query object that is doing most of the work.
        """
        overridesuite = (
            get_suite(self.suite.overridesuite, self.session)
            if self.suite.overridesuite
            else self.suite
        )
        assert overridesuite is not None
        params = {
            "suite": self.suite.suite_id,
            "overridesuite": overridesuite.suite_id,
            "component": self.component.component_id,
            "arch": self.architecture.arch_id,
            "type_id": self.overridetype.overridetype_id,
            "type": self.overridetype.overridetype,
        }

        if self.suite.separate_contents_architecture_all:
            sql_arch_part = "architecture = :arch"
        else:
            sql_arch_part = "(architecture = :arch_all or architecture = :arch)"
            arch_all = get_architecture("all", self.session)
            assert arch_all is not None
            params["arch_all"] = arch_all.arch_id

        sql_create_temp = (
            """
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;"""
            % sql_arch_part
        )
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id
            and o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package)
        as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-$arch.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-$arch.gz file.
        """
        return [item for item in self.fetch()]

    def writer(self) -> BinaryContentsFileWriter:
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
            "debtype": self.overridetype.overridetype,
            "architecture": self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
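
# A minimal usage sketch, kept as a comment so the module has no import-time
# side effects. The suite/component/architecture names are hypothetical, and a
# configured dak database connection is assumed:
#
#   session = DBConn().session()
#   suite = get_suite("unstable", session)
#   component = session.query(Component).filter_by(component_name="main").one()
#   architecture = get_architecture("amd64", session)
#   overridetype = get_override_type("deb", session)
#   BinaryContentsWriter(suite, architecture, overridetype, component).write_file()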


class SourceContentsWriter:
    """
    SourceContentsWriter writes the Contents-source.gz files.
    """

    def __init__(self, suite: Suite, component: Component):
        self.suite = suite
        self.component = component
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query object that is doing most of the work.
        """
        params = {
            "suite_id": self.suite.suite_id,
            "component_id": self.component.component_id,
        }

        sql_create_temp = """
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;"""
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id
    group by sc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-source.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-source.gz file.
        """
        return [item for item in self.fetch()]

    def writer(self) -> SourceContentsFileWriter:
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
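
# A minimal usage sketch for the source writer, analogous to the binary writer
# above (hypothetical names; a configured dak database is assumed):
#
#   session = DBConn().session()
#   suite = get_suite("unstable", session)
#   component = session.query(Component).filter_by(component_name="main").one()
#   SourceContentsWriter(suite, component).write_file()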


def binary_helper(
    suite_id: int, arch_id: int, overridetype_id: int, component_id: int
) -> list[str]:
    """
    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    """
    session = DBConn().session(work_mem=1000)
    suite = session.get_one(Suite, suite_id)
    architecture = session.get_one(Architecture, arch_id)
    overridetype = session.get_one(OverrideType, overridetype_id)
    component = session.get_one(Component, component_id)
    log_message = [
        suite.suite_name,
        architecture.arch_string,
        overridetype.overridetype,
        component.component_name,
    ]
    contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
    contents_writer.write_file()
    session.close()
    return log_message


def source_helper(suite_id: int, component_id: int) -> list[str]:
    """
    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    """
    session = DBConn().session(work_mem=1000)
    suite = session.get_one(Suite, suite_id)
    component = session.get_one(Component, component_id)
    log_message = [suite.suite_name, "source", component.component_name]
    contents_writer = SourceContentsWriter(suite, component)
    contents_writer.write_file()
    session.close()
    return log_message


class ContentsWriter:
    """
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    """

    logger: ClassVar["Logger"]

    @classmethod
    def log_result(class_, result) -> None:
        """
        Writes a result message to the logfile.
        """
        class_.logger.log(list(result))

    @classmethod
    def write_all(
        class_,
        logger,
        archive_names: Collection[str] | None = None,
        suite_names: Collection[str] | None = None,
        component_names: Collection[str] | None = None,
        force=False,
    ):
        """
        Writes all Contents files for suites in list suite_names, which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.
        """
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()
        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(
                Archive.archive_name.in_(archive_names)
            )
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(
                Component.component_name.in_(component_names)
            )
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        deb_type = get_override_type("deb", session)
        assert deb_type is not None
        deb_id = deb_type.overridetype_id
        udeb_type = get_override_type("udeb", session)
        assert udeb_type is not None
        udeb_id = udeb_type.overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE"))
        session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE"))

        for suite in suite_query:
            suite_id = suite.suite_id

            skip_arch_all = True
            if suite.separate_contents_architecture_all:
                skip_arch_all = False

            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(
                    source_helper, (suite_id, component_id), callback=class_.log_result
                )
                for architecture in suite.get_architectures(
                    skipsrc=True, skipall=skip_arch_all
                ):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, deb_id, component_id),
                        callback=class_.log_result,
                    )
                    # handle 'udeb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, udeb_id, component_id),
                        callback=class_.log_result,
                    )
        pool.close()
        pool.join()
        session.close()
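
# A minimal usage sketch (hypothetical program name; dak's configuration and
# database are assumed to be set up):
#
#   from daklib.daklog import Logger
#   logger = Logger("contents generate")
#   ContentsWriter.write_all(logger, suite_names=["unstable"])
#   logger.close()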


class BinaryContentsScanner:
    """
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    """

    def __init__(self, binary_id: int):
        """
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        """
        self.binary_id: int = binary_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        binary = session.get_one(DBBinary, self.binary_id)
        fileset = set(binary.scan_contents())
        if len(fileset) == 0:
            fileset.add("EMPTY_PACKAGE")
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all binaries using multiple processes.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id,))
        pool.close()
        pool.join()
        remaining_int = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining_int}


def binary_scan_helper(binary_id: int) -> None:
    """
    This function runs in a subprocess.
    """
    try:
        scanner = BinaryContentsScanner(binary_id)
        scanner.scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
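
# A minimal usage sketch (the limit value is arbitrary; assumes unscanned
# binaries exist in the database):
#
#   result = BinaryContentsScanner.scan_all(limit=1000)
#   print("processed %(processed)d, remaining %(remaining)d" % result)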


class UnpackedSource:
    """
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.
    """

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        """
        The dscfilename is the name of the DSC file that will be extracted.
        """
        basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory: Optional[str] = os.path.join(temp_directory, "root")
        command = (
            "dpkg-source",
            "--no-copy",
            "--no-check",
            "-q",
            "-x",
            dscfilename,
            self.root_directory,
        )
        subprocess.check_call(command)

    def get_root_directory(self) -> str:
        """
        Returns the name of the package's root directory, which is the directory
        where the debian subdirectory is located.
        """
        assert self.root_directory is not None
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        """
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        """
        assert self.root_directory is not None
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        """
        Removes all temporary files.
        """
        if self.root_directory is None:
            return
        parent_directory = os.path.dirname(self.root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        """
        Enforce cleanup.
        """
        self.cleanup()
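
# A minimal usage sketch (hypothetical .dsc path). Calling cleanup()
# explicitly is preferable to relying on __del__:
#
#   unpacked = UnpackedSource("/srv/ftp/queue/hello_2.10-3.dsc")
#   for name in unpacked.get_all_filenames():
#       print(name)
#   unpacked.cleanup()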


class SourceContentsScanner:
    """
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    """

    def __init__(self, source_id: int):
        """
        The argument source_id is the id of the DBSource object that
        should be scanned.
        """
        self.source_id: int = source_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        source = session.get_one(DBSource, self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all sources using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id,))
        pool.close()
        pool.join()
        remaining_int = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining_int}


def source_scan_helper(source_id: int) -> None:
    """
    This function runs in a subprocess.
    """
    try:
        scanner = SourceContentsScanner(source_id)
        scanner.scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))