"""Helper code for contents generation.

@contact: Debian FTPMaster <>
@copyright: 2011 Torsten Werner <>
@license: GNU General Public License version 2 or later
"""


# The star import stays first so the explicit imports below take precedence
# over any same-named export.
from daklib.dbconn import *
from daklib.config import Config
from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter

from .dakmultiprocessing import DakProcessPool
from shutil import rmtree
from tempfile import mkdtemp
from collections.abc import Iterable
from typing import Optional

import subprocess
import os.path
import sqlalchemy.sql as sql

class BinaryContentsWriter:
    '''
    BinaryContentsWriter writes the Contents-$arch.gz files.
    '''

    def __init__(self, suite, architecture, overridetype, component):
        '''
        Store the suite, architecture, override type and component that
        select which Contents file is written.

        The database session is taken from ``suite.session()`` and is used
        by query(), fetch() and writer().
        '''
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        As a side effect, this creates and fills the temporary table
        ``newest_binaries`` in the current transaction. The table holds the
        newest version of every package of the requested type that is in
        the suite for the requested architecture.

        The returned query yields ``(file, pkglist)`` rows. ``pkglist`` is a
        comma-separated list of ``section/package`` entries, with sections
        taken from the override suite (or from the suite itself when it has
        no override suite).
        '''
        overridesuite = self.suite
        if self.suite.overridesuite is not None:
            overridesuite = get_suite(self.suite.overridesuite, self.session)
        params = {
            'suite': self.suite.suite_id,
            'overridesuite': overridesuite.suite_id,
            'component': self.component.component_id,
            'arch': self.architecture.arch_id,
            'type_id': self.overridetype.overridetype_id,
            'type': self.overridetype.overridetype,
        }

        if self.suite.separate_contents_architecture_all:
            sql_arch_part = 'architecture = :arch'
        else:
            # arch:all packages are included in every per-architecture file.
            sql_arch_part = '(architecture = :arch_all or architecture = :arch)'
            params['arch_all'] = get_architecture('all', self.session).arch_id

        # sql_arch_part is one of the two constant fragments above (never
        # external input), so interpolating it into the SQL is safe.
        sql_create_temp = '''
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;''' % sql_arch_part
        # Textual SQL must be wrapped in text(); SQLAlchemy 2.0 rejects plain
        # strings passed to Session.execute().
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text('''
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(query).params(params)

    def formatline(self, filename, package_list) -> str:
        '''
        Returns a formatted string for the filename argument.

        The file name is left-justified in a 55-character column, followed
        by a space, the package list and a newline.
        '''
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        '''
        Yields one line of the Contents-$arch.gz file per file name.

        Lines come out in the order the database returns the grouped rows;
        the SQL has no ORDER BY clause.
        NOTE(review): older docs claimed filename order. Confirm whether
        consumers rely on sorted output.

        The transaction is rolled back when the generator finishes, fails
        or is closed early. This ends the transaction, so the temporary
        table from query() does not outlive the iteration.
        '''
        try:
            for filename, package_list in self.query().yield_per(100):
                yield self.formatline(filename, package_list)
        finally:
            # end transaction to return connection to pool
            self.session.rollback()

    def get_list(self) -> list[str]:
        '''
        Returns a list of lines for the Contents-$arch.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object for this suite/component/type/architecture.
        '''
        values = {
            'archive': self.suite.archive.path,
            'suite': self.suite.suite_name,
            'component': self.component.component_name,
            'debtype': self.overridetype.overridetype,
            'architecture': self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        '''
        Write the output file.
        '''
        writer = self.writer()
        output = writer.open()
        for item in self.fetch():
            output.write(item)
        # close() is deliberately reached only on success.
        # NOTE(review): presumably close() finalizes the output file; putting
        # it in a finally block could publish a truncated file — confirm.
        writer.close()
class SourceContentsWriter:
    '''
    SourceContentsWriter writes the Contents-source.gz files.
    '''

    def __init__(self, suite, component):
        '''
        Store the suite and component that select which Contents-source
        file is written.

        The database session is taken from ``suite.session()``.
        '''
        self.suite = suite
        self.component = component
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        As a side effect, this creates and fills the temporary table
        ``newest_sources`` in the current transaction. The table holds the
        newest version of every source package that is in the suite and
        whose file is mapped to the requested component.

        The returned query yields ``(file, pkglist)`` rows, where
        ``pkglist`` is a comma-separated list of source package names.
        '''
        params = {
            'suite_id': self.suite.suite_id,
            'component_id': self.component.component_id,
        }

        sql_create_temp = '''
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;'''
        # Textual SQL must be wrapped in text(); SQLAlchemy 2.0 rejects plain
        # strings passed to Session.execute().
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text('''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(query).params(params)

    def formatline(self, filename, package_list) -> str:
        '''
        Returns a formatted string for the filename argument: the file
        name, a tab, the package list and a newline.
        '''
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        '''
        Yields one line of the Contents-source.gz file per file name.

        Lines come out in the order the database returns the grouped rows;
        the SQL has no ORDER BY clause.
        NOTE(review): older docs claimed filename order. Confirm whether
        consumers rely on sorted output.

        The transaction is rolled back when the generator finishes, fails
        or is closed early.
        '''
        try:
            for filename, package_list in self.query().yield_per(100):
                yield self.formatline(filename, package_list)
        finally:
            # end transaction to return connection to pool
            self.session.rollback()

    def get_list(self) -> list[str]:
        '''
        Returns a list of lines for the Contents-source.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object for this suite/component.
        '''
        values = {
            'archive': self.suite.archive.path,
            'suite': self.suite.suite_name,
            'component': self.component.component_name
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        '''
        Write the output file.
        '''
        writer = self.writer()
        output = writer.open()
        for item in self.fetch():
            output.write(item)
        # close() is deliberately reached only on success.
        # NOTE(review): presumably close() finalizes the output file; putting
        # it in a finally block could publish a truncated file — confirm.
        writer.close()
def binary_helper(suite_id: int, arch_id: int, overridetype_id: int, component_id: int):
    '''
    This function is called in a new subprocess and multiprocessing wants a
    top level function.

    Writes the binary Contents file selected by the given ids. Returns a
    list [suite name, architecture, override type, component name], which
    the pool passes to ContentsWriter.log_result().

    The database session is closed even if writing fails.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        architecture = Architecture.get(arch_id, session)
        overridetype = OverrideType.get(overridetype_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, architecture.arch_string,
                       overridetype.overridetype, component.component_name]
        contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
        contents_writer.write_file()
    finally:
        session.close()
    return log_message
def source_helper(suite_id: int, component_id: int):
    '''
    This function is called in a new subprocess and multiprocessing wants a
    top level function.

    Writes the Contents-source file for the given suite and component ids.
    Returns a list [suite name, 'source', component name], which the pool
    passes to ContentsWriter.log_result().

    The database session is closed even if writing fails.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, 'source', component.component_name]
        contents_writer = SourceContentsWriter(suite, component)
        contents_writer.write_file()
    finally:
        session.close()
    return log_message
class ContentsWriter:
    '''
    Loop over all suites, architectures, overridetypes, and components to
    write all contents files.
    '''

    @classmethod
    def log_result(class_, result) -> None:
        '''
        Writes a result message to the logfile.

        Used as the pool callback. ``result`` is the list returned by
        binary_helper() or source_helper(), and is logged through the logger
        stored by write_all().
        '''
        class_.logger.log(list(result))

    @classmethod
    def write_all(class_, logger, archive_names=None, suite_names=None, component_names=None, force=False):
        '''
        Writes all Contents files for suites in list suite_names which
        defaults to all 'touchable' suites if not specified explicitly.
        Untouchable suites will be included if the force argument is set to
        True.

        archive_names and component_names optionally restrict which archives
        and components are processed. One source job, plus one 'deb' and one
        'udeb' job per architecture, is queued on a DakProcessPool for every
        suite/component pair. The method returns after the pool has been
        joined.
        '''
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()

        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(Component.component_name.in_(component_names))
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        deb_id = get_override_type('deb', session).overridetype_id
        udeb_id = get_override_type('udeb', session).overridetype_id

        # Lock tables so that nobody can change things underneath us.
        # Textual SQL is wrapped in text(); SQLAlchemy 2.0 rejects plain strings.
        session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE"))
        session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE"))

        for suite in suite_query:
            suite_id = suite.suite_id
            skip_arch_all = True
            if suite.separate_contents_architecture_all:
                # arch:all then gets a Contents file of its own; otherwise
                # BinaryContentsWriter folds it into every architecture.
                skip_arch_all = False
            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(source_helper, (suite_id, component_id),
                                 callback=class_.log_result)
                for architecture in suite.get_architectures(skipsrc=True, skipall=skip_arch_all):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id),
                                     callback=class_.log_result)
                    # handle 'udeb' packages
                    pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id),
                                     callback=class_.log_result)
        pool.close()
        pool.join()
        session.close()
class BinaryContentsScanner:
    '''
    BinaryContentsScanner provides a method scan() to scan the contents of a
    DBBinary object.

    scan() opens and closes its own database session, so each scan can run
    independently in a worker process.
    '''

    def __init__(self, binary_id: int):
        '''
        The argument binary_id is the id of the DBBinary object that should
        be scanned.
        '''
        self.binary_id: int = binary_id

    def scan(self) -> None:
        '''
        This method does the actual scan and fills in the associated
        BinContents property. It commits any changes to the database.

        The session is closed even if scanning or committing fails.
        '''
        session = DBConn().session()
        try:
            binary = session.query(DBBinary).get(self.binary_id)
            fileset = set(binary.scan_contents())
            if not fileset:
                # Store a placeholder entry so the binary has contents and is
                # presumably no longer matched by scan_all()'s
                # "contents == None" filter.
                fileset.add('EMPTY_PACKAGE')
            for filename in fileset:
                binary.contents.append(BinContents(file=filename))
            session.commit()
        finally:
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all binaries that have no contents
        yet, using a DakProcessPool. The number of binaries to be scanned can
        be limited with the limit argument.

        Returns the number of processed and remaining packages as a dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
            # Keep the bound (unlimited) count and call it after the pool has
            # finished, so 'remaining' reflects the state after scanning.
            count_remaining = query.count
            if limit is not None:
                query = query.limit(limit)
            processed = query.count()
            for binary in query.yield_per(100):
                pool.apply_async(binary_scan_helper, (binary.binary_id, ))
            pool.close()
            pool.join()
            remaining = count_remaining()
        finally:
            session.close()
        return {'processed': processed, 'remaining': remaining}
def binary_scan_helper(binary_id: int) -> None:
    '''
    Worker-process entry point: scan the contents of a single binary.

    Failures are printed rather than raised. scan_all() does not keep the
    results of apply_async(), so (assuming DakProcessPool behaves like
    multiprocessing.Pool) a raised exception would otherwise go unreported.
    '''
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as error:
        print("binary_scan_helper raised an exception: %s" % (error))
class UnpackedSource:
    '''
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.
    '''

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        '''
        The dscfilename is a name of a DSC file that will be extracted.

        The package is unpacked with dpkg-source into a fresh temporary
        directory below tmpbasedir, or below Config()['Dir::TempPath'] when
        tmpbasedir is not given.

        Raises whatever subprocess.check_call() raises if dpkg-source cannot
        be run or fails. In that case the temporary directory is removed
        before the exception propagates.
        '''
        # Set first so cleanup()/__del__ are safe even if anything below raises.
        self.root_directory: Optional[str] = None
        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
        temp_directory = mkdtemp(dir=basedir)
        root_directory = os.path.join(temp_directory, 'root')
        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
                   dscfilename, root_directory)
        try:
            subprocess.check_call(command)
        except BaseException:
            # Do not rely on garbage collection to remove the directory of a
            # half-constructed object.
            rmtree(temp_directory, ignore_errors=True)
            raise
        self.root_directory = root_directory

    def get_root_directory(self) -> Optional[str]:
        '''
        Returns the name of the package's root directory which is the
        directory where the debian subdirectory is located.

        Returns None after cleanup() has been called.
        '''
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        '''
        Returns an iterator over all filenames. The filenames will be
        relative to the root directory.
        '''
        # Strip the root directory plus the following path separator.
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        '''
        Removes all temporary files. Calling it again is a no-op.
        '''
        # getattr: tolerate instances whose __init__ never ran or failed
        # before setting the attribute.
        root_directory = getattr(self, 'root_directory', None)
        if root_directory is None:
            return
        parent_directory = os.path.dirname(root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        '''
        Enforce cleanup.
        '''
        self.cleanup()
class SourceContentsScanner:
    '''
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.

    scan() opens and closes its own database session, so each scan can run
    independently in a worker process.
    '''

    def __init__(self, source_id: int):
        '''
        The argument source_id is the id of the DBSource object that should
        be scanned.
        '''
        self.source_id: int = source_id

    def scan(self) -> None:
        '''
        This method does the actual scan and fills in the associated
        SrcContents property. It commits any changes to the database.

        The session is closed even if scanning or committing fails.
        '''
        session = DBConn().session()
        try:
            source = session.query(DBSource).get(self.source_id)
            fileset = set(source.scan_contents())
            for filename in fileset:
                source.contents.append(SrcContents(file=filename))
            session.commit()
        finally:
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all sources that have no contents
        yet, using a DakProcessPool. The number of sources to be scanned can
        be limited with the limit argument.

        Returns the number of processed and remaining packages as a dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
            # Keep the bound (unlimited) count and call it after the pool has
            # finished, so 'remaining' reflects the state after scanning.
            count_remaining = query.count
            if limit is not None:
                query = query.limit(limit)
            processed = query.count()
            for source in query.yield_per(100):
                pool.apply_async(source_scan_helper, (source.source_id, ))
            pool.close()
            pool.join()
            remaining = count_remaining()
        finally:
            session.close()
        return {'processed': processed, 'remaining': remaining}
def source_scan_helper(source_id: int) -> None:
    '''
    Worker-process entry point: scan the contents of a single source package.

    Failures are printed rather than raised. scan_all() does not keep the
    results of apply_async(), so (assuming DakProcessPool behaves like
    multiprocessing.Pool) a raised exception would otherwise go unreported.
    '''
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as error:
        print("source_scan_helper raised an exception: %s" % (error))