1""" 

2Helper code for contents generation. 

3 

4@contact: Debian FTPMaster <ftpmaster@debian.org> 

5@copyright: 2011 Torsten Werner <twerner@debian.org> 

6@license: GNU General Public License version 2 or later 

7""" 

8 

9################################################################################ 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25################################################################################ 

26 

27from daklib.dbconn import * 

28from daklib.config import Config 

29from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter 

30 

31from .dakmultiprocessing import DakProcessPool 

32from shutil import rmtree 

33from tempfile import mkdtemp 

34from collections.abc import Iterable 

35from typing import Optional 

36 

37import subprocess 

38import os.path 

39import sqlalchemy.sql as sql 

40 

41 

class BinaryContentsWriter:
    '''
    Generates a single Contents-$arch.gz file for one combination of
    suite, architecture, override type and component.
    '''

    def __init__(self, suite, architecture, overridetype, component):
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        # reuse the session that the suite object is attached to
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.
        '''
        # overrides may live in a different suite (Suite.overridesuite)
        override_suite = self.suite
        if self.suite.overridesuite is not None:
            override_suite = get_suite(self.suite.overridesuite, self.session)

        params = {
            'suite': self.suite.suite_id,
            'overridesuite': override_suite.suite_id,
            'component': self.component.component_id,
            'arch': self.architecture.arch_id,
            'type_id': self.overridetype.overridetype_id,
            'type': self.overridetype.overridetype,
        }

        # arch:all packages either get their own Contents-all file or are
        # merged into every per-architecture file
        if self.suite.separate_contents_architecture_all:
            arch_clause = 'architecture = :arch'
        else:
            arch_clause = '(architecture = :arch_all or architecture = :arch)'
            params['arch_all'] = get_architecture('all', self.session).arch_id

        create_temp_statement = '''
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;''' % arch_clause
        self.session.execute(create_temp_statement, params=params)

        statement = sql.text('''
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(statement).params(params)

    def formatline(self, filename, package_list) -> str:
        '''
        Returns a formatted string for the filename argument.
        '''
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        '''
        Yields a new line of the Contents-$arch.gz file in filename order.
        '''
        for row in self.query().yield_per(100):
            yield self.formatline(row[0], row[1])
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        '''
        Returns a list of lines for the Contents-$arch.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        return BinaryContentsFileWriter(
            archive=self.suite.archive.path,
            suite=self.suite.suite_name,
            component=self.component.component_name,
            debtype=self.overridetype.overridetype,
            architecture=self.architecture.arch_string,
        )

    def write_file(self) -> None:
        '''
        Write the output file.
        '''
        output = self.writer()
        handle = output.open()
        for line in self.fetch():
            handle.write(line)
        output.close()

151 

152 

class SourceContentsWriter:
    '''
    Generates a single Contents-source.gz file for one combination of
    suite and component.
    '''

    def __init__(self, suite, component):
        self.suite = suite
        self.component = component
        # reuse the session that the suite object is attached to
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.
        '''
        params = {
            'suite_id': self.suite.suite_id,
            'component_id': self.component.component_id,
        }

        create_temp_statement = '''
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;'''
        self.session.execute(create_temp_statement, params=params)

        statement = sql.text('''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file''')

        return self.session.query(sql.column("file"), sql.column("pkglist")) \
            .from_statement(statement).params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument.
        '''
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields a new line of the Contents-source.gz file in filename order.
        '''
        for row in self.query().yield_per(100):
            yield self.formatline(row[0], row[1])
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-source.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        return SourceContentsFileWriter(
            archive=self.suite.archive.path,
            suite=self.suite.suite_name,
            component=self.component.component_name,
        )

    def write_file(self):
        '''
        Write the output file.
        '''
        output = self.writer()
        handle = output.open()
        for line in self.fetch():
            handle.write(line)
        output.close()

236 

237 

def binary_helper(suite_id: int, arch_id: int, overridetype_id: int, component_id: int):
    '''
    Worker entry point that writes one binary Contents file. It is called in
    a new subprocess and multiprocessing wants a top level function.
    Returns a log message describing what was written.
    '''
    session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, session)
    architecture = Architecture.get(arch_id, session)
    overridetype = OverrideType.get(overridetype_id, session)
    component = Component.get(component_id, session)
    # collect the identifying names before the writer expires the session
    names = [suite.suite_name, architecture.arch_string,
             overridetype.overridetype, component.component_name]
    BinaryContentsWriter(suite, architecture, overridetype, component).write_file()
    session.close()
    return names

254 

255 

def source_helper(suite_id: int, component_id: int):
    '''
    Worker entry point that writes one source Contents file. It is called in
    a new subprocess and multiprocessing wants a top level function.
    Returns a log message describing what was written.
    '''
    session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, session)
    component = Component.get(component_id, session)
    # collect the identifying names before the writer expires the session
    names = [suite.suite_name, 'source', component.component_name]
    SourceContentsWriter(suite, component).write_file()
    session.close()
    return names

269 

270 

class ContentsWriter:
    '''
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    '''
    @classmethod
    def log_result(class_, result) -> None:
        '''
        Writes a result message to the logfile.
        '''
        class_.logger.log(list(result))

    @classmethod
    def write_all(class_, logger, archive_names=None, suite_names=None, component_names=None, force=False):
        '''
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.
        The archive_names and component_names lists restrict the archives and
        components that are processed.
        '''
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()
        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(Component.component_name.in_(component_names))
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        deb_id = get_override_type('deb', session).overridetype_id
        udeb_id = get_override_type('udeb', session).overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute("LOCK TABLE bin_contents IN SHARE MODE")
        session.execute("LOCK TABLE src_contents IN SHARE MODE")

        for suite in suite_query:
            suite_id = suite.suite_id

            # When arch:all gets its own Contents file, do not merge arch:all
            # packages into the per-architecture files.
            skip_arch_all = not suite.separate_contents_architecture_all

            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(source_helper, (suite_id, component_id),
                                 callback=class_.log_result)
                for architecture in suite.get_architectures(skipsrc=True, skipall=skip_arch_all):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id),
                                     callback=class_.log_result)
                    # handle 'udeb' packages
                    pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id),
                                     callback=class_.log_result)
        pool.close()
        pool.join()
        session.close()

334 

335 

class BinaryContentsScanner:
    '''
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    '''

    def __init__(self, binary_id: int):
        '''
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        '''
        self.binary_id: int = binary_id

    def scan(self) -> None:
        '''
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        binary = session.query(DBBinary).get(self.binary_id)
        fileset = set(binary.scan_contents())
        if not fileset:
            # record a marker entry so empty packages are not rescanned forever
            fileset.add('EMPTY_PACKAGE')
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all binaries using multiple threads.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # Keep the bound .count method (not its value): the remaining number
        # is recomputed after the pool has finished processing.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id, ))
        pool.close()
        pool.join()
        remaining = remaining()
        session.close()
        return {'processed': processed, 'remaining': remaining}

387 

388 

def binary_scan_helper(binary_id: int) -> None:
    '''
    This function runs in a subprocess. Any exception is reported instead of
    raised so that a single broken package does not kill the worker pool.
    '''
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))

398 

399 

class UnpackedSource:
    '''
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.
    '''

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        '''
        The dscfilename is a name of a DSC file that will be extracted.
        Extraction happens into a fresh temporary directory below tmpbasedir
        (or Dir::TempPath from the configuration). Raises
        subprocess.CalledProcessError if dpkg-source fails; the temporary
        directory is removed again in that case.
        '''
        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory: Optional[str] = os.path.join(temp_directory, 'root')
        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
                   dscfilename, self.root_directory)
        try:
            subprocess.check_call(command)
        except Exception:
            # do not leak the temporary directory when extraction fails
            self.root_directory = None
            rmtree(temp_directory)
            raise

    def get_root_directory(self) -> str:
        '''
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        '''
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        '''
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        '''
        # +1 strips the path separator after the root directory prefix
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        '''
        Removes all temporary files. Safe to call more than once.
        '''
        if self.root_directory is None:
            return
        # remove the mkdtemp() directory, which contains root_directory
        parent_directory = os.path.dirname(self.root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        '''
        Enforce cleanup.
        '''
        self.cleanup()

449 

450 

class SourceContentsScanner:
    '''
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    '''

    def __init__(self, source_id: int):
        '''
        The argument source_id is the id of the DBSource object that
        should be scanned.
        '''
        self.source_id: int = source_id

    def scan(self) -> None:
        '''
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        source = session.query(DBSource).get(self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # Keep the bound .count method (not its value): the remaining number
        # is recomputed after the pool has finished processing.
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id, ))
        pool.close()
        pool.join()
        remaining = remaining()
        session.close()
        return {'processed': processed, 'remaining': remaining}

499 

500 

def source_scan_helper(source_id: int) -> None:
    '''
    This function runs in a subprocess. Any exception is reported instead of
    raised so that a single broken package does not kill the worker pool.
    '''
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))