"""
Helper code for contents generation.
@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2011 Torsten Werner <twerner@debian.org>
@license: GNU General Public License version 2 or later
"""
################################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
################################################################################
import os.path
import subprocess
from collections.abc import Collection, Iterable
from shutil import rmtree
from tempfile import mkdtemp
from typing import TYPE_CHECKING, ClassVar, Optional
import sqlalchemy.sql as sql
from daklib.config import Config
from daklib.dbconn import (
Architecture,
Archive,
BinContents,
Component,
DBBinary,
DBConn,
DBSource,
OverrideType,
SrcContents,
Suite,
get_architecture,
get_override_type,
get_suite,
)
from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
from .dakmultiprocessing import DakProcessPool
if TYPE_CHECKING:
from sqlalchemy.engine import Result
from daklib.daklog import Logger
class BinaryContentsWriter:
"""
BinaryContentsWriter writes the Contents-$arch.gz files.
"""
def __init__(
self,
suite: Suite,
architecture: Architecture,
overridetype: OverrideType,
component: Component,
) -> None:
self.suite = suite
self.architecture = architecture
self.overridetype = overridetype
self.component = component
session = suite.session()
assert session is not None
self.session = session
def query(self) -> "Result[tuple[str, str]]":
"""
        Executes the query that does most of the work and returns its result.
"""
overridesuite = (
get_suite(self.suite.overridesuite, self.session)
if self.suite.overridesuite
else self.suite
)
assert overridesuite is not None
params = {
"suite": self.suite.suite_id,
"overridesuite": overridesuite.suite_id,
"component": self.component.component_id,
"arch": self.architecture.arch_id,
"type_id": self.overridetype.overridetype_id,
"type": self.overridetype.overridetype,
}
if self.suite.separate_contents_architecture_all:
sql_arch_part = "architecture = :arch"
else:
sql_arch_part = "(architecture = :arch_all or architecture = :arch)"
arch_all = get_architecture("all", self.session)
assert arch_all is not None
params["arch_all"] = arch_all.arch_id
sql_create_temp = (
"""
create temp table newest_binaries (
id integer primary key,
package text);
create index newest_binaries_by_package on newest_binaries (package);
insert into newest_binaries (id, package)
select distinct on (package) id, package from binaries
where type = :type and
%s and
id in (select bin from bin_associations where suite = :suite)
order by package, version desc;"""
% sql_arch_part
)
self.session.execute(sql.text(sql_create_temp), params=params)
query = sql.text(
"""
with
unique_override as
(select o.package, s.section
from override o, section s
where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
o.component = :component)
select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
from newest_binaries b, bin_contents bc, unique_override o
where b.id = bc.binary_id and o.package = b.package
group by bc.file"""
)
return self.session.execute(query, params=params)
def formatline(self, filename: str, package_list: str) -> str:
"""
        Returns one formatted Contents line for the given filename and package list.
"""
return "%-55s %s\n" % (filename, package_list)
def fetch(self) -> Iterable[str]:
"""
Yields a new line of the Contents-$arch.gz file in filename order.
"""
for filename, package_list in self.query().yield_per(100):
yield self.formatline(filename, package_list)
# end transaction to return connection to pool
self.session.rollback()
def get_list(self) -> list[str]:
"""
Returns a list of lines for the Contents-$arch.gz file.
"""
        return list(self.fetch())
def writer(self) -> BinaryContentsFileWriter:
"""
Returns a writer object.
"""
values = {
"archive": self.suite.archive.path,
"suite": self.suite.suite_name,
"component": self.component.component_name,
"debtype": self.overridetype.overridetype,
"architecture": self.architecture.arch_string,
}
return BinaryContentsFileWriter(**values)
def write_file(self) -> None:
"""
Write the output file.
"""
writer = self.writer()
file = writer.open()
for item in self.fetch():
file.write(item)
writer.close()
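# Usage sketch (not part of the module API): writing a single Contents file by
# hand. The suite, architecture, component, and override type names below are
# illustrative.
#
#     session = DBConn().session()
#     suite = get_suite("unstable", session)
#     architecture = get_architecture("amd64", session)
#     overridetype = get_override_type("deb", session)
#     component = session.query(Component).filter_by(component_name="main").one()
#     BinaryContentsWriter(suite, architecture, overridetype, component).write_file()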
class SourceContentsWriter:
"""
SourceContentsWriter writes the Contents-source.gz files.
"""
def __init__(self, suite: Suite, component: Component):
self.suite = suite
self.component = component
session = suite.session()
assert session is not None
self.session = session
def query(self) -> "Result[tuple[str, str]]":
"""
        Executes the query that does most of the work and returns its result.
"""
params = {
"suite_id": self.suite.suite_id,
"component_id": self.component.component_id,
}
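        # Build a temporary table containing only the newest version of each
        # source package in the suite; the Contents query below joins against it.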
sql_create_temp = """
create temp table newest_sources (
id integer primary key,
source text);
            create index newest_sources_by_source on newest_sources (source);
insert into newest_sources (id, source)
select distinct on (source) s.id, s.source from source s
join files_archive_map af on s.file = af.file_id
where s.id in (select source from src_associations where suite = :suite_id)
and af.component_id = :component_id
order by source, version desc;"""
self.session.execute(sql.text(sql_create_temp), params=params)
query = sql.text(
"""
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
from newest_sources s, src_contents sc
where s.id = sc.source_id group by sc.file"""
)
return self.session.execute(query, params=params)
def formatline(self, filename: str, package_list: str) -> str:
"""
        Returns one formatted Contents line for the given filename and package list.
"""
return "%s\t%s\n" % (filename, package_list)
def fetch(self) -> Iterable[str]:
"""
Yields a new line of the Contents-source.gz file in filename order.
"""
for filename, package_list in self.query().yield_per(100):
yield self.formatline(filename, package_list)
# end transaction to return connection to pool
self.session.rollback()
def get_list(self) -> list[str]:
"""
Returns a list of lines for the Contents-source.gz file.
"""
        return list(self.fetch())
def writer(self) -> SourceContentsFileWriter:
"""
Returns a writer object.
"""
values = {
"archive": self.suite.archive.path,
"suite": self.suite.suite_name,
"component": self.component.component_name,
}
return SourceContentsFileWriter(**values)
def write_file(self) -> None:
"""
Write the output file.
"""
writer = self.writer()
file = writer.open()
for item in self.fetch():
file.write(item)
writer.close()
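# Usage sketch (illustrative names; same assumptions as the sketch above):
#
#     session = DBConn().session()
#     suite = get_suite("unstable", session)
#     component = session.query(Component).filter_by(component_name="main").one()
#     SourceContentsWriter(suite, component).write_file()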
def binary_helper(
suite_id: int, arch_id: int, overridetype_id: int, component_id: int
) -> list[str]:
"""
    This function is called in a new subprocess; multiprocessing requires a
    top-level function.
"""
session = DBConn().session(work_mem=1000)
suite = session.get_one(Suite, suite_id)
architecture = session.get_one(Architecture, arch_id)
overridetype = session.get_one(OverrideType, overridetype_id)
component = session.get_one(Component, component_id)
log_message = [
suite.suite_name,
architecture.arch_string,
overridetype.overridetype,
component.component_name,
]
contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
contents_writer.write_file()
session.close()
return log_message
def source_helper(suite_id: int, component_id: int) -> list[str]:
"""
    This function is called in a new subprocess; multiprocessing requires a
    top-level function.
"""
session = DBConn().session(work_mem=1000)
suite = session.get_one(Suite, suite_id)
component = session.get_one(Component, component_id)
log_message = [suite.suite_name, "source", component.component_name]
contents_writer = SourceContentsWriter(suite, component)
contents_writer.write_file()
session.close()
return log_message
class ContentsWriter:
"""
Loop over all suites, architectures, overridetypes, and components to write
all contents files.
"""
logger: ClassVar["Logger"]
@classmethod
def log_result(class_, result) -> None:
"""
Writes a result message to the logfile.
"""
class_.logger.log(list(result))
@classmethod
def write_all(
class_,
logger,
archive_names: Collection[str] | None = None,
suite_names: Collection[str] | None = None,
component_names: Collection[str] | None = None,
force=False,
):
"""
        Writes all Contents files for the suites in suite_names, which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites are included if the force argument is True.
"""
pool = DakProcessPool()
class_.logger = logger
session = DBConn().session()
suite_query = session.query(Suite)
if archive_names:
suite_query = suite_query.join(Suite.archive).filter(
Archive.archive_name.in_(archive_names)
)
if suite_names:
suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
component_query = session.query(Component)
if component_names:
component_query = component_query.filter(
Component.component_name.in_(component_names)
)
components = component_query.all()
if not force:
suite_query = suite_query.filter(Suite.untouchable == False) # noqa:E712
deb_type = get_override_type("deb", session)
assert deb_type is not None
deb_id = deb_type.overridetype_id
udeb_type = get_override_type("udeb", session)
assert udeb_type is not None
udeb_id = udeb_type.overridetype_id
# Lock tables so that nobody can change things underneath us
session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE"))
session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE"))
for suite in suite_query:
suite_id = suite.suite_id
            skip_arch_all = not suite.separate_contents_architecture_all
for component in (c for c in suite.components if c in components):
component_id = component.component_id
# handle source packages
pool.apply_async(
source_helper, (suite_id, component_id), callback=class_.log_result
)
for architecture in suite.get_architectures(
skipsrc=True, skipall=skip_arch_all
):
arch_id = architecture.arch_id
# handle 'deb' packages
pool.apply_async(
binary_helper,
(suite_id, arch_id, deb_id, component_id),
callback=class_.log_result,
)
# handle 'udeb' packages
pool.apply_async(
binary_helper,
(suite_id, arch_id, udeb_id, component_id),
callback=class_.log_result,
)
pool.close()
pool.join()
session.close()
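# Usage sketch (assumes a configured dak installation; Logger is the
# daklib.daklog logger and the suite name is illustrative):
#
#     from daklib.daklog import Logger
#     logger = Logger("contents generate")
#     ContentsWriter.write_all(logger, suite_names=["unstable"])
#     logger.close()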
class BinaryContentsScanner:
"""
BinaryContentsScanner provides a threadsafe method scan() to scan the
contents of a DBBinary object.
"""
def __init__(self, binary_id: int):
"""
The argument binary_id is the id of the DBBinary object that
should be scanned.
"""
self.binary_id: int = binary_id
def scan(self) -> None:
"""
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
"""
session = DBConn().session()
binary = session.get_one(DBBinary, self.binary_id)
fileset = set(binary.scan_contents())
        if not fileset:
fileset.add("EMPTY_PACKAGE")
for filename in fileset:
binary.contents.append(BinContents(file=filename))
session.commit()
session.close()
@classmethod
def scan_all(class_, limit=None):
"""
        The class method scan_all() scans all binaries using multiple processes.
The number of binaries to be scanned can be limited with the limit
argument. Returns the number of processed and remaining packages as a
dict.
"""
pool = DakProcessPool()
session = DBConn().session()
query = session.query(DBBinary).filter(DBBinary.contents == None) # noqa:E711
        # Bind count() of the unlimited query now so that the number of
        # remaining packages can be computed after the pool has finished.
        remaining = query.count
if limit is not None:
query = query.limit(limit)
processed = query.count()
for binary in query.yield_per(100):
pool.apply_async(binary_scan_helper, (binary.binary_id,))
pool.close()
pool.join()
remaining_int = remaining()
session.close()
return {"processed": processed, "remaining": remaining_int}
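# Usage sketch: scan up to 1000 unscanned binaries and report progress
# (the limit value is illustrative):
#
#     counts = BinaryContentsScanner.scan_all(limit=1000)
#     print("processed %(processed)d, remaining %(remaining)d" % counts)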
def binary_scan_helper(binary_id: int) -> None:
"""
This function runs in a subprocess.
"""
try:
scanner = BinaryContentsScanner(binary_id)
scanner.scan()
except Exception as e:
print("binary_scan_helper raised an exception: %s" % (e))
class UnpackedSource:
"""
UnpackedSource extracts a source package into a temporary location and
    provides some convenient functions for accessing it.
"""
def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
"""
        dscfilename is the name of the DSC file that will be extracted.
"""
basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
temp_directory = mkdtemp(dir=basedir)
self.root_directory: Optional[str] = os.path.join(temp_directory, "root")
command = (
"dpkg-source",
"--no-copy",
"--no-check",
"-q",
"-x",
dscfilename,
self.root_directory,
)
subprocess.check_call(command)
def get_root_directory(self) -> str:
"""
Returns the name of the package's root directory which is the directory
where the debian subdirectory is located.
"""
assert self.root_directory is not None
return self.root_directory
def get_all_filenames(self) -> Iterable[str]:
"""
Returns an iterator over all filenames. The filenames will be relative
to the root directory.
"""
assert self.root_directory is not None
skip = len(self.root_directory) + 1
for root, _, files in os.walk(self.root_directory):
for name in files:
yield os.path.join(root[skip:], name)
def cleanup(self) -> None:
"""
Removes all temporary files.
"""
if self.root_directory is None:
return
parent_directory = os.path.dirname(self.root_directory)
rmtree(parent_directory)
self.root_directory = None
def __del__(self):
"""
Enforce cleanup.
"""
self.cleanup()
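# Usage sketch (the .dsc path is illustrative):
#
#     unpacked = UnpackedSource("/path/to/hello_2.10-3.dsc")
#     try:
#         for name in unpacked.get_all_filenames():
#             print(name)
#     finally:
#         unpacked.cleanup()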
class SourceContentsScanner:
"""
SourceContentsScanner provides a method scan() to scan the contents of a
DBSource object.
"""
def __init__(self, source_id: int):
"""
The argument source_id is the id of the DBSource object that
should be scanned.
"""
self.source_id: int = source_id
def scan(self) -> None:
"""
This method does the actual scan and fills in the associated SrcContents
property. It commits any changes to the database.
"""
session = DBConn().session()
source = session.get_one(DBSource, self.source_id)
fileset = set(source.scan_contents())
for filename in fileset:
source.contents.append(SrcContents(file=filename))
session.commit()
session.close()
@classmethod
def scan_all(class_, limit=None):
"""
        The class method scan_all() scans all sources using multiple processes.
The number of sources to be scanned can be limited with the limit
argument. Returns the number of processed and remaining packages as a
dict.
"""
pool = DakProcessPool()
session = DBConn().session()
query = session.query(DBSource).filter(DBSource.contents == None) # noqa:E711
        # Bind count() of the unlimited query now so that the number of
        # remaining packages can be computed after the pool has finished.
        remaining = query.count
if limit is not None:
query = query.limit(limit)
processed = query.count()
for source in query.yield_per(100):
pool.apply_async(source_scan_helper, (source.source_id,))
pool.close()
pool.join()
remaining_int = remaining()
session.close()
return {"processed": processed, "remaining": remaining_int}
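# Usage sketch, analogous to BinaryContentsScanner.scan_all() above
# (the limit value is illustrative):
#
#     counts = SourceContentsScanner.scan_all(limit=1000)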
def source_scan_helper(source_id: int) -> None:
"""
This function runs in a subprocess.
"""
try:
scanner = SourceContentsScanner(source_id)
scanner.scan()
except Exception as e:
print("source_scan_helper raised an exception: %s" % (e))