Package daklib :: Module contents
[hide private]
[frames] | [no frames]

Source Code for Module daklib.contents

  1  """ 
  2  Helper code for contents generation. 
  3   
  4  @contact: Debian FTPMaster <ftpmaster@debian.org> 
  5  @copyright: 2011 Torsten Werner <twerner@debian.org> 
  6  @license: GNU General Public License version 2 or later 
  7  """ 
  8   
  9  ################################################################################ 
 10   
 11  # This program is free software; you can redistribute it and/or modify 
 12  # it under the terms of the GNU General Public License as published by 
 13  # the Free Software Foundation; either version 2 of the License, or 
 14  # (at your option) any later version. 
 15   
 16  # This program is distributed in the hope that it will be useful, 
 17  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 19  # GNU General Public License for more details. 
 20   
 21  # You should have received a copy of the GNU General Public License 
 22  # along with this program; if not, write to the Free Software 
 23  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 24   
 25  ################################################################################ 
 26   
 27  from daklib.dbconn import * 
 28  from daklib.config import Config 
 29  from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter 
 30   
 31  from .dakmultiprocessing import DakProcessPool 
 32  from shutil import rmtree 
 33  from tempfile import mkdtemp 
 34   
 35  import subprocess 
 36  import os.path 
 37  import sqlalchemy.sql as sql 
class BinaryContentsWriter:
    '''
    BinaryContentsWriter writes the Contents-$arch.gz files.
    '''

    def __init__(self, suite, architecture, overridetype, component):
        '''
        Remember the suite/architecture/overridetype/component the file is
        generated for, and reuse the suite's database session.
        '''
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        Creates a temporary table of the newest binaries first so that the
        main aggregation query joins against a small, indexed relation.
        '''
        # Overrides may live in a different suite (e.g. for -updates suites).
        overridesuite = self.suite
        if self.suite.overridesuite is not None:
            overridesuite = get_suite(self.suite.overridesuite, self.session)
        params = {
            'suite': self.suite.suite_id,
            'overridesuite': overridesuite.suite_id,
            'component': self.component.component_id,
            'arch': self.architecture.arch_id,
            'type_id': self.overridetype.overridetype_id,
            'type': self.overridetype.overridetype,
        }

        # With separate Contents-all files, restrict to the requested
        # architecture only; otherwise fold 'arch: all' packages in as well.
        if self.suite.separate_contents_architecture_all:
            sql_arch_part = 'architecture = :arch'
        else:
            sql_arch_part = '(architecture = :arch_all or architecture = :arch)'
            params['arch_all'] = get_architecture('all', self.session).arch_id

        # Keep only the newest version of each package that is in the suite.
        sql_create_temp = '''
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;''' % sql_arch_part
        self.session.execute(sql_create_temp, params=params)

        # One row per contained file, with a comma separated, sorted list of
        # 'section/package' entries that ship it.
        query = sql.text('''
with

unique_override as
    (select o.package, s.section
    from override o, section s
    where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
    o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(query).params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument: the filename
        left-justified to 55 columns, then the package list.
        '''
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields a new line of the Contents-$arch.gz file in filename order.
        '''
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-$arch.gz file.
        '''
        # list() instead of a copying comprehension (same result, idiomatic).
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        values = {
            'archive': self.suite.archive.path,
            'suite': self.suite.suite_name,
            'component': self.component.component_name,
            'debtype': self.overridetype.overridetype,
            'architecture': self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self):
        '''
        Write the output file.
        '''
        writer = self.writer()
        # 'fh' rather than 'file' to avoid shadowing the builtin.
        fh = writer.open()
        for item in self.fetch():
            fh.write(item)
        writer.close()
class SourceContentsWriter:
    '''
    SourceContentsWriter writes the Contents-source.gz files.
    '''

    def __init__(self, suite, component):
        '''
        Remember the suite/component the file is generated for, and reuse
        the suite's database session.
        '''
        self.suite = suite
        self.component = component
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        Creates a temporary table of the newest sources first so that the
        main aggregation query joins against a small, indexed relation.
        '''
        params = {
            'suite_id': self.suite.suite_id,
            'component_id': self.component.component_id,
        }

        # Keep only the newest version of each source package that is in the
        # suite and whose files live in the requested component.
        sql_create_temp = '''
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;'''
        self.session.execute(sql_create_temp, params=params)

        # One row per contained file, with a comma separated, sorted list of
        # source packages that ship it.
        query = sql.text('''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(query).params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument: filename and
        package list separated by a tab.
        '''
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields a new line of the Contents-source.gz file in filename order.
        '''
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-source.gz file.
        '''
        # list() instead of a copying comprehension (same result, idiomatic).
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        values = {
            'archive': self.suite.archive.path,
            'suite': self.suite.suite_name,
            'component': self.component.component_name
        }
        return SourceContentsFileWriter(**values)

    def write_file(self):
        '''
        Write the output file.
        '''
        writer = self.writer()
        # 'fh' rather than 'file' to avoid shadowing the builtin.
        fh = writer.open()
        for item in self.fetch():
            fh.write(item)
        writer.close()
def binary_helper(suite_id, arch_id, overridetype_id, component_id):
    '''
    Top-level entry point for writing a single Contents-$arch file; it must
    be a module-level function because multiprocessing cannot pickle nested
    or bound functions.
    '''
    session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, session)
    architecture = Architecture.get(arch_id, session)
    overridetype = OverrideType.get(overridetype_id, session)
    component = Component.get(component_id, session)
    log_message = [suite.suite_name, architecture.arch_string,
                   overridetype.overridetype, component.component_name]
    BinaryContentsWriter(suite, architecture, overridetype, component).write_file()
    session.close()
    return log_message
def source_helper(suite_id, component_id):
    '''
    Top-level entry point for writing a single Contents-source file; it must
    be a module-level function because multiprocessing cannot pickle nested
    or bound functions.
    '''
    session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, session)
    component = Component.get(component_id, session)
    log_message = [suite.suite_name, 'source', component.component_name]
    SourceContentsWriter(suite, component).write_file()
    session.close()
    return log_message
class ContentsWriter:
    '''
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    '''
    @classmethod
    def log_result(class_, result):
        '''
        Writes a result message to the logfile.

        Used as the apply_async() callback for the worker helpers below;
        'result' is the log_message list returned by the helper.
        '''
        class_.logger.log(list(result))

    @classmethod
    def write_all(class_, logger, archive_names=None, suite_names=None, component_names=None, force=False):
        '''
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.
        Work is fanned out to a process pool; results are logged via
        log_result().
        '''
        pool = DakProcessPool()
        # Stash the logger on the class so the async callback can reach it.
        class_.logger = logger
        session = DBConn().session()
        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(Component.component_name.in_(component_names))
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        deb_id = get_override_type('deb', session).overridetype_id
        udeb_id = get_override_type('udeb', session).overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute("LOCK TABLE bin_contents IN SHARE MODE")
        session.execute("LOCK TABLE src_contents IN SHARE MODE")

        for suite in suite_query:
            suite_id = suite.suite_id

            # With separate Contents-all files, 'all' gets its own pass via
            # get_architectures(); otherwise it is folded into each arch.
            skip_arch_all = True
            if suite.separate_contents_architecture_all:
                skip_arch_all = False

            # Only components both configured for the suite and selected by
            # the component_names filter are processed.
            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(source_helper, (suite_id, component_id),
                                 callback=class_.log_result)
                for architecture in suite.get_architectures(skipsrc=True, skipall=skip_arch_all):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id),
                                     callback=class_.log_result)
                    # handle 'udeb' packages
                    pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id),
                                     callback=class_.log_result)
        pool.close()
        pool.join()
        session.close()
class BinaryContentsScanner:
    '''
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    '''

    def __init__(self, binary_id):
        '''
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        '''
        self.binary_id = binary_id

    def scan(self):
        '''
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        binary = session.query(DBBinary).get(self.binary_id)
        fileset = set(binary.scan_contents())
        if len(fileset) == 0:
            # Record a marker entry so packages without contents are not
            # picked up by scan_all() again and again.
            fileset.add('EMPTY_PACKAGE')
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all binaries using multiple threads.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # Bind the *unlimited* query's count method now; it is only called
        # after the workers have committed, so 'remaining' reflects what is
        # still unscanned when we are done.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id, ))
        pool.close()
        pool.join()
        # Deferred count (see above): re-queries the database post-commit.
        remaining = remaining()
        session.close()
        return {'processed': processed, 'remaining': remaining}
def binary_scan_helper(binary_id):
    '''
    Subprocess entry point: scan the contents of one binary package,
    reporting any failure on stdout instead of crashing the pool.
    '''
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
class UnpackedSource:
    '''
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.

    May also be used as a context manager, in which case cleanup() runs on
    exit.
    '''

    def __init__(self, dscfilename, tmpbasedir=None):
        '''
        The dscfilename is a name of a DSC file that will be extracted.
        Extraction happens into a fresh temporary directory below
        tmpbasedir (or Dir::TempPath from the configuration).

        Raises subprocess.CalledProcessError if dpkg-source fails.
        '''
        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory = os.path.join(temp_directory, 'root')
        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
                   dscfilename, self.root_directory)
        subprocess.check_call(command)

    def get_root_directory(self):
        '''
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        '''
        return self.root_directory

    def get_all_filenames(self):
        '''
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        '''
        # +1 skips the path separator after the root directory prefix.
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self):
        '''
        Removes all temporary files. Safe to call more than once.
        '''
        # getattr() guards against __del__ running after a failed __init__
        # (e.g. mkdtemp() raised before root_directory was ever assigned).
        if getattr(self, 'root_directory', None) is None:
            return
        parent_directory = os.path.dirname(self.root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __enter__(self):
        '''
        Context manager support: returns self.
        '''
        return self

    def __exit__(self, *exc_info):
        '''
        Context manager support: removes the temporary files.
        '''
        self.cleanup()

    def __del__(self):
        '''
        Enforce cleanup.
        '''
        self.cleanup()
class SourceContentsScanner:
    '''
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    '''

    def __init__(self, source_id):
        '''
        The argument source_id is the id of the DBSource object that
        should be scanned.
        '''
        self.source_id = source_id

    def scan(self):
        '''
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        source = session.query(DBSource).get(self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # Bind the *unlimited* query's count method now; it is only called
        # after the workers have committed, so 'remaining' reflects what is
        # still unscanned when we are done.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id, ))
        pool.close()
        pool.join()
        # Deferred count (see above): re-queries the database post-commit.
        remaining = remaining()
        session.close()
        return {'processed': processed, 'remaining': remaining}
def source_scan_helper(source_id):
    '''
    Subprocess entry point: scan the contents of one source package,
    reporting any failure on stdout instead of crashing the pool.
    '''
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))