Coverage for daklib/contents.py: 95% (237 statements; coverage.py v7.6.0, created at 2026-01-04 16:18 +0000)

1""" 

2Helper code for contents generation. 

3 

4@contact: Debian FTPMaster <ftpmaster@debian.org> 

5@copyright: 2011 Torsten Werner <twerner@debian.org> 

6@license: GNU General Public License version 2 or later 

7""" 

8 

9################################################################################ 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25################################################################################ 

26 

27import os.path 

28import subprocess 

29from collections.abc import Collection, Iterable 

30from shutil import rmtree 

31from tempfile import mkdtemp 

32from typing import TYPE_CHECKING, ClassVar, Optional 

33 

34import sqlalchemy.sql as sql 

35 

36from daklib.config import Config 

37from daklib.dbconn import ( 

38 Architecture, 

39 Archive, 

40 BinContents, 

41 Component, 

42 DBBinary, 

43 DBConn, 

44 DBSource, 

45 OverrideType, 

46 SrcContents, 

47 Suite, 

48 get_architecture, 

49 get_override_type, 

50 get_suite, 

51) 

52from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter 

53 

54from .dakmultiprocessing import DakProcessPool 

55 

56if TYPE_CHECKING: 

57 from sqlalchemy.engine import Result 

58 

59 from daklib.daklog import Logger 

60 

61 

62class BinaryContentsWriter: 

63 """ 

64 BinaryContentsWriter writes the Contents-$arch.gz files. 

65 """ 

66 

67 def __init__( 

68 self, 

69 suite: Suite, 

70 architecture: Architecture, 

71 overridetype: OverrideType, 

72 component: Component, 

73 ) -> None: 

74 self.suite = suite 

75 self.architecture = architecture 

76 self.overridetype = overridetype 

77 self.component = component 

78 session = suite.session() 

79 assert session is not None 

80 self.session = session 

81 

82 def query(self) -> "Result[tuple[str, str]]": 

83 """ 

84 Returns a query object that is doing most of the work. 

85 """ 

86 overridesuite = ( 

87 get_suite(self.suite.overridesuite, self.session) 

88 if self.suite.overridesuite 

89 else self.suite 

90 ) 

91 assert overridesuite is not None 

92 params = { 

93 "suite": self.suite.suite_id, 

94 "overridesuite": overridesuite.suite_id, 

95 "component": self.component.component_id, 

96 "arch": self.architecture.arch_id, 

97 "type_id": self.overridetype.overridetype_id, 

98 "type": self.overridetype.overridetype, 

99 } 

100 

101 if self.suite.separate_contents_architecture_all: 

102 sql_arch_part = "architecture = :arch" 

103 else: 

104 sql_arch_part = "(architecture = :arch_all or architecture = :arch)" 

105 arch_all = get_architecture("all", self.session) 

106 assert arch_all is not None 

107 params["arch_all"] = arch_all.arch_id 

108 

109 sql_create_temp = ( 

110 """ 

111create temp table newest_binaries ( 

112 id integer primary key, 

113 package text); 

114 

115create index newest_binaries_by_package on newest_binaries (package); 

116 

117insert into newest_binaries (id, package) 

118 select distinct on (package) id, package from binaries 

119 where type = :type and 

120 %s and 

121 id in (select bin from bin_associations where suite = :suite) 

122 order by package, version desc;""" 

123 % sql_arch_part 

124 ) 

125 self.session.execute(sql.text(sql_create_temp), params=params) 
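        # The temp table filled above relies on "select distinct on (package)
        # ... order by package, version desc": PostgreSQL keeps exactly one
        # row per package, the one with the highest version, so
        # newest_binaries contains only the newest binary of each package in
        # the suite.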

        query = sql.text(
            """
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        return "%-55s %s\n" % (filename, package_list)
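
    # formatline() pads the path to 55 columns and appends the comma-separated
    # "section/package" list; an illustrative (made-up) output line:
    #
    #   usr/bin/hello                                           devel/hello,devel/hello-traditional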

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-$arch.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-$arch.gz file.
        """
        return list(self.fetch())

    def writer(self) -> BinaryContentsFileWriter:
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
            "debtype": self.overridetype.overridetype,
            "architecture": self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
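
    # A minimal usage sketch (assumes suite, architecture, overridetype and
    # component are ORM objects bound to an open session, the way
    # binary_helper() below obtains them):
    #
    #     writer = BinaryContentsWriter(suite, architecture, overridetype, component)
    #     writer.write_file()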


class SourceContentsWriter:
    """
    SourceContentsWriter writes the Contents-source.gz files.
    """

    def __init__(self, suite: Suite, component: Component):
        self.suite = suite
        self.component = component
        session = suite.session()
        assert session is not None
        self.session = session

    def query(self) -> "Result[tuple[str, str]]":
        """
        Returns a query result that does most of the work.
        """
        params = {
            "suite_id": self.suite.suite_id,
            "component_id": self.component.component_id,
        }

        sql_create_temp = """
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;"""
        self.session.execute(sql.text(sql_create_temp), params=params)

        query = sql.text(
            """
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file"""
        )

        return self.session.execute(query, params=params)

    def formatline(self, filename: str, package_list: str) -> str:
        """
        Returns a formatted string for the filename argument.
        """
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self) -> Iterable[str]:
        """
        Yields a new line of the Contents-source.gz file in filename order.
        """
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self) -> list[str]:
        """
        Returns a list of lines for the Contents-source.gz file.
        """
        return list(self.fetch())

    def writer(self) -> SourceContentsFileWriter:
        """
        Returns a writer object.
        """
        values = {
            "archive": self.suite.archive.path,
            "suite": self.suite.suite_name,
            "component": self.component.component_name,
        }
        return SourceContentsFileWriter(**values)

    def write_file(self) -> None:
        """
        Write the output file.
        """
        writer = self.writer()
        file = writer.open()
        for item in self.fetch():
            file.write(item)
        writer.close()
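
    # The source variant is driven the same way, just without architecture
    # and override type (hedged sketch; compare source_helper() below):
    #
    #     writer = SourceContentsWriter(suite, component)
    #     writer.write_file()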


def binary_helper(
    suite_id: int, arch_id: int, overridetype_id: int, component_id: int
) -> list[str]:
    """
    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    """
    session = DBConn().session(work_mem=1000)
    suite = session.get_one(Suite, suite_id)
    architecture = session.get_one(Architecture, arch_id)
    overridetype = session.get_one(OverrideType, overridetype_id)
    component = session.get_one(Component, component_id)
    log_message = [
        suite.suite_name,
        architecture.arch_string,
        overridetype.overridetype,
        component.component_name,
    ]
    contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
    contents_writer.write_file()
    session.close()
    return log_message


def source_helper(suite_id: int, component_id: int) -> list[str]:
    """
    This function is called in a new subprocess and multiprocessing wants a top
    level function.
    """
    session = DBConn().session(work_mem=1000)
    suite = session.get_one(Suite, suite_id)
    component = session.get_one(Component, component_id)
    log_message = [suite.suite_name, "source", component.component_name]
    contents_writer = SourceContentsWriter(suite, component)
    contents_writer.write_file()
    session.close()
    return log_message


class ContentsWriter:
    """
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    """

    logger: ClassVar["Logger"]

    @classmethod
    def log_result(class_, result) -> None:
        """
        Writes a result message to the logfile.
        """
        class_.logger.log(list(result))

    @classmethod
    def write_all(
        class_,
        logger: "Logger",
        archive_names: Collection[str] | None = None,
        suite_names: Collection[str] | None = None,
        component_names: Collection[str] | None = None,
        force: bool = False,
    ):
        """
        Writes all Contents files for the suites in suite_names, which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.
        """
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()
        suite_query = session.query(Suite)
        if archive_names:
            suite_query = suite_query.join(Suite.archive).filter(
                Archive.archive_name.in_(archive_names)
            )
        if suite_names:
            suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
        component_query = session.query(Component)
        if component_names:
            component_query = component_query.filter(
                Component.component_name.in_(component_names)
            )
        components = component_query.all()
        if not force:
            suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
        deb_type = get_override_type("deb", session)
        assert deb_type is not None
        deb_id = deb_type.overridetype_id
        udeb_type = get_override_type("udeb", session)
        assert udeb_type is not None
        udeb_id = udeb_type.overridetype_id

        # Lock tables so that nobody can change things underneath us
        session.execute(sql.text("LOCK TABLE bin_contents IN SHARE MODE"))
        session.execute(sql.text("LOCK TABLE src_contents IN SHARE MODE"))

        for suite in suite_query:
            suite_id = suite.suite_id

            skip_arch_all = True
            if suite.separate_contents_architecture_all:
                skip_arch_all = False

            for component in (c for c in suite.components if c in components):
                component_id = component.component_id
                # handle source packages
                pool.apply_async(
                    source_helper, (suite_id, component_id), callback=class_.log_result
                )
                for architecture in suite.get_architectures(
                    skipsrc=True, skipall=skip_arch_all
                ):
                    arch_id = architecture.arch_id
                    # handle 'deb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, deb_id, component_id),
                        callback=class_.log_result,
                    )
                    # handle 'udeb' packages
                    pool.apply_async(
                        binary_helper,
                        (suite_id, arch_id, udeb_id, component_id),
                        callback=class_.log_result,
                    )
        pool.close()
        pool.join()
        session.close()
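
    # Typical driver code, e.g. in the `dak contents` command (a hedged
    # sketch, not verbatim dak code; the Logger program name is illustrative):
    #
    #     from daklib.daklog import Logger
    #
    #     ContentsWriter.write_all(Logger("contents"),
    #                              suite_names=["unstable"], force=False)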


class BinaryContentsScanner:
    """
    BinaryContentsScanner provides a threadsafe method scan() to scan the
    contents of a DBBinary object.
    """

    def __init__(self, binary_id: int):
        """
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        """
        self.binary_id: int = binary_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        binary = session.get_one(DBBinary, self.binary_id)
        fileset = set(binary.scan_contents())
        if len(fileset) == 0:
            fileset.add("EMPTY_PACKAGE")
        for filename in fileset:
            binary.contents.append(BinContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all binaries using multiple processes.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
        # keep a reference to the unlimited query's count() method; it is
        # called again after the pool has finished to get the remaining count
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for binary in query.yield_per(100):
            pool.apply_async(binary_scan_helper, (binary.binary_id,))
        pool.close()
        pool.join()
        remaining_int = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining_int}
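
    # Sketch of a caller consuming the returned dict (values illustrative):
    #
    #     result = BinaryContentsScanner.scan_all(limit=1000)
    #     print("%(processed)d scanned, %(remaining)d left" % result)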


def binary_scan_helper(binary_id: int) -> None:
    """
    This function runs in a subprocess.
    """
    try:
        scanner = BinaryContentsScanner(binary_id)
        scanner.scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))


class UnpackedSource:
    """
    UnpackedSource extracts a source package into a temporary location and
    provides some convenient functions for accessing it.
    """

    def __init__(self, dscfilename: str, tmpbasedir: Optional[str] = None):
        """
        The dscfilename is the name of a DSC file that will be extracted.
        """
        basedir = tmpbasedir if tmpbasedir else Config()["Dir::TempPath"]
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory: Optional[str] = os.path.join(temp_directory, "root")
        command = (
            "dpkg-source",
            "--no-copy",
            "--no-check",
            "-q",
            "-x",
            dscfilename,
            self.root_directory,
        )
        subprocess.check_call(command)

    def get_root_directory(self) -> str:
        """
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        """
        assert self.root_directory is not None
        return self.root_directory

    def get_all_filenames(self) -> Iterable[str]:
        """
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        """
        assert self.root_directory is not None
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self) -> None:
        """
        Removes all temporary files.
        """
        if self.root_directory is None:
            return
        parent_directory = os.path.dirname(self.root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        """
        Enforce cleanup.
        """
        self.cleanup()
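
    # A minimal usage sketch (the .dsc path is purely illustrative):
    #
    #     unpacked = UnpackedSource("/srv/queue/hello_2.10-3.dsc")
    #     for name in unpacked.get_all_filenames():
    #         print(name)
    #     unpacked.cleanup()  # also invoked by __del__ as a safety net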


class SourceContentsScanner:
    """
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    """

    def __init__(self, source_id: int):
        """
        The argument source_id is the id of the DBSource object that
        should be scanned.
        """
        self.source_id: int = source_id

    def scan(self) -> None:
        """
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        """
        session = DBConn().session()
        source = session.get_one(DBSource, self.source_id)
        fileset = set(source.scan_contents())
        for filename in fileset:
            source.contents.append(SrcContents(file=filename))
        session.commit()
        session.close()

    @classmethod
    def scan_all(class_, limit=None):
        """
        The class method scan_all() scans all sources using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        """
        pool = DakProcessPool()
        session = DBConn().session()
        query = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
        # as in BinaryContentsScanner.scan_all(): keep the unlimited query's
        # count() method so the remaining packages can be counted afterwards
        remaining = query.count
        if limit is not None:
            query = query.limit(limit)
        processed = query.count()
        for source in query.yield_per(100):
            pool.apply_async(source_scan_helper, (source.source_id,))
        pool.close()
        pool.join()
        remaining_int = remaining()
        session.close()
        return {"processed": processed, "remaining": remaining_int}


def source_scan_helper(source_id: int) -> None:
    """
    This function runs in a subprocess.
    """
    try:
        scanner = SourceContentsScanner(source_id)
        scanner.scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))