1"""
2Helper code for contents generation.
4@contact: Debian FTPMaster <ftpmaster@debian.org>
5@copyright: 2011 Torsten Werner <twerner@debian.org>
6@license: GNU General Public License version 2 or later
7"""
9################################################################################
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25################################################################################
27import os.path
28import subprocess
29from collections.abc import Iterable
30from shutil import rmtree
31from tempfile import mkdtemp
32from typing import Optional
34import sqlalchemy.sql as sql
36from daklib.config import Config
37from daklib.dbconn import (
38 Architecture,
39 Archive,
40 BinContents,
41 Component,
42 DBBinary,
43 DBConn,
44 DBSource,
45 OverrideType,
46 SrcContents,
47 Suite,
48 get_architecture,
49 get_override_type,
50 get_suite,
51)
52from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
54from .dakmultiprocessing import DakProcessPool
class BinaryContentsWriter:
    """
    BinaryContentsWriter writes the Contents-$arch.gz files.
    """

    def __init__(self, suite, architecture, overridetype, component):
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        # piggy-back on the session the suite object is attached to
        self.session = suite.session()

    def query(self):
        """
        Returns a query object that is doing most of the work.

        Creates a temporary table holding the newest version of every binary
        package in the suite, then joins it with bin_contents and the
        override information to produce (file, pkglist) rows.
        """
        overridesuite = self.suite
        # overrides may live in a different suite (e.g. for *-updates)
        if self.suite.overridesuite is not None:
            overridesuite = get_suite(self.suite.overridesuite, self.session)
        params = {
            "suite": self.suite.suite_id,
            "overridesuite": overridesuite.suite_id,
            "component": self.component.component_id,
            "arch": self.architecture.arch_id,
            "type_id": self.overridetype.overridetype_id,
            "type": self.overridetype.overridetype,
        }

        if self.suite.separate_contents_architecture_all:
            # arch:all packages get their own Contents-all file
            sql_arch_part = "architecture = :arch"
        else:
            sql_arch_part = "(architecture = :arch_all or architecture = :arch)"
            params["arch_all"] = get_architecture("all", self.session).arch_id

        sql_create_temp = (
            """
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;"""
            % sql_arch_part
        )
        # wrap in sql.text(): plain-string execution is deprecated in
        # SQLAlchemy 1.4 and removed in 2.0, and the rest of this file
        # already uses sql.text for textual SQL
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file"""
        )

        return (
            self.session.query(sql.column("file"), sql.column("pkglist"))
            .from_statement(query)
            .params(params)
        )

    def formatline(self, filename, package_list) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        # filename column is padded to 55 characters as in the historic format
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-$arch.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-$arch.gz file.
        """
        return list(self.fetch())

    def writer(self):
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
            "debtype": self.overridetype.overridetype,
            "architecture": self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
class SourceContentsWriter:
    """
    SourceContentsWriter writes the Contents-source.gz files.
    """

    def __init__(self, suite, component):
        self.suite = suite
        self.component = component
        # piggy-back on the session the suite object is attached to
        self.session = suite.session()

    def query(self):
        """
        Returns a query object that is doing most of the work.

        Creates a temporary table holding the newest version of every source
        package in the suite/component, then joins it with src_contents to
        produce (file, pkglist) rows.
        """
        params = {
            "suite_id": self.suite.suite_id,
            "component_id": self.component.component_id,
        }

        sql_create_temp = """
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;"""
        # wrap in sql.text(): plain-string execution is deprecated in
        # SQLAlchemy 1.4 and removed in 2.0, and the rest of this file
        # already uses sql.text for textual SQL
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file"""
        )

        return (
            self.session.query(sql.column("file"), sql.column("pkglist"))
            .from_statement(query)
            .params(params)
        )

    def formatline(self, filename, package_list):
        """
        Returns a formatted string for the filename argument.
        """
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self):
        """
        Yields a new line of the Contents-source.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self):
        """
        Returns a list of lines for the Contents-source.gz file.
        """
        return list(self.fetch())

    def writer(self):
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
        }
        return SourceContentsFileWriter(**values)

    def write_file(self):
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
def binary_helper(suite_id: int, arch_id: int, overridetype_id: int, component_id: int):
    """
    Write one binary Contents file. Must be a top level function because it
    is called in a new subprocess via multiprocessing.
    """
    session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, session)
    architecture = Architecture.get(arch_id, session)
    overridetype = OverrideType.get(overridetype_id, session)
    component = Component.get(component_id, session)
    # assemble the log entry before writing so the attributes are read
    # while the objects are still fresh in the session
    result = [
        suite.suite_name,
        architecture.arch_string,
        overridetype.overridetype,
        component.component_name,
    ]
    BinaryContentsWriter(suite, architecture, overridetype, component).write_file()
    session.close()
    return result
def source_helper(suite_id: int, component_id: int):
    """
    Write one source Contents file. Must be a top level function because it
    is called in a new subprocess via multiprocessing.
    """
    session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, session)
    component = Component.get(component_id, session)
    # assemble the log entry before writing so the attributes are read
    # while the objects are still fresh in the session
    result = [suite.suite_name, "source", component.component_name]
    SourceContentsWriter(suite, component).write_file()
    session.close()
    return result
class ContentsWriter:
    """
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    """

    @classmethod
    def log_result(class_, result) -> None:
        """
        Writes a result message to the logfile.
        """
        class_.logger.log(list(result))

    @classmethod
    def write_all(
        class_,
        logger,
        archive_names=None,
        suite_names=None,
        component_names=None,
        force=False,
    ):
        """
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.
        """
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()
        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(
                Archive.archive_name.in_(archive_names)
            )
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(
                Component.component_name.in_(component_names)
            )
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        deb_id = get_override_type("deb", session).overridetype_id
        udeb_id = get_override_type("udeb", session).overridetype_id

        # Lock tables so that nobody can change things underneath us.
        # Wrapped in sql.text() as plain-string execution is deprecated in
        # SQLAlchemy 1.4 and removed in 2.0.
        session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE"))
        session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE"))

        for suite in suite_query:
            suite_id = suite.suite_id

            # arch:all is folded into the per-arch files unless the suite
            # asks for a separate Contents-all
            skip_arch_all = True
            if suite.separate_contents_architecture_all:
                skip_arch_all = False

            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(
                    source_helper, (suite_id, component_id), callback=class_.log_result
                )
                for architecture in suite.get_architectures(
                    skipsrc=True, skipall=skip_arch_all
                ):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, deb_id, component_id),
                        callback=class_.log_result,
                    )
                    # handle 'udeb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, udeb_id, component_id),
                        callback=class_.log_result,
                    )
        pool.close()
        pool.join()
        session.close()
class BinaryContentsScanner:
    """
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    """

    def __init__(self, binary_id: int):
        """
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        """
        # id of the DBBinary row; resolved to an ORM object in scan()
        self.binary_id: int = binary_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        binary = session.query(DBBinary).get(self.binary_id)
        fileset = set(binary.scan_contents())
        # store a sentinel entry for empty packages so that contents is
        # non-NULL and the package is not picked up again by scan_all()
        if len(fileset) == 0:
            fileset.add("EMPTY_PACKAGE")
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all binaries using multiple threads.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # bind the bound count() method of the *unlimited* query; it is
        # invoked only after the pool has joined, so it reports how many
        # packages are still unscanned at that point
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id,))
        pool.close()
        pool.join()
        # now evaluate the count saved above (post-scan state)
        remaining = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining}
def binary_scan_helper(binary_id: int) -> None:
    """
    Scan a single binary package. This function runs in a subprocess, so
    failures are reported on stdout instead of being propagated.
    """
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
class UnpackedSource:
    """
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.
    """

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        """
        The dscfilename is a name of a DSC file that will be extracted.

        Raises subprocess.CalledProcessError if dpkg-source fails.
        """
        basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
        temp_directory = mkdtemp(dir=basedir)
        # None after cleanup(); set last so __del__ can detect a failed init
        self.root_directory: Optional[str] = os.path.join(temp_directory, "root")
        command = (
            "dpkg-source",
            "--no-copy",
            "--no-check",
            "-q",
            "-x",
            dscfilename,
            self.root_directory,
        )
        subprocess.check_call(command)

    def get_root_directory(self) -> str:
        """
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        """
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        """
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        """
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        """
        Removes all temporary files. Safe to call more than once and also
        when __init__ failed before root_directory was assigned.
        """
        # getattr guard: __del__ may fire after a failed __init__ (e.g. when
        # Config lookup or mkdtemp raised) before root_directory existed
        root_directory = getattr(self, "root_directory", None)
        if root_directory is None:
            return
        parent_directory = os.path.dirname(root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        """
        Enforce cleanup.
        """
        self.cleanup()
class SourceContentsScanner:
    """
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    """

    def __init__(self, source_id: int):
        """
        The argument source_id is the id of the DBSource object that
        should be scanned.
        """
        # id of the DBSource row; resolved to an ORM object in scan()
        self.source_id: int = source_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        source = session.query(DBSource).get(self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # bind the bound count() method of the *unlimited* query; it is
        # invoked only after the pool has joined, so it reports how many
        # sources are still unscanned at that point
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id,))
        pool.close()
        pool.join()
        # now evaluate the count saved above (post-scan state)
        remaining = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining}
def source_scan_helper(source_id: int) -> None:
    """
    Scan a single source package. This function runs in a subprocess, so
    failures are reported on stdout instead of being propagated.
    """
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))