Coverage for daklib/contents.py: 95%

239 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1""" 

2Helper code for contents generation. 

3 

4@contact: Debian FTPMaster <ftpmaster@debian.org> 

5@copyright: 2011 Torsten Werner <twerner@debian.org> 

6@license: GNU General Public License version 2 or later 

7""" 

8 

9################################################################################ 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25################################################################################ 

26 

27import os.path 

28from collections.abc import Collection, Iterable 

29from shutil import rmtree 

30from tempfile import mkdtemp 

31from typing import TYPE_CHECKING, ClassVar, Optional 

32 

33import sqlalchemy.sql as sql 

34 

35import daklib.utils 

36from daklib import sandbox 

37from daklib.config import Config 

38from daklib.dbconn import ( 

39 Architecture, 

40 Archive, 

41 BinContents, 

42 Component, 

43 DBBinary, 

44 DBConn, 

45 DBSource, 

46 OverrideType, 

47 SrcContents, 

48 Suite, 

49 get_architecture, 

50 get_override_type, 

51 get_suite, 

52) 

53from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter 

54 

55from .dakmultiprocessing import DakProcessPool 

56 

57if TYPE_CHECKING: 

58 from sqlalchemy.engine import Result 

59 

60 from daklib.daklog import Logger 

61 

62 

class BinaryContentsWriter:
    """
    BinaryContentsWriter writes the Contents-$arch.gz files.

    One instance covers a single combination of suite, architecture,
    override type and component. All database access goes through the
    session the suite object is attached to.
    """

    def __init__(
        self,
        suite: Suite,
        architecture: Architecture,
        overridetype: OverrideType,
        component: Component,
    ) -> None:
        """
        Store the selection and pick up the database session of `suite`.

        `suite` has to be attached to a session; this is asserted.
        """
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query object that is doing most of the work.

        Two statements are executed on the session:

        1. a temporary table `newest_binaries` is created and filled with,
           per package name, the binary with the highest version that is
           associated with the suite and matches type and architecture;
        2. the contents of those binaries are grouped by file, each file
           being paired with a comma separated list of `section/package`
           entries taken from the overrides of the (override) suite.

        The result of the second statement is returned. Every call issues
        the `create temp table` statement again.
        """
        # Overrides may live in a different suite than the packages.
        overridesuite = (
            get_suite(self.suite.overridesuite, self.session)
            if self.suite.overridesuite
            else self.suite
        )
        assert overridesuite is not None
        params = {
            "suite": self.suite.suite_id,
            "overridesuite": overridesuite.suite_id,
            "component": self.component.component_id,
            "arch": self.architecture.arch_id,
            "type_id": self.overridetype.overridetype_id,
            "type": self.overridetype.overridetype,
        }

        if self.suite.separate_contents_architecture_all:
            # arch:all packages get their own Contents file, so only the
            # requested architecture is selected here.
            sql_arch_part = "architecture = :arch"
        else:
            # Otherwise arch:all packages are merged into every
            # architecture's file.
            sql_arch_part = "(architecture = :arch_all or architecture = :arch)"
            arch_all = get_architecture("all", self.session)
            assert arch_all is not None
            params["arch_all"] = arch_all.arch_id

        # Only the fixed architecture clause chosen above is interpolated
        # into the statement; all values are passed as bound parameters.
        sql_create_temp = (
            """
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;"""
            % sql_arch_part
        )
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.

        The filename is left-justified and padded to 55 characters, followed
        by a space, the package list and a newline.
        """
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields the lines of the Contents-$arch.gz file, one per row of
        query().

        NOTE(review): the statement in query() groups by file but has no
        ORDER BY on the file column, so any filename ordering of the output
        is not established by this class - confirm where ordering happens.

        The rollback at the end only runs once the generator has been
        consumed completely.
        """
        # yield_per(100) asks for the rows in batches of 100
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-$arch.gz file.
        """
        return list(self.fetch())

    def writer(self) -> BinaryContentsFileWriter:
        """
        Returns a writer object configured from the archive path, suite,
        component, package type and architecture of this instance.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
            "debtype": self.overridetype.overridetype,
            "architecture": self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.

        NOTE(review): close() is deliberately only reached on success;
        presumably it finalises the output file, so an aborted run should
        not publish a partial file - confirm against daklib.filewriter.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()

189 

190 

class SourceContentsWriter:
    """
    SourceContentsWriter writes the Contents-source.gz files.

    One instance covers a single combination of suite and component. All
    database access goes through the session the suite object is attached
    to.
    """

    def __init__(self, suite: Suite, component: Component) -> None:
        """
        Store the selection and pick up the database session of `suite`.

        `suite` has to be attached to a session; this is asserted.
        """
        self.suite = suite
        self.component = component
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query object that is doing most of the work.

        A temporary table `newest_sources` is created and filled with, per
        source name, the source with the highest version that is associated
        with the suite and whose file is mapped to the component. The
        returned result pairs every file of those sources with a comma
        separated list of source names.

        Every call issues the `create temp table` statement again.
        """
        params = {
            "suite_id": self.suite.suite_id,
            "component_id": self.component.component_id,
        }

        sql_create_temp = """
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;"""
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.

        Filename and package list are separated by a tab and terminated by
        a newline.
        """
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields the lines of the Contents-source.gz file, one per row of
        query().

        NOTE(review): the statement in query() groups by file but has no
        ORDER BY on the file column, so any filename ordering of the output
        is not established by this class - confirm where ordering happens.

        The rollback at the end only runs once the generator has been
        consumed completely.
        """
        # yield_per(100) asks for the rows in batches of 100
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-source.gz file.
        """
        return list(self.fetch())

    def writer(self) -> SourceContentsFileWriter:
        """
        Returns a writer object configured from the archive path, suite and
        component of this instance.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.

        NOTE(review): close() is deliberately only reached on success;
        presumably it finalises the output file, so an aborted run should
        not publish a partial file - confirm against daklib.filewriter.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()

277 

278 

def binary_helper(
    suite_id: int, arch_id: int, overridetype_id: int, component_id: int
) -> list[str]:
    """
    This function is called in a new subprocess and multiprocessing wants a top
    level function.

    It loads the four objects by primary key in a session of its own, writes
    the matching binary Contents file and returns the names of suite,
    architecture, override type and component for logging. The session is
    closed even when writing fails, so a worker that goes on to handle
    further tasks does not keep it open.
    """
    session = DBConn().session(work_mem=1000)
    try:
        suite = session.get_one(Suite, suite_id)
        architecture = session.get_one(Architecture, arch_id)
        overridetype = session.get_one(OverrideType, overridetype_id)
        component = session.get_one(Component, component_id)
        # Collect plain strings while the objects are still attached to the
        # session.
        log_message = [
            suite.suite_name,
            architecture.arch_string,
            overridetype.overridetype,
            component.component_name,
        ]
        contents_writer = BinaryContentsWriter(
            suite, architecture, overridetype, component
        )
        contents_writer.write_file()
    finally:
        session.close()
    return log_message

301 

302 

def source_helper(suite_id: int, component_id: int) -> list[str]:
    """
    This function is called in a new subprocess and multiprocessing wants a top
    level function.

    It loads suite and component by primary key in a session of its own,
    writes the matching source Contents file and returns the suite name, the
    literal "source" and the component name for logging. The session is
    closed even when writing fails, so a worker that goes on to handle
    further tasks does not keep it open.
    """
    session = DBConn().session(work_mem=1000)
    try:
        suite = session.get_one(Suite, suite_id)
        component = session.get_one(Component, component_id)
        # Collect plain strings while the objects are still attached to the
        # session.
        log_message = [suite.suite_name, "source", component.component_name]
        contents_writer = SourceContentsWriter(suite, component)
        contents_writer.write_file()
    finally:
        session.close()
    return log_message

316 

317 

318class ContentsWriter: 

319 """ 

320 Loop over all suites, architectures, overridetypes, and components to write 

321 all contents files. 

322 """ 

323 

324 logger: ClassVar["Logger"] 

325 

326 @classmethod 

327 def log_result(class_, result) -> None: 

328 """ 

329 Writes a result message to the logfile. 

330 """ 

331 class_.logger.log(list(result)) 

332 

333 @classmethod 

334 def write_all( 

335 class_, 

336 logger, 

337 archive_names: Collection[str] | None = None, 

338 suite_names: Collection[str] | None = None, 

339 component_names: Collection[str] | None = None, 

340 force=False, 

341 ): 

342 """ 

343 Writes all Contents files for suites in list suite_names which defaults 

344 to all 'touchable' suites if not specified explicitely. Untouchable 

345 suites will be included if the force argument is set to True. 

346 """ 

347 pool = DakProcessPool() 

348 class_.logger = logger 

349 session = DBConn().session() 

350 suite_query = session.query(Suite) 

351 if archive_names: 351 ↛ 355line 351 didn't jump to line 355 because the condition on line 351 was always true

352 suite_query = suite_query.join(Suite.archive).filter( 

353 Archive.archive_name.in_(archive_names) 

354 ) 

355 if suite_names: 

356 suite_query = suite_query.filter(Suite.suite_name.in_(suite_names)) 

357 component_query = session.query(Component) 

358 if component_names: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 component_query = component_query.filter( 

360 Component.component_name.in_(component_names) 

361 ) 

362 components = component_query.all() 

363 if not force: 363 ↛ 365line 363 didn't jump to line 365 because the condition on line 363 was always true

364 suite_query = suite_query.filter(Suite.untouchable == False) # noqa:E712 

365 deb_type = get_override_type("deb", session) 

366 assert deb_type is not None 

367 deb_id = deb_type.overridetype_id 

368 udeb_type = get_override_type("udeb", session) 

369 assert udeb_type is not None 

370 udeb_id = udeb_type.overridetype_id 

371 

372 # Lock tables so that nobody can change things underneath us 

373 session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE")) 

374 session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE")) 

375 

376 for suite in suite_query: 

377 suite_id = suite.suite_id 

378 

379 skip_arch_all = True 

380 if suite.separate_contents_architecture_all: 

381 skip_arch_all = False 

382 

383 for component in (c for c in suite.components if c in components): 

384 component_id = component.component_id 

385 # handle source packages 

386 pool.apply_async( 

387 source_helper, (suite_id, component_id), callback=class_.log_result 

388 ) 

389 for architecture in suite.get_architectures( 

390 skipsrc=True, skipall=skip_arch_all 

391 ): 

392 arch_id = architecture.arch_id 

393 # handle 'deb' packages 

394 pool.apply_async( 

395 binary_helper, 

396 (suite_id, arch_id, deb_id, component_id), 

397 callback=class_.log_result, 

398 ) 

399 # handle 'udeb' packages 

400 pool.apply_async( 

401 binary_helper, 

402 (suite_id, arch_id, udeb_id, component_id), 

403 callback=class_.log_result, 

404 ) 

405 pool.close() 

406 pool.join() 

407 session.close() 

408 

409 

class BinaryContentsScanner:
    """
    BinaryContentsScanner provides a method scan() to scan the contents of a
    DBBinary object.
    """

    def __init__(self, binary_id: int):
        """
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        """
        self.binary_id: int = binary_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.

        A binary without any files gets the single placeholder entry
        "EMPTY_PACKAGE". The session opened here is closed again even if
        the scan or the commit fails.
        """
        session = DBConn().session()
        try:
            binary = session.get_one(DBBinary, self.binary_id)
            # A set drops duplicate file names.
            fileset = set(binary.scan_contents())
            if not fileset:
                fileset.add("EMPTY_PACKAGE")
            for filename in fileset:
                binary.contents.append(BinContents(file=filename))
            session.commit()
        finally:
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all binaries using multiple processes.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.

        Only binaries without contents are selected. "processed" is counted
        before the jobs run and therefore includes binaries whose scan
        failed; "remaining" is counted after all jobs have finished.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            query = session.query(DBBinary).filter(
                DBBinary.contents == None  # noqa:E711
            )
            # Keep the count method of the unlimited query; it is called
            # after the pool has finished.
            remaining = query.count
            if limit is not None:
                query = query.limit(limit)
            processed = query.count()
            for binary in query.yield_per(100):
                # Package, version and architecture are only passed along
                # for the helper's error message.
                pool.apply_async(
                    binary_scan_helper,
                    (binary.binary_id, binary.package, binary.version, binary.arch_id),
                )
            pool.close()
            pool.join()
            remaining_int = remaining()
        finally:
            session.close()
        return {"processed": processed, "remaining": remaining_int}

464 

465 

def binary_scan_helper(
    binary_id: int, package: str, version: str, arch_id: int
) -> None:
    """
    This function runs in a subprocess.

    It scans the binary with the given id. Any exception raised by the scan
    is reported on stdout together with package, version and architecture
    id and is not propagated, so one broken package does not abort the whole
    run.
    """
    try:
        scanner = BinaryContentsScanner(binary_id)
        scanner.scan()
    except Exception as e:
        print(
            "binary_scan_helper raised an exception while processing %s=%s (arch=%s): %s"
            % (package, version, arch_id, e)
        )

480 

481 

class UnpackedSource:
    """
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient function for accessing it.
    """

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        """
        The dscfilename is a name of a DSC file that will be extracted.

        The package is unpacked by dpkg-source, run through the sandbox
        helper, into a directory "root" inside a fresh temporary directory.
        The temporary directory is created below tmpbasedir, or below the
        configured Dir::TempPath when tmpbasedir is empty or None. A failing
        dpkg-source raises (check=True).
        """
        # Set before anything can fail so that cleanup() and __del__() work
        # on a partially initialised object.
        self.root_directory: Optional[str] = None
        basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory = os.path.join(temp_directory, "root")
        command = (
            "dpkg-source",
            "--no-copy",
            "--no-check",
            "-q",
            "-x",
            dscfilename,
            self.root_directory,
        )
        sandbox.run(
            command,
            # Write access is granted to the extraction directory and to
            # the directory named by TMPDIR (default /tmp).
            sandbox=sandbox.Sandbox(
                extra_read_write_paths=[
                    temp_directory,
                    os.environ.get("TMPDIR", "/tmp"),
                ],
            ),
            check=True,
        )
        # NOTE(review): presumably drops symlinks pointing outside of the
        # unpacked tree - confirm in daklib.utils.
        daklib.utils.remove_unsafe_symlinks(self.root_directory)

    def get_root_directory(self) -> str:
        """
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.

        Must not be called after cleanup().
        """
        assert self.root_directory is not None
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        """
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.

        Only the entries os.walk() reports as files are returned. Must not
        be called after cleanup().
        """
        assert self.root_directory is not None
        # Number of characters to strip: the root directory plus the path
        # separator that follows it.
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        """
        Removes all temporary files.

        The whole temporary directory, i.e. the parent of the root
        directory, is deleted. Calling cleanup() again afterwards, or on an
        object whose construction failed early, does nothing.
        """
        # getattr: the attribute does not exist if __init__ never ran.
        root_directory = getattr(self, "root_directory", None)
        if root_directory is None:
            return
        parent_directory = os.path.dirname(root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        """
        Enforce cleanup.
        """
        self.cleanup()

550 

551 

class SourceContentsScanner:
    """
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    """

    def __init__(self, source_id: int):
        """
        The argument source_id is the id of the DBSource object that
        should be scanned.
        """
        self.source_id: int = source_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.

        The session opened here is closed again even if the scan or the
        commit fails.
        """
        session = DBConn().session()
        try:
            source = session.get_one(DBSource, self.source_id)
            # A set drops duplicate file names.
            fileset = set(source.scan_contents())
            for filename in fileset:
                source.contents.append(SrcContents(file=filename))
            session.commit()
        finally:
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.

        Only sources without contents are selected. "processed" is counted
        before the jobs run and therefore includes sources whose scan
        failed; "remaining" is counted after all jobs have finished.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            query = session.query(DBSource).filter(
                DBSource.contents == None  # noqa:E711
            )
            # Keep the count method of the unlimited query; it is called
            # after the pool has finished.
            remaining = query.count
            if limit is not None:
                query = query.limit(limit)
            processed = query.count()
            for source in query.yield_per(100):
                # Name and version are only passed along for the helper's
                # error message.
                pool.apply_async(
                    source_scan_helper,
                    (source.source_id, source.source, source.version),
                )
            pool.close()
            pool.join()
            remaining_int = remaining()
        finally:
            session.close()
        return {"processed": processed, "remaining": remaining_int}

602 

603 

def source_scan_helper(source_id: int, source: str, version: str) -> None:
    """
    This function runs in a subprocess.

    It scans the source with the given id. Any exception raised by the scan
    is reported on stdout together with source name and version and is not
    propagated, so one broken package does not abort the whole run.
    """
    try:
        scanner = SourceContentsScanner(source_id)
        scanner.scan()
    except Exception as e:
        print(
            "source_scan_helper raised an exception while processing %s=%s: %s"
            % (source, version, e)
        )