1""" 

2Helper code for contents generation. 

3 

4@contact: Debian FTPMaster <ftpmaster@debian.org> 

5@copyright: 2011 Torsten Werner <twerner@debian.org> 

6@license: GNU General Public License version 2 or later 

7""" 

8 

9################################################################################ 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25################################################################################ 

26 

27import os.path 

28import subprocess 

29from collections.abc import Iterable 

30from shutil import rmtree 

31from tempfile import mkdtemp 

32from typing import Optional 

33 

34import sqlalchemy.sql as sql 

35 

36from daklib.config import Config 

37from daklib.dbconn import ( 

38 Architecture, 

39 Archive, 

40 BinContents, 

41 Component, 

42 DBBinary, 

43 DBConn, 

44 DBSource, 

45 OverrideType, 

46 SrcContents, 

47 Suite, 

48 get_architecture, 

49 get_override_type, 

50 get_suite, 

51) 

52from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter 

53 

54from .dakmultiprocessing import DakProcessPool 

55 

56 

57class BinaryContentsWriter: 

58 """ 

59 BinaryContentsWriter writes the Contents-$arch.gz files. 

60 """ 

61 

62 def __init__(self, suite, architecture, overridetype, component): 

63 self.suite = suite 

64 self.architecture = architecture 

65 self.overridetype = overridetype 

66 self.component = component 

67 self.session = suite.session() 

68 

69 def query(self): 

70 """ 

71 Returns a query object that is doing most of the work. 

72 """ 

73 overridesuite = self.suite 

74 if self.suite.overridesuite is not None: 

75 overridesuite = get_suite(self.suite.overridesuite, self.session) 

76 params = { 

77 "suite": self.suite.suite_id, 

78 "overridesuite": overridesuite.suite_id, 

79 "component": self.component.component_id, 

80 "arch": self.architecture.arch_id, 

81 "type_id": self.overridetype.overridetype_id, 

82 "type": self.overridetype.overridetype, 

83 } 

84 

85 if self.suite.separate_contents_architecture_all: 

86 sql_arch_part = "architecture = :arch" 

87 else: 

88 sql_arch_part = "(architecture = :arch_all or architecture = :arch)" 

89 params["arch_all"] = get_architecture("all", self.session).arch_id 

90 

91 sql_create_temp = ( 

92 """ 

93create temp table newest_binaries ( 

94 id integer primary key, 

95 package text); 

96 

97create index newest_binaries_by_package on newest_binaries (package); 

98 

99insert into newest_binaries (id, package) 

100 select distinct on (package) id, package from binaries 

101 where type = :type and 

102 %s and 

103 id in (select bin from bin_associations where suite = :suite) 

104 order by package, version desc;""" 

105 % sql_arch_part 

106 ) 

107 self.session.execute(sql_create_temp, params=params) 

108 

109 query = sql.text( 

110 """ 

111with 

112 

113unique_override as 

114 (select o.package, s.section 

115 from override o, section s 

116 where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and 

117 o.component = :component) 

118 

119select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist 

120 from newest_binaries b, bin_contents bc, unique_override o 

121 where b.id = bc.binary_id and o.package = b.package 

122 group by bc.file""" 

123 ) 

124 

125 return ( 

126 self.session.query(sql.column("file"), sql.column("pkglist")) 

127 .from_statement(query) 

128 .params(params) 

129 ) 

130 

131 def formatline(self, filename, package_list) -> str: 

132 """ 

133 Returns a formatted string for the filename argument. 

134 """ 

135 return "%-55s %s\n" % (filename, package_list) 

136 

137 def fetch(self) -> Iterable[str]: 

138 """ 

139 Yields a new line of the Contents-$arch.gz file in filename order. 

140 """ 

141 for filename, package_list in self.query().yield_per(100): 

142 yield self.formatline(filename, package_list) 

143 # end transaction to return connection to pool 

144 self.session.rollback() 

145 

146 def get_list(self) -> list[str]: 

147 """ 

148 Returns a list of lines for the Contents-$arch.gz file. 

149 """ 

150 return [item for item in self.fetch()] 

151 

152 def writer(self): 

153 """ 

154 Returns a writer object. 

155 """ 

156 values = { 

157 "archive": self.suite.archive.path, 

158 "suite": self.suite.suite_name, 

159 "component": self.component.component_name, 

160 "debtype": self.overridetype.overridetype, 

161 "architecture": self.architecture.arch_string, 

162 } 

163 return BinaryContentsFileWriter(**values) 

164 

165 def write_file(self) -> None: 

166 """ 

167 Write the output file. 

168 """ 

169 writer = self.writer() 

170 file = writer.open() 

171 for item in self.fetch(): 

172 file.write(item) 

173 writer.close() 

174 

175 

class SourceContentsWriter:
    """
    SourceContentsWriter writes the Contents-source.gz files.
    """

    def __init__(self, suite, component):
        self.suite = suite
        self.component = component
        # Reuse the session that the suite object is attached to.
        self.session = suite.session()

    def query(self):
        """
        Returns a query object that is doing most of the work.
        """
        params = {
            "suite_id": self.suite.suite_id,
            "component_id": self.component.component_id,
        }

        # First collect the newest version of every source package into a
        # session-local temporary table; the main query below joins against it.
        sql_create_temp = """
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;"""
        self.session.execute(sql_create_temp, params=params)

        query = sql.text(
            """
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file"""
        )

        return (
            self.session.query(sql.column("file"), sql.column("pkglist"))
            .from_statement(query)
            .params(params)
        )

    def formatline(self, filename, package_list) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        # Contents-source format: filename and package list, tab separated.
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-source.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-source.gz file.
        """
        return list(self.fetch())

    def writer(self):
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()

264 

265 

def binary_helper(suite_id: int, arch_id: int, overridetype_id: int, component_id: int):
    """
    Write one binary Contents file and return a log message.

    Runs in a worker subprocess; multiprocessing requires a top-level
    function, so this cannot be a method.
    """
    db_session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, db_session)
    arch = Architecture.get(arch_id, db_session)
    otype = OverrideType.get(overridetype_id, db_session)
    comp = Component.get(component_id, db_session)
    result = [
        suite.suite_name,
        arch.arch_string,
        otype.overridetype,
        comp.component_name,
    ]
    BinaryContentsWriter(suite, arch, otype, comp).write_file()
    db_session.close()
    return result

286 

287 

def source_helper(suite_id: int, component_id: int):
    """
    Write one source Contents file and return a log message.

    Runs in a worker subprocess; multiprocessing requires a top-level
    function, so this cannot be a method.
    """
    db_session = DBConn().session(work_mem=1000)
    suite = Suite.get(suite_id, db_session)
    comp = Component.get(component_id, db_session)
    result = [suite.suite_name, "source", comp.component_name]
    SourceContentsWriter(suite, comp).write_file()
    db_session.close()
    return result

301 

302 

class ContentsWriter:
    """
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    """

    @classmethod
    def log_result(class_, result) -> None:
        """
        Writes a result message to the logfile.
        """
        class_.logger.log(list(result))

    @classmethod
    def write_all(
        class_,
        logger,
        archive_names=None,
        suite_names=None,
        component_names=None,
        force=False,
    ):
        """
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitely. Untouchable
        suites will be included if the force argument is set to True.
        """
        process_pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()

        suites = session.query(Suite)
        if archive_names:
            suites = suites.join(Suite.archive).filter(
                Archive.archive_name.in_(archive_names)
            )
        if suite_names:
            suites = suites.filter(Suite.suite_name.in_(suite_names))

        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(
                Component.component_name.in_(component_names)
            )
        selected_components = component_query.all()

        if not force:
            suites = suites.filter(Suite.untouchable == False)  # noqa:E712

        deb_id = get_override_type("deb", session).overridetype_id
        udeb_id = get_override_type("udeb", session).overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute("LOCK TABLE bin_contents IN SHARE MODE")
        session.execute("LOCK TABLE src_contents IN SHARE MODE")

        for suite in suites:
            suite_id = suite.suite_id
            # Suites with a separate Contents-all enumerate 'all' like any
            # other architecture instead of folding it into each arch file.
            skip_arch_all = not suite.separate_contents_architecture_all

            for component in (c for c in suite.components if c in selected_components):
                component_id = component.component_id
                # handle source packages
                process_pool.apply_async(
                    source_helper, (suite_id, component_id), callback=class_.log_result
                )
                for architecture in suite.get_architectures(
                    skipsrc=True, skipall=skip_arch_all
                ):
                    arch_id = architecture.arch_id
                    # handle 'deb' and 'udeb' packages
                    for overridetype_id in (deb_id, udeb_id):
                        process_pool.apply_async(
                            binary_helper,
                            (suite_id, arch_id, overridetype_id, component_id),
                            callback=class_.log_result,
                        )
        process_pool.close()
        process_pool.join()
        session.close()

387 

388 

class BinaryContentsScanner:
    """
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    """

    def __init__(self, binary_id: int):
        """
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        """
        self.binary_id: int = binary_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        binary = session.query(DBBinary).get(self.binary_id)
        fileset = set(binary.scan_contents())
        if len(fileset) == 0:
            # Record a marker row so empty packages are not rescanned forever.
            fileset.add("EMPTY_PACKAGE")
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all binaries using multiple threads.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # Keep the unlimited query around: the remaining count must be taken
        # AFTER the workers have committed their results and must ignore any
        # limit applied below.
        unlimited_query = query
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id,))
        pool.close()
        pool.join()
        remaining = unlimited_query.count()
        session.close()
        return {"processed": processed, "remaining": remaining}

440 

441 

def binary_scan_helper(binary_id: int) -> None:
    """
    This function runs in a subprocess.

    Any failure is reported on stdout instead of propagating, so a single
    broken package does not take down the whole scanning pool.
    """
    try:
        BinaryContentsScanner(binary_id).scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))

451 

452 

class UnpackedSource:
    """
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.
    """

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        """
        The dscfilename is a name of a DSC file that will be extracted.
        """
        # Initialize the attribute first so that cleanup() via __del__ is
        # safe even if temp-dir creation below raises.
        self.root_directory: Optional[str] = None
        basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory = os.path.join(temp_directory, "root")
        command = (
            "dpkg-source",
            "--no-copy",
            "--no-check",
            "-q",
            "-x",
            dscfilename,
            self.root_directory,
        )
        subprocess.check_call(command)

    def get_root_directory(self) -> str:
        """
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        """
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        """
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        """
        # +1 skips the path separator after the root directory prefix.
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        """
        Removes all temporary files. Safe to call more than once.
        """
        # getattr: tolerate a partially constructed object whose __init__
        # raised before root_directory was assigned.
        if getattr(self, "root_directory", None) is None:
            return
        # root_directory lives inside the mkdtemp() directory; remove both.
        parent_directory = os.path.dirname(self.root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        """
        Enforce cleanup.
        """
        self.cleanup()

509 

510 

class SourceContentsScanner:
    """
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    """

    def __init__(self, source_id: int):
        """
        The argument source_id is the id of the DBSource object that
        should be scanned.
        """
        self.source_id: int = source_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        source = session.query(DBSource).get(self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # Keep the unlimited query around: the remaining count must be taken
        # AFTER the workers have committed their results and must ignore any
        # limit applied below.
        unlimited_query = query
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id,))
        pool.close()
        pool.join()
        remaining = unlimited_query.count()
        session.close()
        return {"processed": processed, "remaining": remaining}

559 

560 

def source_scan_helper(source_id: int) -> None:
    """
    This function runs in a subprocess.

    Any failure is reported on stdout instead of propagating, so a single
    broken package does not take down the whole scanning pool.
    """
    try:
        SourceContentsScanner(source_id).scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))