1"""
2Helper code for contents generation.
4@contact: Debian FTPMaster <ftpmaster@debian.org>
5@copyright: 2011 Torsten Werner <twerner@debian.org>
6@license: GNU General Public License version 2 or later
7"""
9################################################################################
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25################################################################################
27from daklib.dbconn import *
28from daklib.config import Config
29from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
31from .dakmultiprocessing import DakProcessPool
32from shutil import rmtree
33from tempfile import mkdtemp
34from collections.abc import Iterable
35from typing import Optional
37import subprocess
38import os.path
39import sqlalchemy.sql as sql
class BinaryContentsWriter:
    '''
    BinaryContentsWriter writes the Contents-$arch.gz files.

    One instance handles a single (suite, architecture, overridetype,
    component) combination; write_file() produces the corresponding file.
    '''

    def __init__(self, suite, architecture, overridetype, component):
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        # Reuse the session the suite object is attached to.
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        Creates a temporary table holding the newest binary of each package
        first, then joins it with the contents and override information.
        '''
        # Overrides may live in a different suite (e.g. for *-updates).
        overridesuite = self.suite
        if self.suite.overridesuite is not None:
            overridesuite = get_suite(self.suite.overridesuite, self.session)
        params = {
            'suite': self.suite.suite_id,
            'overridesuite': overridesuite.suite_id,
            'component': self.component.component_id,
            'arch': self.architecture.arch_id,
            'type_id': self.overridetype.overridetype_id,
            'type': self.overridetype.overridetype,
        }

        if self.suite.separate_contents_architecture_all:
            sql_arch_part = 'architecture = :arch'
        else:
            # Fold arch:all packages into each per-arch Contents file.
            sql_arch_part = '(architecture = :arch_all or architecture = :arch)'
            params['arch_all'] = get_architecture('all', self.session).arch_id

        sql_create_temp = '''
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;''' % sql_arch_part
        self.session.execute(sql_create_temp, params=params)

        query = sql.text('''
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(query).params(params)

    def formatline(self, filename, package_list) -> str:
        '''
        Returns a formatted string for the filename argument.
        '''
        # Pad the filename column to 55 characters (historical format).
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        '''
        Yields a new line of the Contents-$arch.gz file in filename order.
        '''
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        '''
        Returns a list of lines for the Contents-$arch.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        values = {
            'archive': self.suite.archive.path,
            'suite': self.suite.suite_name,
            'component': self.component.component_name,
            'debtype': self.overridetype.overridetype,
            'architecture': self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        '''
        Write the output file.
        '''
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
class SourceContentsWriter:
    '''
    SourceContentsWriter writes the Contents-source.gz files.

    One instance handles a single (suite, component) combination;
    write_file() produces the corresponding file.
    '''

    def __init__(self, suite, component):
        self.suite = suite
        self.component = component
        # Reuse the session the suite object is attached to.
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        Creates a temporary table holding the newest version of each source
        package first, then joins it with the source contents.
        '''
        params = {
            'suite_id': self.suite.suite_id,
            'component_id': self.component.component_id,
        }

        sql_create_temp = '''
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;'''
        self.session.execute(sql_create_temp, params=params)

        query = sql.text('''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(query).params(params)

    def formatline(self, filename, package_list) -> str:
        '''
        Returns a formatted string for the filename argument.
        '''
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        '''
        Yields a new line of the Contents-source.gz file in filename order.
        '''
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        '''
        Returns a list of lines for the Contents-source.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        values = {
            'archive': self.suite.archive.path,
            'suite': self.suite.suite_name,
            'component': self.component.component_name
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        '''
        Write the output file.
        '''
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
def binary_helper(suite_id: int, arch_id: int, overridetype_id: int, component_id: int):
    '''
    Writes the Contents file for one (suite, architecture, overridetype,
    component) combination and returns a log message describing it.

    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        architecture = Architecture.get(arch_id, session)
        overridetype = OverrideType.get(overridetype_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, architecture.arch_string,
                       overridetype.overridetype, component.component_name]
        contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
        contents_writer.write_file()
        return log_message
    finally:
        # Always return the connection to the pool, even on failure.
        session.close()
def source_helper(suite_id: int, component_id: int):
    '''
    Writes the Contents-source file for one (suite, component) combination
    and returns a log message describing it.

    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, 'source', component.component_name]
        contents_writer = SourceContentsWriter(suite, component)
        contents_writer.write_file()
        return log_message
    finally:
        # Always return the connection to the pool, even on failure.
        session.close()
class ContentsWriter:
    '''
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    '''
    @classmethod
    def log_result(class_, result) -> None:
        '''
        Writes a result message to the logfile.
        '''
        class_.logger.log(list(result))

    @classmethod
    def write_all(class_, logger, archive_names=None, suite_names=None, component_names=None, force=False):
        '''
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.
        '''
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()

        # Build the suite selection, optionally narrowed by archive/suite name.
        suites = session.query(Suite)
        if archive_names:
            suites = suites.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
        if suite_names:
            suites = suites.filter(Suite.suite_name.in_(suite_names))

        # Build the component selection, optionally narrowed by name.
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(Component.component_name.in_(component_names))
        selected_components = component_query.all()

        if not force:
            suites = suites.filter(Suite.untouchable == False)  # noqa:E712

        deb_id = get_override_type('deb', session).overridetype_id
        udeb_id = get_override_type('udeb', session).overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute("LOCK TABLE bin_contents IN SHARE MODE")
        session.execute("LOCK TABLE src_contents IN SHARE MODE")

        for suite in suites:
            # arch:all gets its own Contents file only if the suite asks for it.
            skip_arch_all = not suite.separate_contents_architecture_all

            for component in (c for c in suite.components if c in selected_components):
                # handle source packages
                pool.apply_async(source_helper, (suite.suite_id, component.component_id),
                                 callback=class_.log_result)
                for architecture in suite.get_architectures(skipsrc=True, skipall=skip_arch_all):
                    # handle 'deb' and 'udeb' packages
                    for type_id in (deb_id, udeb_id):
                        pool.apply_async(
                            binary_helper,
                            (suite.suite_id, architecture.arch_id, type_id, component.component_id),
                            callback=class_.log_result)

        pool.close()
        pool.join()
        session.close()
class BinaryContentsScanner:
    '''
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    '''

    def __init__(self, binary_id: int):
        '''
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        '''
        # id of the DBBinary row to scan
        self.binary_id: int = binary_id

    def scan(self) -> None:
        '''
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        binary = session.query(DBBinary).get(self.binary_id)
        fileset = set(binary.scan_contents())
        if len(fileset) == 0:
            # Insert a marker entry so packages without files still get a
            # contents row (and are not picked up again by scan_all()).
            fileset.add('EMPTY_PACKAGE')
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all binaries using multiple threads.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # Bind the count method of the *unlimited* query now; it is invoked
        # only after the workers have finished, so 'remaining' reflects the
        # database state after the scans were committed.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id, ))
        pool.close()
        pool.join()
        # Execute the deferred count to measure what is still unscanned.
        remaining = remaining()
        session.close()
        return {'processed': processed, 'remaining': remaining}
def binary_scan_helper(binary_id: int) -> None:
    '''
    This function runs in a subprocess.
    '''
    # Best effort: report failures on stdout instead of killing the pool.
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
class UnpackedSource:
    '''
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.

    It can also be used as a context manager, which guarantees that the
    temporary files are removed when the block is left.
    '''

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        '''
        The dscfilename is a name of a DSC file that will be extracted.

        Raises subprocess.CalledProcessError if dpkg-source fails.
        '''
        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
        temp_directory = mkdtemp(dir=basedir)
        # 'root' holds the unpacked tree; its parent is removed by cleanup().
        self.root_directory: Optional[str] = os.path.join(temp_directory, 'root')
        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
                   dscfilename, self.root_directory)
        subprocess.check_call(command)

    def __enter__(self) -> 'UnpackedSource':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Remove the temporary files; exceptions are not suppressed.
        self.cleanup()

    def get_root_directory(self) -> str:
        '''
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        '''
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        '''
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        '''
        # +1 strips the path separator after the root directory prefix.
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        '''
        Removes all temporary files. Safe to call more than once.
        '''
        if self.root_directory is None:
            return
        parent_directory = os.path.dirname(self.root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        '''
        Enforce cleanup.
        '''
        self.cleanup()
class SourceContentsScanner:
    '''
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    '''

    def __init__(self, source_id: int):
        '''
        The argument source_id is the id of the DBSource object that
        should be scanned.
        '''
        # id of the DBSource row to scan
        self.source_id: int = source_id

    def scan(self) -> None:
        '''
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        source = session.query(DBSource).get(self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # Bind the count method of the *unlimited* query now; it is invoked
        # only after the workers have finished, so 'remaining' reflects the
        # database state after the scans were committed.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id, ))
        pool.close()
        pool.join()
        # Execute the deferred count to measure what is still unscanned.
        remaining = remaining()
        session.close()
        return {'processed': processed, 'remaining': remaining}
def source_scan_helper(source_id: int) -> None:
    '''
    This function runs in a subprocess.
    '''
    # Best effort: report failures on stdout instead of killing the pool.
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))