Coverage for daklib/contents.py: 95%
237 statements
« prev ^ index » next — coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1"""
2Helper code for contents generation.
4@contact: Debian FTPMaster <ftpmaster@debian.org>
5@copyright: 2011 Torsten Werner <twerner@debian.org>
6@license: GNU General Public License version 2 or later
7"""
9################################################################################
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25################################################################################
27import os.path
28import subprocess
29from collections.abc import Collection, Iterable
30from shutil import rmtree
31from tempfile import mkdtemp
32from typing import TYPE_CHECKING, ClassVar, Optional
34import sqlalchemy.sql as sql
36from daklib.config import Config
37from daklib.dbconn import (
38 Architecture,
39 Archive,
40 BinContents,
41 Component,
42 DBBinary,
43 DBConn,
44 DBSource,
45 OverrideType,
46 SrcContents,
47 Suite,
48 get_architecture,
49 get_override_type,
50 get_suite,
51)
52from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
54from .dakmultiprocessing import DakProcessPool
56if TYPE_CHECKING:
57 from sqlalchemy.engine import Result
59 from daklib.daklog import Logger
class BinaryContentsWriter:
    """
    BinaryContentsWriter writes the Contents-$arch.gz files.
    """

    def __init__(
        self,
        suite: Suite,
        architecture: Architecture,
        overridetype: OverrideType,
        component: Component,
    ) -> None:
        """
        Remember the (suite, architecture, overridetype, component)
        combination this writer generates the Contents file for.
        """
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        # Reuse the session the suite object is bound to so all queries run
        # in the same transaction.
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query object that is doing most of the work.

        Creates a temporary table of the newest version of each binary
        package first, then joins it against bin_contents and the override
        data to produce (file, package list) rows.
        """
        # Overrides may be kept in a different suite (suite.overridesuite).
        overridesuite = (
            get_suite(self.suite.overridesuite, self.session)
            if self.suite.overridesuite
            else self.suite
        )
        assert overridesuite is not None
        params = {
            "suite": self.suite.suite_id,
            "overridesuite": overridesuite.suite_id,
            "component": self.component.component_id,
            "arch": self.architecture.arch_id,
            "type_id": self.overridetype.overridetype_id,
            "type": self.overridetype.overridetype,
        }

        # When the suite keeps a separate Contents-all, arch:all packages
        # must not be folded into the per-architecture files.
        if self.suite.separate_contents_architecture_all:
            sql_arch_part = "architecture = :arch"
        else:
            sql_arch_part = "(architecture = :arch_all or architecture = :arch)"
            arch_all = get_architecture("all", self.session)
            assert arch_all is not None
            params["arch_all"] = arch_all.arch_id

        sql_create_temp = (
            """
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;"""
            % sql_arch_part
        )
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        # Contents file format: filename left-justified to 55 columns,
        # followed by the comma-separated package list.
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-$arch.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-$arch.gz file.
        """
        return list(self.fetch())

    def writer(self) -> BinaryContentsFileWriter:
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
            "debtype": self.overridetype.overridetype,
            "architecture": self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
class SourceContentsWriter:
    """
    SourceContentsWriter writes the Contents-source.gz files.
    """

    def __init__(self, suite: Suite, component: Component):
        """
        Remember the (suite, component) combination this writer generates
        the Contents-source file for.
        """
        self.suite = suite
        self.component = component
        # Reuse the session the suite object is bound to so all queries run
        # in the same transaction.
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query object that is doing most of the work.

        Creates a temporary table of the newest version of each source
        package, then joins it against src_contents to produce
        (file, source list) rows.
        """
        params = {
            "suite_id": self.suite.suite_id,
            "component_id": self.component.component_id,
        }

        sql_create_temp = """
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;"""
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        # Contents-source format: filename, a tab, then the source list.
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-source.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-source.gz file.
        """
        return list(self.fetch())

    def writer(self) -> SourceContentsFileWriter:
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
def binary_helper(
    suite_id: int, arch_id: int, overridetype_id: int, component_id: int
) -> list[str]:
    """
    Write one binary Contents file for the given suite, architecture,
    overridetype, and component ids, and return a log message describing it.

    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    """
    session = DBConn().session(work_mem=1000)
    try:
        suite = session.get_one(Suite, suite_id)
        architecture = session.get_one(Architecture, arch_id)
        overridetype = session.get_one(OverrideType, overridetype_id)
        component = session.get_one(Component, component_id)
        # The returned message is passed to the pool's result callback for
        # logging in the parent process.
        log_message = [
            suite.suite_name,
            architecture.arch_string,
            overridetype.overridetype,
            component.component_name,
        ]
        contents_writer = BinaryContentsWriter(
            suite, architecture, overridetype, component
        )
        contents_writer.write_file()
        return log_message
    finally:
        # Release the connection even if writing the file fails.
        session.close()
def source_helper(suite_id: int, component_id: int) -> list[str]:
    """
    Write one source Contents file for the given suite and component ids,
    and return a log message describing it.

    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    """
    session = DBConn().session(work_mem=1000)
    try:
        suite = session.get_one(Suite, suite_id)
        component = session.get_one(Component, component_id)
        # The returned message is passed to the pool's result callback for
        # logging in the parent process.
        log_message = [suite.suite_name, "source", component.component_name]
        contents_writer = SourceContentsWriter(suite, component)
        contents_writer.write_file()
        return log_message
    finally:
        # Release the connection even if writing the file fails.
        session.close()
class ContentsWriter:
    """
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    """

    # Set by write_all(); used by the pool result callback.
    logger: ClassVar["Logger"]

    @classmethod
    def log_result(class_, result) -> None:
        """
        Writes a result message to the logfile.
        """
        class_.logger.log(list(result))

    @classmethod
    def write_all(
        class_,
        logger: "Logger",
        archive_names: Collection[str] | None = None,
        suite_names: Collection[str] | None = None,
        component_names: Collection[str] | None = None,
        force=False,
    ) -> None:
        """
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitely. Untouchable
        suites will be included if the force argument is set to True.

        The actual file writing is fanned out to subprocesses via
        DakProcessPool (source_helper / binary_helper); each finished task
        reports back through log_result.
        """
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()
        # Narrow the suite selection by archive and/or suite name if given.
        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(
                Archive.archive_name.in_(archive_names)
            )
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(
                Component.component_name.in_(component_names)
            )
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        # Resolve the 'deb' and 'udeb' override type ids once, up front.
        deb_type = get_override_type("deb", session)
        assert deb_type is not None
        deb_id = deb_type.overridetype_id
        udeb_type = get_override_type("udeb", session)
        assert udeb_type is not None
        udeb_id = udeb_type.overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE"))
        session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE"))

        for suite in suite_query:
            suite_id = suite.suite_id

            # Suites with a separate Contents-all file get arch:all handled
            # as its own architecture instead of being skipped.
            skip_arch_all = True
            if suite.separate_contents_architecture_all:
                skip_arch_all = False

            # Only components both in the suite and in the requested set.
            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(
                    source_helper, (suite_id, component_id), callback=class_.log_result
                )
                for architecture in suite.get_architectures(
                    skipsrc=True, skipall=skip_arch_all
                ):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, deb_id, component_id),
                        callback=class_.log_result,
                    )
                    # handle 'udeb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, udeb_id, component_id),
                        callback=class_.log_result,
                    )
        # Wait for all queued writer tasks before releasing the table locks.
        pool.close()
        pool.join()
        session.close()
class BinaryContentsScanner:
    """
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    """

    def __init__(self, binary_id: int):
        """
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        """
        self.binary_id: int = binary_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        binary = session.get_one(DBBinary, self.binary_id)
        fileset = set(binary.scan_contents())
        if not fileset:
            # Store a marker entry so empty packages still get a contents
            # row and are not picked up again by scan_all's filter.
            fileset.add("EMPTY_PACKAGE")
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all binaries using multiple threads.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # Bind count() before applying the limit; it is re-executed after
        # the workers finish to report what is still left to scan.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id,))
        pool.close()
        pool.join()
        remaining_int = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining_int}
def binary_scan_helper(binary_id: int) -> None:
    """
    Scan a single binary package; runs in a worker subprocess.

    Any error is reported on stdout instead of being raised, so one broken
    package does not take down the whole pool.
    """
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
473class UnpackedSource:
474 """
475 UnpackedSource extracts a source package into a temporary location and
476 gives you some convinient function for accessing it.
477 """
479 def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
480 """
481 The dscfilename is a name of a DSC file that will be extracted.
482 """
483 basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
484 temp_directory = mkdtemp(dir=basedir)
485 self.root_directory: Optional[str] = os.path.join(temp_directory, "root")
486 command = (
487 "dpkg-source",
488 "--no-copy",
489 "--no-check",
490 "-q",
491 "-x",
492 dscfilename,
493 self.root_directory,
494 )
495 subprocess.check_call(command)
497 def get_root_directory(self) -> str:
498 """
499 Returns the name of the package's root directory which is the directory
500 where the debian subdirectory is located.
501 """
502 assert self.root_directory is not None
503 return self.root_directory
505 def get_all_filenames(self) -> Iterable[str]:
506 """
507 Returns an iterator over all filenames. The filenames will be relative
508 to the root directory.
509 """
510 assert self.root_directory is not None
511 skip = len(self.root_directory) + 1
512 for root, _, files in os.walk(self.root_directory):
513 for name in files:
514 yield os.path.join(root[skip:], name)
516 def cleanup(self) -> None:
517 """
518 Removes all temporary files.
519 """
520 if self.root_directory is None:
521 return
522 parent_directory = os.path.dirname(self.root_directory)
523 rmtree(parent_directory)
524 self.root_directory = None
526 def __del__(self):
527 """
528 Enforce cleanup.
529 """
530 self.cleanup()
class SourceContentsScanner:
    """
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    """

    def __init__(self, source_id: int):
        """
        Store the id of the DBSource row that scan() will process.
        """
        self.source_id: int = source_id

    def scan(self) -> None:
        """
        Scan the source package and record each contained filename as a
        SrcContents row, committing the result to the database.
        """
        db_session = DBConn().session()
        src = db_session.get_one(DBSource, self.source_id)
        for contained_file in set(src.scan_contents()):
            src.contents.append(SrcContents(file=contained_file))
        db_session.commit()
        db_session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        Scan every source package that has no contents yet, fanning the work
        out over multiple worker processes. When limit is given, at most that
        many sources are scanned. Returns a dict with the numbers of processed
        and remaining packages.
        """
        worker_pool = DakProcessPool()
        db_session = DBConn().session()
        pending = db_session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # Bind count() before the limit is applied; re-executed after the
        # workers finish to report the remaining backlog.
        count_remaining = pending.count
        if limit is not None:
            pending = pending.limit(limit)
        n_processed = pending.count()
        for src in pending.yield_per(100):
            worker_pool.apply_async(source_scan_helper, (src.source_id,))
        worker_pool.close()
        worker_pool.join()
        n_remaining = count_remaining()
        db_session.close()
        return {"processed": n_processed, "remaining": n_remaining}
def source_scan_helper(source_id: int) -> None:
    """
    Scan a single source package; runs in a worker subprocess.

    Any error is reported on stdout instead of being raised, so one broken
    package does not take down the whole pool.
    """
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))