Coverage for daklib/dbconn.py: 87%

1553 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1"""DB access class 

2 

3@contact: Debian FTPMaster <ftpmaster@debian.org> 

4@copyright: 2000, 2001, 2002, 2003, 2004, 2006 James Troup <james@nocrew.org> 

5@copyright: 2008-2009 Mark Hymers <mhy@debian.org> 

6@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org> 

7@copyright: 2009 Mike O'Connor <stew@debian.org> 

8@license: GNU General Public License version 2 or later 

9""" 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25################################################################################ 

26 

27# < mhy> I need a funny comment 

28# < sgran> two peanuts were walking down a dark street 

29# < sgran> one was a-salted 

30# * mhy looks up the definition of "funny" 

31 

32################################################################################ 

33 

34import functools 

35import inspect 

36import os 

37import re 

38import subprocess 

39import warnings 

40from collections.abc import Callable, Iterable 

41from datetime import datetime, timedelta 

42from os.path import normpath 

43from tarfile import TarFile 

44from typing import TYPE_CHECKING, Any, Optional, TypedDict, Union, overload, override 

45 

46import apt_pkg 

47import sqlalchemy 

48import sqlalchemy.types 

49from debian.deb822 import Deb822 

50from sqlalchemy import ( 

51 Column, 

52 ForeignKey, 

53 Table, 

54 UniqueConstraint, 

55 create_engine, 

56 desc, 

57 sql, 

58) 

59from sqlalchemy.dialects.postgresql import ARRAY 

60 

61# Don't remove this, we re-export the exceptions to scripts which import us 

62from sqlalchemy.exc import IntegrityError, OperationalError, SAWarning, SQLAlchemyError 

63from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy 

64from sqlalchemy.orm import ( 

65 DynamicMapped, 

66 Mapped, 

67 attribute_keyed_dict, 

68 mapped_column, 

69 object_session, 

70 relationship, 

71 sessionmaker, 

72) 

73from sqlalchemy.orm.exc import NoResultFound 

74from sqlalchemy.sql import func 

75from sqlalchemy.types import CHAR, BigInteger, Boolean, DateTime, Integer, String, Text 

76 

77import daklib.gpg 

78 

79from .aptversion import AptVersion 

80 

81# Only import Config until Queue stuff is changed to store its config 

82# in the database 

83from .config import Config 

84from .textutils import fix_maintainer 

85 

86# suppress some deprecation warnings in squeeze related to sqlalchemy 

87warnings.filterwarnings( 

88 "ignore", "Predicate of partial index .* ignored during reflection", SAWarning 

89) 

90 

91# (Debian 12 "bookworm") Silence warning targeted at SQLAlchemy dialect maintainers 

92warnings.filterwarnings( 

93 "ignore", 

94 "Dialect postgresql:psycopg2 will not make use of SQL compilation caching.*", 

95 SAWarning, 

96) 

97 

98from .database.base import Base, BaseTimestamp 

99 

100if TYPE_CHECKING: 

101 import sqlalchemy.orm.query 

102 from sqlalchemy.orm import Session 

103 

104 

105################################################################################ 

106 

107 

108class DebVersion(sqlalchemy.types.UserDefinedType): 

109 @override 

110 def get_col_spec(self, **kw: Any) -> str: 

111 return "DEBVERSION" 

112 

113 @override 

114 def bind_processor(self, dialect): 

115 return None 

116 

117 @override 

118 def result_processor(self, dialect, coltype): 

119 return None 

120 

121 

122################################################################################ 

123 

124__all__ = ["IntegrityError", "SQLAlchemyError", "DebVersion"] 

125 

126################################################################################ 

127 

128 

129def session_wrapper[**P, R](fn: Callable[P, R]) -> Callable[P, R]: 

130 """ 

131 Wrapper around common ".., session=None):" handling. If the wrapped 

132 function is called without passing 'session', we create a local one 

133 and destroy it when the function ends. 

134 

135 Also attaches a commit_or_flush method to the session; if we created a 

136 local session, this is a synonym for session.commit(), otherwise it is a 

137 synonym for session.flush(). 

138 """ 

139 

140 @functools.wraps(fn) 

141 def wrapped(*args, **kwargs): 

142 private_transaction = False 

143 

144 # Find the session object 

145 session = kwargs.get("session") 

146 

147 if session is None: 

148 if len(args) < len(inspect.getfullargspec(fn).args): 

149 # No session specified as last argument or in kwargs 

150 private_transaction = True 

151 kwargs["session"] = session = DBConn().session() 

152 else: 

153 # Session is last argument in args 

154 session = args[-1] 

155 if session is None: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 session = DBConn().session() 

157 args = (*args[:-1], session) 

158 private_transaction = True 

159 

160 if private_transaction: 

161 session.commit_or_flush = session.commit # type: ignore[union-attr] 

162 else: 

163 session.commit_or_flush = session.flush # type: ignore[union-attr] 

164 

165 try: 

166 return fn(*args, **kwargs) 

167 finally: 

168 if private_transaction: 

169 # We created a session; close it. 

170 session.close() 

171 

172 return wrapped 

173 

174 

175__all__.append("session_wrapper") 

176 

177################################################################################ 

178 

179 

180class ORMObject(Base): 

181 """ 

182 ORMObject is a base class for all ORM classes mapped by SQLalchemy. All 

183 derived classes must implement the properties() method. 

184 """ 

185 

186 __abstract__ = True 

187 

188 def properties(self) -> list[str]: 

189 """ 

190 This method should be implemented by all derived classes and returns a 

191 list of the important properties. The properties 'created' and 

192 'modified' will be added automatically. A suffix '_count' should be 

193 added to properties that are lists or query objects. The most important 

194 property name should be returned as the first element in the list 

195 because it is used by repr(). 

196 """ 

197 return [] 

198 

199 def classname(self) -> str: 

200 """ 

201 Returns the name of the class. 

202 """ 

203 return type(self).__name__ 

204 

205 @override 

206 def __repr__(self): 

207 """ 

208 Returns a short string representation of the object using the first 

209 element from the properties() method. 

210 """ 

211 primary_property = self.properties()[0] 

212 value = getattr(self, primary_property) 

213 return "<%s %s>" % (self.classname(), str(value)) 

214 

215 @override 

216 def __str__(self): 

217 """ 

218 Returns a human readable form of the object using the properties() 

219 method. 

220 """ 

221 return "<%s(...)>" % (self.classname()) 

222 

223 @classmethod 

224 @session_wrapper 

225 def get(cls, primary_key, session: "Session | None" = None): 

226 """ 

227 This is a support function that allows getting an object by its primary 

228 key. 

229 

230 Architecture.get(3[, session]) 

231 

232 instead of the more verbose 

233 

234 session.query(Architecture).get(3) 

235 """ 

236 assert session is not None 

237 return session.query(cls).get(primary_key) 

238 

239 def session(self) -> "Session | None": 

240 """ 

241 Returns the current session that is associated with the object. May 

242 return None is object is in detached state. 

243 """ 

244 

245 return object_session(self) 

246 

247 

248__all__.append("ORMObject") 

249 

250################################################################################ 

251 

252 

253class ACL(ORMObject): 

254 __tablename__ = "acl" 

255 

256 id: Mapped[int] = mapped_column(primary_key=True) 

257 name: Mapped[str] = mapped_column() # TODO: add unique=True 

258 is_global: Mapped[bool] = mapped_column(default=False) 

259 match_fingerprint: Mapped[bool] = mapped_column(default=False) 

260 match_keyring_id: Mapped[int | None] = mapped_column(ForeignKey("keyrings.id")) 

261 allow_new: Mapped[bool] = mapped_column(default=False) 

262 allow_source: Mapped[bool] = mapped_column(default=False) 

263 allow_binary: Mapped[bool] = mapped_column(default=False) 

264 allow_binary_all: Mapped[bool] = mapped_column(default=False) 

265 allow_binary_only: Mapped[bool] = mapped_column(default=False) 

266 allow_hijack: Mapped[bool] = mapped_column(default=False) 

267 allow_per_source: Mapped[bool] = mapped_column(default=False) 

268 deny_per_source: Mapped[bool] = mapped_column(default=False) 

269 

270 architectures: Mapped[set["Architecture"]] = relationship( 

271 secondary="acl_architecture_map" 

272 ) 

273 fingerprints: Mapped[set["Fingerprint"]] = relationship( 

274 secondary="acl_fingerprint_map" 

275 ) 

276 match_keyring: Mapped["Keyring"] = relationship(foreign_keys=[match_keyring_id]) 

277 per_source: Mapped[set["ACLPerSource"]] = relationship(back_populates="acl") 

278 per_suite: Mapped[set["ACLPerSuite"]] = relationship(back_populates="acl") 

279 

280 @override 

281 def __repr__(self): 

282 return "<ACL {0}>".format(self.name) 

283 

284 

285__all__.append("ACL") 

286 

287 

288class AclArchitectureMap(Base): 

289 __tablename__ = "acl_architecture_map" 

290 

291 acl_id: Mapped[int] = mapped_column( 

292 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True 

293 ) 

294 architecture_id: Mapped[int] = mapped_column( 

295 ForeignKey("architecture.id", ondelete="CASCADE"), primary_key=True 

296 ) 

297 

298 

299class AclFingerprintMap(Base): 

300 __tablename__ = "acl_fingerprint_map" 

301 

302 acl_id: Mapped[int] = mapped_column( 

303 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True 

304 ) 

305 fingerprint_id: Mapped[int] = mapped_column( 

306 ForeignKey("fingerprint.id", ondelete="CASCADE"), primary_key=True 

307 ) 

308 

309 

310class ACLPerSource(ORMObject): 

311 __tablename__ = "acl_per_source" 

312 

313 acl_id: Mapped[int] = mapped_column( 

314 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True 

315 ) 

316 fingerprint_id: Mapped[int] = mapped_column( 

317 ForeignKey("fingerprint.id", ondelete="CASCADE"), primary_key=True 

318 ) 

319 source: Mapped[str] = mapped_column(primary_key=True) 

320 reason: Mapped[str | None] = mapped_column(default=None) 

321 created_by_id: Mapped[int | None] = mapped_column(ForeignKey("fingerprint.id")) 

322 created: Mapped[datetime] = mapped_column( 

323 DateTime(timezone=False), server_default=func.now() 

324 ) 

325 

326 acl: Mapped[ACL] = relationship(back_populates="per_source") 

327 fingerprint: Mapped["Fingerprint"] = relationship(foreign_keys=[fingerprint_id]) 

328 created_by: Mapped["Fingerprint"] = relationship(foreign_keys=[created_by_id]) 

329 

330 @override 

331 def __repr__(self): 

332 return "<ACLPerSource acl={0} fingerprint={1} source={2} reason={3}>".format( 

333 self.acl.name, self.fingerprint.fingerprint, self.source, self.reason 

334 ) 

335 

336 

337__all__.append("ACLPerSource") 

338 

339 

340class ACLPerSuite(ORMObject): 

341 __tablename__ = "acl_per_suite" 

342 

343 acl_id: Mapped[int] = mapped_column( 

344 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True 

345 ) 

346 fingerprint_id: Mapped[int] = mapped_column( 

347 ForeignKey("fingerprint.id", ondelete="CASCADE"), primary_key=True 

348 ) 

349 suite_id: Mapped[int] = mapped_column( 

350 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True 

351 ) 

352 reason: Mapped[str | None] = mapped_column(default=None) 

353 created_by_id: Mapped[int | None] = mapped_column(ForeignKey("fingerprint.id")) 

354 created: Mapped[datetime] = mapped_column( 

355 DateTime(timezone=False), server_default=func.now() 

356 ) 

357 

358 acl: Mapped[ACL] = relationship(back_populates="per_suite") 

359 fingerprint: Mapped["Fingerprint"] = relationship(foreign_keys=[fingerprint_id]) 

360 suite: Mapped["Suite"] = relationship() 

361 created_by: Mapped["Fingerprint"] = relationship(foreign_keys=[created_by_id]) 

362 

363 @override 

364 def __repr__(self): 

365 return "<ACLPerSuite acl={0} fingerprint={1} suite={2} reason={3}>".format( 

366 self.acl.name, 

367 self.fingerprint.fingerprint, 

368 self.suite.suite_name, 

369 self.reason, 

370 ) 

371 

372 

373__all__.append("ACLPerSuite") 

374 

375################################################################################ 

376 

377 

378class Architecture(BaseTimestamp): 

379 __tablename__ = "architecture" 

380 

381 arch_id: Mapped[int] = mapped_column("id", Integer(), primary_key=True) 

382 arch_string: Mapped[str] = mapped_column(Text(), unique=True) 

383 description: Mapped[str] 

384 

385 suites: Mapped[list["Suite"]] = relationship( 

386 secondary="suite_architectures", back_populates="architectures" 

387 ) 

388 

389 def __init__(self, arch_string=None, description=None): 

390 self.arch_string = arch_string 

391 self.description = description 

392 

393 @override 

394 def __str__(self): 

395 return self.arch_string 

396 

397 @override 

398 def __repr__(self): 

399 return "<{} {}>".format( 

400 self.__class__.__name__, 

401 self.arch_string, 

402 ) 

403 

404 @override 

405 def __eq__(self, val): 

406 if isinstance(val, str): 

407 warnings.warn( 

408 "comparison with a `str` is deprecated", 

409 DeprecationWarning, 

410 stacklevel=2, 

411 ) 

412 return self.arch_string == val 

413 # This signals to use the normal comparison operator 

414 return NotImplemented 

415 

416 @override 

417 def __ne__(self, val): 

418 if isinstance(val, str): 418 ↛ 426line 418 didn't jump to line 426 because the condition on line 418 was always true

419 warnings.warn( 

420 "comparison with a `str` is deprecated", 

421 DeprecationWarning, 

422 stacklevel=2, 

423 ) 

424 return self.arch_string != val 

425 # This signals to use the normal comparison operator 

426 return NotImplemented 

427 

428 __hash__ = BaseTimestamp.__hash__ 

429 

430 

431__all__.append("Architecture") 

432 

433 

434@session_wrapper 

435def get_architecture( 

436 architecture: str, session: "Session | None" = None 

437) -> Optional[Architecture]: 

438 """ 

439 Returns database id for given `architecture`. 

440 

441 :param architecture: The name of the architecture 

442 :param session: Optional SQLA session object (a temporary one will be 

443 generated if not supplied) 

444 :return: Architecture object for the given arch (None if not present) 

445 """ 

446 

447 assert session is not None 

448 q = session.query(Architecture).filter_by(arch_string=architecture) 

449 return q.one_or_none() 

450 

451 

452__all__.append("get_architecture") 

453 

454################################################################################ 

455 

456 

457class Archive(Base): 

458 __tablename__ = "archive" 

459 

460 archive_id: Mapped[int] = mapped_column("id", primary_key=True) 

461 archive_name: Mapped[str] = mapped_column("name", unique=True) 

462 description: Mapped[str | None] = mapped_column(default=None) 

463 created: Mapped[datetime] = mapped_column( 

464 DateTime(timezone=True), server_default=func.now() 

465 ) 

466 modified: Mapped[datetime] = mapped_column( 

467 DateTime(timezone=True), server_default=func.now() 

468 ) 

469 primary_mirror: Mapped[str | None] = mapped_column(default=None) 

470 path: Mapped[str] 

471 mode: Mapped[str] = mapped_column(CHAR(length=4), default="0644") 

472 tainted: Mapped[bool] = mapped_column(default=False) 

473 use_morgue: Mapped[bool] = mapped_column(default=True) 

474 stayofexecution: Mapped[timedelta] = mapped_column(default=timedelta(hours=36)) 

475 changelog: Mapped[str | None] 

476 

477 files: Mapped[list["ArchiveFile"]] = relationship(back_populates="archive") 

478 suites: Mapped[list["Suite"]] = relationship(back_populates="archive") 

479 

480 def __init__(self, *args, **kwargs): 

481 pass 

482 

483 @override 

484 def __repr__(self): 

485 return "<Archive %s>" % self.archive_name 

486 

487 

488__all__.append("Archive") 

489 

490 

491@session_wrapper 

492def get_archive(archive: str, session: "Session | None" = None) -> Optional[Archive]: 

493 """ 

494 returns database id for given `archive`. 

495 

496 :param archive: the name of the arhive 

497 :param session: Optional SQLA session object (a temporary one will be 

498 generated if not supplied) 

499 :return: Archive object for the given name (None if not present) 

500 """ 

501 

502 assert session is not None 

503 archive = archive.lower() 

504 q = session.query(Archive).filter_by(archive_name=archive) 

505 return q.one_or_none() 

506 

507 

508__all__.append("get_archive") 

509 

510################################################################################ 

511 

512 

513class ArchiveFile(Base): 

514 __tablename__ = "files_archive_map" 

515 

516 file_id: Mapped[int] = mapped_column(ForeignKey("files.id"), primary_key=True) 

517 archive_id: Mapped[int] = mapped_column(ForeignKey("archive.id"), primary_key=True) 

518 component_id: Mapped[int] = mapped_column( 

519 ForeignKey("component.id"), primary_key=True 

520 ) 

521 last_used: Mapped[datetime | None] = mapped_column( 

522 DateTime(timezone=False), default=None 

523 ) 

524 created: Mapped[datetime] = mapped_column( 

525 DateTime(timezone=False), server_default=func.now() 

526 ) 

527 

528 archive: Mapped[Archive] = relationship(back_populates="files") 

529 component: Mapped["Component"] = relationship() 

530 file: Mapped["PoolFile"] = relationship(back_populates="archives") 

531 

532 def __init__(self, archive=None, component=None, file=None): 

533 self.archive = archive 

534 self.component = component 

535 self.file = file 

536 

537 @property 

538 def path(self): 

539 return os.path.join( 

540 self.archive.path, "pool", self.component.component_name, self.file.filename 

541 ) 

542 

543 

544__all__.append("ArchiveFile") 

545 

546################################################################################ 

547 

548 

549class BinContents(ORMObject): 

550 __tablename__ = "bin_contents" 

551 

552 binary_id: Mapped[int] = mapped_column( 

553 ForeignKey("binaries.id", ondelete="CASCADE"), primary_key=True 

554 ) 

555 file: Mapped[str] = mapped_column(primary_key=True) 

556 

557 binary: Mapped["DBBinary"] = relationship(back_populates="contents") 

558 

559 def __init__(self, file=None, binary=None): 

560 self.file = file 

561 self.binary = binary 

562 

563 @override 

564 def properties(self) -> list[str]: 

565 return ["file", "binary"] 

566 

567 

568__all__.append("BinContents") 

569 

570################################################################################ 

571 

572 

573class DBBinary(ORMObject): 

574 __tablename__ = "binaries" 

575 __table_args = ( 

576 UniqueConstraint( 

577 "package", "version", "architecture", name="binaries_package_key" 

578 ), 

579 ) 

580 

581 binary_id: Mapped[int] = mapped_column("id", primary_key=True) 

582 package: Mapped[str] 

583 version: Mapped[str] = mapped_column(DebVersion()) 

584 maintainer_id: Mapped[int] = mapped_column( 

585 "maintainer", ForeignKey("maintainer.id") 

586 ) 

587 source_id: Mapped[int] = mapped_column("source", ForeignKey("source.id")) 

588 arch_id: Mapped[int] = mapped_column("architecture", ForeignKey("architecture.id")) 

589 poolfile_id: Mapped[int] = mapped_column( 

590 "file", ForeignKey("files.id"), unique=True 

591 ) 

592 binarytype: Mapped[str] = mapped_column("type") 

593 fingerprint_id: Mapped[int | None] = mapped_column( 

594 "sig_fpr", ForeignKey("fingerprint.id") 

595 ) 

596 install_date: Mapped[datetime | None] = mapped_column( 

597 DateTime(timezone=True), server_default=func.now() 

598 ) 

599 created: Mapped[datetime | None] = mapped_column( 

600 DateTime(timezone=True), server_default=func.now() 

601 ) 

602 modified: Mapped[datetime | None] = mapped_column( 

603 DateTime(timezone=True), server_default=func.now() 

604 ) 

605 stanza: Mapped[str] 

606 authorized_by_fingerprint_id: Mapped[int | None] = mapped_column( 

607 ForeignKey("fingerprint.id") 

608 ) 

609 

610 maintainer: Mapped["Maintainer"] = relationship() 

611 source: Mapped["DBSource"] = relationship(back_populates="binaries") 

612 architecture: Mapped[Architecture] = relationship() 

613 poolfile: Mapped["PoolFile"] = relationship() 

614 fingerprint: Mapped["Fingerprint"] = relationship(foreign_keys=[fingerprint_id]) 

615 authorized_by_fingerprint: Mapped["Fingerprint"] = relationship( 

616 foreign_keys=[authorized_by_fingerprint_id] 

617 ) 

618 suites: Mapped[list["Suite"]] = relationship( 

619 secondary="bin_associations", back_populates="binaries" 

620 ) 

621 extra_sources: Mapped[list["DBSource"]] = relationship( 

622 secondary="extra_src_references", back_populates="extra_binary_references" 

623 ) 

624 key: Mapped[dict[str, "BinaryMetadata"]] = relationship( 

625 collection_class=attribute_keyed_dict("key"), cascade="all" 

626 ) 

627 contents: DynamicMapped["BinContents"] = relationship( 

628 cascade="all", back_populates="binary" 

629 ) 

630 

631 metadata_proxy: AssociationProxy[dict["MetadataKey", str]] = association_proxy( 

632 "key", "value" 

633 ) 

634 

635 def __init__( 

636 self, 

637 package=None, 

638 source=None, 

639 version=None, 

640 maintainer=None, 

641 architecture=None, 

642 poolfile=None, 

643 binarytype="deb", 

644 fingerprint=None, 

645 ): 

646 self.package = package 

647 self.source = source 

648 self.version = version 

649 self.maintainer = maintainer 

650 self.architecture = architecture 

651 self.poolfile = poolfile 

652 self.binarytype = binarytype 

653 self.fingerprint = fingerprint 

654 

655 @property 

656 def pkid(self) -> int: 

657 return self.binary_id 

658 

659 @property 

660 def name(self) -> str: 

661 return self.package 

662 

663 @property 

664 def arch_string(self) -> str: 

665 return "%s" % self.architecture 

666 

667 @override 

668 def properties(self) -> list[str]: 

669 return [ 

670 "package", 

671 "version", 

672 "maintainer", 

673 "source", 

674 "architecture", 

675 "poolfile", 

676 "binarytype", 

677 "fingerprint", 

678 "install_date", 

679 "suites_count", 

680 "binary_id", 

681 "contents_count", 

682 "extra_sources", 

683 ] 

684 

685 def scan_contents(self) -> Iterable[str]: 

686 """ 

687 Yields the contents of the package. Only regular files are yielded and 

688 the path names are normalized after converting them from either utf-8 

689 or iso8859-1 encoding. It yields the string ' <EMPTY PACKAGE>' if the 

690 package does not contain any regular file. 

691 """ 

692 fullpath = self.poolfile.fullpath 

693 dpkg_cmd = ("dpkg-deb", "--fsys-tarfile", fullpath) 

694 dpkg = subprocess.Popen(dpkg_cmd, stdout=subprocess.PIPE) 

695 tar = TarFile.open(fileobj=dpkg.stdout, mode="r|") 

696 for member in tar.getmembers(): 

697 if not member.isdir(): 

698 name = normpath(member.name) 

699 yield name 

700 tar.close() 

701 assert dpkg.stdout is not None 

702 dpkg.stdout.close() 

703 dpkg.wait() 

704 

705 def read_control(self) -> bytes: 

706 """ 

707 Reads the control information from a binary. 

708 

709 :return: stanza text of the control section. 

710 """ 

711 from . import utils 

712 

713 fullpath = self.poolfile.fullpath 

714 return utils.deb_extract_control(fullpath) 

715 

716 def read_control_fields(self) -> apt_pkg.TagSection: 

717 """ 

718 Reads the control information from a binary and return 

719 as a dictionary. 

720 

721 :return: fields of the control section as a dictionary. 

722 """ 

723 stanza = self.read_control() 

724 return apt_pkg.TagSection(stanza) 

725 

726 @property 

727 def proxy(self) -> "MetadataProxy": 

728 session = object_session(self) 

729 assert session is not None 

730 query = session.query(BinaryMetadata).filter_by(binary=self) 

731 return MetadataProxy(session, query) 

732 

733 

734__all__.append("DBBinary") 

735 

736 

737@session_wrapper 

738def get_suites_binary_in( 

739 package: str, session: "Session | None" = None 

740) -> "list[Suite]": 

741 """ 

742 Returns list of Suite objects which given `package` name is in 

743 

744 :param package: DBBinary package name to search for 

745 :return: list of Suite objects for the given package 

746 """ 

747 

748 assert session is not None 

749 return ( 

750 session.query(Suite) 

751 .filter(Suite.binaries.any(DBBinary.package == package)) 

752 .all() 

753 ) 

754 

755 

756__all__.append("get_suites_binary_in") 

757 

758 

759@session_wrapper 

760def get_component_by_package_suite( 

761 package: str, 

762 suite_list: list[str], 

763 arch_list: Optional[str] = None, 

764 session: "Session | None" = None, 

765) -> Optional[str]: 

766 """ 

767 Returns the component name of the newest binary package in suite_list or 

768 None if no package is found. The result can be optionally filtered by a list 

769 of architecture names. 

770 

771 :param package: DBBinary package name to search for 

772 :param suite_list: list of suite_name items 

773 :param arch_list: optional list of arch_string items that defaults to [] 

774 :return: name of component or None 

775 """ 

776 

777 assert session is not None 

778 q = ( 

779 session.query(DBBinary) 

780 .filter_by(package=package) 

781 .join(DBBinary.suites) 

782 .filter(Suite.suite_name.in_(suite_list)) 

783 ) 

784 if arch_list: 

785 q = q.join(DBBinary.architecture).filter( 

786 Architecture.arch_string.in_(arch_list) 

787 ) 

788 binary = q.order_by(desc(DBBinary.version)).first() 

789 if binary is None: 

790 return None 

791 else: 

792 return binary.poolfile.component.component_name 

793 

794 

795__all__.append("get_component_by_package_suite") 

796 

797 

798class BinAssociations(Base): 

799 __tablename__ = "bin_associations" 

800 __table_args = ( 

801 UniqueConstraint("suite", "bin", name="bin_associations_suite_key"), 

802 ) 

803 

804 id: Mapped[int] = mapped_column(primary_key=True) 

805 suite: Mapped[int] = mapped_column(ForeignKey("suite.id")) 

806 bin: Mapped[int] = mapped_column(ForeignKey("binaries.id")) 

807 created: Mapped[datetime] = mapped_column( 

808 DateTime(timezone=True), server_default=func.now() 

809 ) 

810 modified: Mapped[datetime] = mapped_column( 

811 DateTime(timezone=True), server_default=func.now() 

812 ) 

813 

814 

815class ExtraSrcReferences(Base): 

816 __tablename__ = "extra_src_references" 

817 

818 bin_id: Mapped[int] = mapped_column( 

819 ForeignKey("binaries.id", ondelete="CASCADE"), primary_key=True 

820 ) 

821 src_id: Mapped[int] = mapped_column(ForeignKey("source.id"), primary_key=True) 

822 

823 

824################################################################################ 

825 

826 

827class BuildQueue(Base): 

828 __tablename__ = "build_queue" 

829 

830 queue_id: Mapped[int] = mapped_column("id", primary_key=True) 

831 queue_name: Mapped[str] = mapped_column(unique=True) 

832 generate_metadata: Mapped[bool] = mapped_column(default=False) 

833 stay_of_execution: Mapped[int] = mapped_column(default=86400) 

834 created: Mapped[datetime | None] = mapped_column( 

835 DateTime(timezone=True), server_default=func.now() 

836 ) 

837 modified: Mapped[datetime | None] = mapped_column( 

838 DateTime(timezone=True), server_default=func.now() 

839 ) 

840 suite_id: Mapped[int] = mapped_column(ForeignKey("suite.id")) 

841 

842 suite: Mapped["Suite"] = relationship() 

843 

844 def __init__(self, *args, **kwargs): 

845 pass 

846 

847 @override 

848 def __repr__(self): 

849 return "<BuildQueue %s>" % self.queue_name 

850 

851 

852__all__.append("BuildQueue") 

853 

854################################################################################ 

855 

856 

857class Component(ORMObject): 

858 __tablename__ = "component" 

859 

860 component_id: Mapped[int] = mapped_column("id", primary_key=True) 

861 component_name: Mapped[str] = mapped_column("name", unique=True) 

862 description: Mapped[str | None] 

863 meets_dfsg: Mapped[bool | None] 

864 created: Mapped[datetime | None] = mapped_column( 

865 DateTime(timezone=True), server_default=func.now() 

866 ) 

867 modified: Mapped[datetime | None] = mapped_column( 

868 DateTime(timezone=True), server_default=func.now() 

869 ) 

870 ordering: Mapped[int] = mapped_column(unique=True) 

871 

872 suites: Mapped[list["Suite"]] = relationship( 

873 secondary="component_suite", back_populates="components" 

874 ) 

875 overrides: DynamicMapped["Override"] = relationship(back_populates="component") 

876 

877 def __init__(self, component_name=None): 

878 self.component_name = component_name 

879 

880 @override 

881 def __eq__(self, val: Any): 

882 if isinstance(val, str): 882 ↛ 883line 882 didn't jump to line 883 because the condition on line 882 was never true

883 warnings.warn( 

884 "comparison with a `str` is deprecated", 

885 DeprecationWarning, 

886 stacklevel=2, 

887 ) 

888 return self.component_name == val 

889 # This signals to use the normal comparison operator 

890 return NotImplemented 

891 

892 @override 

893 def __ne__(self, val: Any): 

894 if isinstance(val, str): 

895 warnings.warn( 

896 "comparison with a `str` is deprecated", 

897 DeprecationWarning, 

898 stacklevel=2, 

899 ) 

900 return self.component_name != val 

901 # This signals to use the normal comparison operator 

902 return NotImplemented 

903 

904 __hash__ = ORMObject.__hash__ 

905 

906 @override 

907 def properties(self) -> list[str]: 

908 return [ 

909 "component_name", 

910 "component_id", 

911 "description", 

912 "meets_dfsg", 

913 "overrides_count", 

914 ] 

915 

916 

917__all__.append("Component") 

918 

919 

920@session_wrapper 

921def get_component( 

922 component: str, session: "Session | None" = None 

923) -> Optional[Component]: 

924 """ 

925 Returns database id for given `component`. 

926 

927 :param component: The name of the override type 

928 :return: the database id for the given component 

929 """ 

930 

931 assert session is not None 

932 component = component.lower() 

933 q = session.query(Component).filter_by(component_name=component) 

934 return q.one_or_none() 

935 

936 

937__all__.append("get_component") 

938 

939 

940def get_mapped_component_name(component_name: str) -> str: 

941 cnf = Config() 

942 for m in cnf.value_list("ComponentMappings"): 942 ↛ 943line 942 didn't jump to line 943 because the loop on line 942 never started

943 (src, dst) = m.split() 

944 if component_name == src: 

945 component_name = dst 

946 return component_name 

947 

948 

949__all__.append("get_mapped_component_name") 

950 

951 

952@session_wrapper 

953def get_mapped_component( 

954 component_name: str, session: "Session | None" = None 

955) -> Optional[Component]: 

956 """get component after mappings 

957 

958 Evaluate component mappings from ComponentMappings in dak.conf for the 

959 given component name. 

960 

961 .. todo:: 

962 

963 ansgar wants to get rid of this. It's currently only used for 

964 the security archive 

965 

966 :param component_name: component name 

967 :param session: database session 

968 :return: component after applying maps or :const:`None` 

969 """ 

970 assert session is not None 

971 component_name = get_mapped_component_name(component_name) 

972 component = ( 

973 session.query(Component).filter_by(component_name=component_name).first() 

974 ) 

975 return component 

976 

977 

978__all__.append("get_mapped_component") 

979 

980 

981@session_wrapper 

982def get_component_names(session: "Session | None" = None) -> list[str]: 

983 """ 

984 Returns list of strings of component names. 

985 

986 :return: list of strings of component names 

987 """ 

988 

989 assert session is not None 

990 return [x.component_name for x in session.query(Component).all()] 

991 

992 

993__all__.append("get_component_names") 

994 

995 

996class ComponentSuite(Base): 

997 __tablename__ = "component_suite" 

998 

999 component_id: Mapped[int] = mapped_column( 

1000 ForeignKey("component.id", ondelete="CASCADE"), primary_key=True 

1001 ) 

1002 suite_id: Mapped[int] = mapped_column( 

1003 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True 

1004 ) 

1005 

1006 

1007################################################################################ 

1008 

1009 

1010class DBConfig(Base): 

1011 __tablename__ = "config" 

1012 

1013 config_id: Mapped[int] = mapped_column("id", primary_key=True) 

1014 name: Mapped[str] = mapped_column(unique=True) 

1015 value: Mapped[str | None] 

1016 created: Mapped[datetime] = mapped_column( 

1017 DateTime(timezone=True), server_default=func.now() 

1018 ) 

1019 modified: Mapped[datetime] = mapped_column( 

1020 DateTime(timezone=True), server_default=func.now() 

1021 ) 

1022 

1023 def __init__(self, *args, **kwargs): 

1024 pass 

1025 

1026 @override 

1027 def __repr__(self): 

1028 return "<DBConfig %s>" % self.name 

1029 

1030 

1031__all__.append("DBConfig") 

1032 

1033################################################################################ 

1034 

1035 

1036class DSCFile(Base): 

1037 __tablename__ = "dsc_files" 

1038 __table_args = (UniqueConstraint("source", "file", name="dsc_files_source_key"),) 

1039 

1040 dscfile_id: Mapped[int] = mapped_column("id", primary_key=True) 

1041 source_id: Mapped[int] = mapped_column("source", ForeignKey("source.id")) 

1042 poolfile_id: Mapped[int] = mapped_column("file", ForeignKey("files.id")) 

1043 created: Mapped[datetime | None] = mapped_column( 

1044 DateTime(timezone=True), server_default=func.now() 

1045 ) 

1046 modified: Mapped[datetime | None] = mapped_column( 

1047 DateTime(timezone=True), server_default=func.now() 

1048 ) 

1049 extra_file: Mapped[bool] = mapped_column(default=True) 

1050 

1051 source: Mapped["DBSource"] = relationship(back_populates="srcfiles") 

1052 poolfile: Mapped["PoolFile"] = relationship() 

1053 

1054 def __init__(self, *args, **kwargs): 

1055 pass 

1056 

1057 @override 

1058 def __repr__(self): 

1059 return "<DSCFile %s>" % self.dscfile_id 

1060 

1061 

1062__all__.append("DSCFile") 

1063 

1064 

1065@session_wrapper 

1066def get_dscfiles( 

1067 dscfile_id: Optional[int] = None, 

1068 source_id: Optional[int] = None, 

1069 poolfile_id: Optional[int] = None, 

1070 session: "Session | None" = None, 

1071) -> list[DSCFile]: 

1072 """ 

1073 Returns a list of DSCFiles which may be empty 

1074 

1075 :param dscfile_id: the dscfile_id of the DSCFiles to find 

1076 :param source_id: the source id related to the DSCFiles to find 

1077 :param poolfile_id: the poolfile id related to the DSCFiles to find 

1078 :return: Possibly empty list of DSCFiles 

1079 """ 

1080 

1081 assert session is not None 

1082 

1083 q = session.query(DSCFile) 

1084 if dscfile_id is not None: 

1085 q = q.filter_by(dscfile_id=dscfile_id) 

1086 if source_id is not None: 

1087 q = q.filter_by(source_id=source_id) 

1088 if poolfile_id is not None: 

1089 q = q.filter_by(poolfile_id=poolfile_id) 

1090 return q.all() 

1091 

1092 

1093__all__.append("get_dscfiles") 

1094 

1095################################################################################ 

1096 

1097 

1098class ExternalOverride(ORMObject): 

1099 __tablename__ = "external_overrides" 

1100 

1101 suite_id: Mapped[int] = mapped_column( 

1102 "suite", ForeignKey("suite.id"), primary_key=True 

1103 ) 

1104 component_id: Mapped[int] = mapped_column( 

1105 "component", ForeignKey("component.id"), primary_key=True 

1106 ) 

1107 package: Mapped[str] = mapped_column(primary_key=True) 

1108 key: Mapped[str] = mapped_column(primary_key=True) 

1109 value: Mapped[str] 

1110 

1111 suite: Mapped["Suite"] = relationship() 

1112 component: Mapped["Component"] = relationship() 

1113 

1114 def __init__(self, *args, **kwargs): 

1115 pass 

1116 

1117 @override 

1118 def __repr__(self): 

1119 return "<ExternalOverride %s = %s: %s>" % (self.package, self.key, self.value) 

1120 

1121 

1122__all__.append("ExternalOverride") 

1123 

1124################################################################################ 

1125 

1126 

1127class PoolFile(ORMObject): 

1128 __tablename__ = "files" 

1129 

1130 file_id: Mapped[int] = mapped_column("id", primary_key=True) 

1131 filename: Mapped[str] # TODO: add unique=True? 

1132 filesize: Mapped[int] = mapped_column("size", BigInteger()) 

1133 md5sum: Mapped[str] 

1134 last_used: Mapped[datetime | None] 

1135 sha1sum: Mapped[str] 

1136 sha256sum: Mapped[str] 

1137 created: Mapped[datetime | None] = mapped_column( 

1138 DateTime(timezone=True), server_default=func.now() 

1139 ) 

1140 modified: Mapped[datetime | None] = mapped_column( 

1141 DateTime(timezone=True), server_default=func.now() 

1142 ) 

1143 

1144 archives: Mapped[list["ArchiveFile"]] = relationship(back_populates="file") 

1145 

1146 def __init__(self, filename=None, filesize=-1, md5sum=None): 

1147 self.filename = filename 

1148 self.filesize = filesize 

1149 self.md5sum = md5sum 

1150 

1151 @property 

1152 def fullpath(self) -> str: 

1153 session = object_session(self) 

1154 assert session is not None 

1155 af = ( 

1156 session.query(ArchiveFile) 

1157 .join(Archive) 

1158 .filter(ArchiveFile.file == self) 

1159 .order_by(Archive.tainted.desc()) 

1160 .first() 

1161 ) 

1162 assert af is not None 

1163 return af.path 

1164 

1165 @property 

1166 def component(self) -> Component: 

1167 session = object_session(self) 

1168 assert session is not None 

1169 component_id = ( 

1170 session.query(ArchiveFile.component_id) 

1171 .filter(ArchiveFile.file == self) 

1172 .group_by(ArchiveFile.component_id) 

1173 .one() 

1174 ) 

1175 return session.get_one(Component, component_id) 

1176 

1177 @property 

1178 def basename(self) -> str: 

1179 return os.path.basename(self.filename) 

1180 

1181 @override 

1182 def properties(self) -> list[str]: 

1183 return [ 

1184 "filename", 

1185 "file_id", 

1186 "filesize", 

1187 "md5sum", 

1188 "sha1sum", 

1189 "sha256sum", 

1190 "source", 

1191 "binary", 

1192 "last_used", 

1193 ] 

1194 

1195 

1196__all__.append("PoolFile") 

1197 

1198################################################################################ 

1199 

1200 

1201class Fingerprint(ORMObject): 

1202 __tablename__ = "fingerprint" 

1203 

1204 fingerprint_id: Mapped[int] = mapped_column("id", primary_key=True) 

1205 fingerprint: Mapped[str] = mapped_column(unique=True) 

1206 uid_id: Mapped[int | None] = mapped_column("uid", ForeignKey("uid.id")) 

1207 keyring_id: Mapped[int | None] = mapped_column("keyring", ForeignKey("keyrings.id")) 

1208 created: Mapped[datetime | None] = mapped_column( 

1209 DateTime(timezone=True), server_default=func.now() 

1210 ) 

1211 modified: Mapped[datetime | None] = mapped_column( 

1212 DateTime(timezone=True), server_default=func.now() 

1213 ) 

1214 acl_id: Mapped[int | None] = mapped_column(ForeignKey("acl.id")) 

1215 

1216 uid: Mapped["Uid | None"] = relationship(back_populates="fingerprint") 

1217 keyring: Mapped["Keyring | None"] = relationship() 

1218 acl: Mapped["ACL | None"] = relationship() 

1219 

1220 def __init__(self, fingerprint=None): 

1221 self.fingerprint = fingerprint 

1222 

1223 @override 

1224 def properties(self) -> list[str]: 

1225 return ["fingerprint", "fingerprint_id", "keyring", "uid", "binary_reject"] 

1226 

1227 

1228__all__.append("Fingerprint") 

1229 

1230 

1231@session_wrapper 

1232def get_fingerprint( 

1233 fpr: str, session: "Session | None" = None 

1234) -> Optional[Fingerprint]: 

1235 """ 

1236 Returns Fingerprint object for given fpr. 

1237 

1238 :param fpr: The fpr to find / add 

1239 :param session: Optional SQL session object (a temporary one will be 

1240 generated if not supplied). 

1241 :return: the Fingerprint object for the given fpr or None 

1242 """ 

1243 

1244 assert session is not None 

1245 q = session.query(Fingerprint).filter_by(fingerprint=fpr) 

1246 return q.one_or_none() 

1247 

1248 

1249__all__.append("get_fingerprint") 

1250 

1251 

1252@session_wrapper 

1253def get_or_set_fingerprint(fpr: str, session: "Session | None" = None) -> Fingerprint: 

1254 """ 

1255 Returns Fingerprint object for given fpr. 

1256 

1257 If no matching fpr is found, a row is inserted. 

1258 

1259 :param fpr: The fpr to find / add 

1260 :param session: Optional SQL session object (a temporary one will be 

1261 generated if not supplied). If not passed, a commit will be performed at 

1262 the end of the function, otherwise the caller is responsible for commiting. 

1263 A flush will be performed either way. 

1264 :return: the Fingerprint object for the given fpr 

1265 """ 

1266 

1267 assert session is not None 

1268 q = session.query(Fingerprint).filter_by(fingerprint=fpr) 

1269 

1270 try: 

1271 ret = q.one() 

1272 except NoResultFound: 

1273 fingerprint = Fingerprint() 

1274 fingerprint.fingerprint = fpr 

1275 session.add(fingerprint) 

1276 session.commit_or_flush() # type: ignore[attr-defined] 

1277 ret = fingerprint 

1278 

1279 return ret 

1280 

1281 

1282__all__.append("get_or_set_fingerprint") 

1283 

1284################################################################################ 

1285 

1286# Helper routine for Keyring class 

1287 

1288 

1289def get_ldap_name(entry) -> str: 

1290 name = [] 

1291 for k in ["cn", "mn", "sn"]: 

1292 ret = entry.get(k) 

1293 if not ret: 

1294 continue 

1295 value = ret[0].decode() 

1296 if value and value[0] != "-": 

1297 name.append(value) 

1298 return " ".join(name) 

1299 

1300 

1301################################################################################ 

1302 

1303 

1304class _Key(TypedDict, total=False): 

1305 email: str 

1306 name: str 

1307 fingerprints: list[str] 

1308 uid: str 

1309 

1310 

1311class Keyring(Base): 

1312 __tablename__ = "keyrings" 

1313 

1314 keyring_id: Mapped[int] = mapped_column("id", primary_key=True) 

1315 keyring_name: Mapped[str] = mapped_column("name", unique=True) 

1316 priority: Mapped[int] = mapped_column(default=100) 

1317 created: Mapped[datetime | None] = mapped_column( 

1318 DateTime(timezone=True), server_default=func.now() 

1319 ) 

1320 modified: Mapped[datetime | None] = mapped_column( 

1321 DateTime(timezone=True), server_default=func.now() 

1322 ) 

1323 active: Mapped[bool | None] = mapped_column(default=True) 

1324 acl_id: Mapped[int | None] = mapped_column(ForeignKey("acl.id")) 

1325 tag2upload: Mapped[bool] = mapped_column(default=False) 

1326 

1327 acl: Mapped["ACL | None"] = relationship(foreign_keys=[acl_id]) 

1328 

1329 keys: dict[str, _Key] = {} 

1330 fpr_lookup: dict[str, str] = {} 

1331 

1332 def __init__(self, *args, **kwargs): 

1333 pass 

1334 

1335 @override 

1336 def __repr__(self): 

1337 return "<Keyring %s>" % self.keyring_name 

1338 

1339 def de_escape_gpg_str(self, txt: str) -> str: 

1340 esclist = re.split(r"(\\x..)", txt) 

1341 for x in range(1, len(esclist), 2): 1341 ↛ 1342line 1341 didn't jump to line 1342 because the loop on line 1341 never started

1342 esclist[x] = "%c" % (int(esclist[x][2:], 16)) 

1343 return "".join(esclist) 

1344 

1345 def parse_address(self, uid: str) -> tuple[str, str]: 

1346 """parses uid and returns a tuple of real name and email address""" 

1347 import email.utils 

1348 

1349 (name, address) = email.utils.parseaddr(uid) 

1350 name = re.sub(r"\s*[(].*[)]", "", name) 

1351 name = self.de_escape_gpg_str(name) 

1352 if name == "": 

1353 name = uid 

1354 return (name, address) 

1355 

1356 def load_keys(self, keyring: str) -> None: 

1357 if not self.keyring_id: 1357 ↛ 1358line 1357 didn't jump to line 1358 because the condition on line 1357 was never true

1358 raise Exception("Must be initialized with database information") 

1359 

1360 cmd = [ 

1361 "gpg", 

1362 "--no-default-keyring", 

1363 "--keyring", 

1364 keyring, 

1365 "--with-colons", 

1366 "--fingerprint", 

1367 "--fingerprint", 

1368 ] 

1369 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

1370 

1371 key = None 

1372 need_fingerprint = False 

1373 

1374 assert p.stdout is not None 

1375 for line_raw in p.stdout: 

1376 try: 

1377 line = line_raw.decode() 

1378 except UnicodeDecodeError: 

1379 # Some old UIDs might not use UTF-8 encoding. We assume they 

1380 # use latin1. 

1381 line = line_raw.decode("latin1") 

1382 field = line.split(":") 

1383 if field[0] == "pub": 

1384 key = field[4] 

1385 self.keys[key] = {} 

1386 (name, addr) = self.parse_address(field[9]) 

1387 if "@" in addr: 1387 ↛ 1388line 1387 didn't jump to line 1388 because the condition on line 1387 was never true

1388 self.keys[key]["email"] = addr 

1389 self.keys[key]["name"] = name 

1390 need_fingerprint = True 

1391 elif key and field[0] == "uid": 

1392 assert key is not None 

1393 (name, addr) = self.parse_address(field[9]) 

1394 if "email" not in self.keys[key] and "@" in addr: 1394 ↛ 1375line 1394 didn't jump to line 1375 because the condition on line 1394 was always true

1395 self.keys[key]["email"] = addr 

1396 self.keys[key]["name"] = name 

1397 elif need_fingerprint and field[0] == "fpr": 

1398 assert key is not None 

1399 self.keys[key]["fingerprints"] = [field[9]] 

1400 self.fpr_lookup[field[9]] = key 

1401 need_fingerprint = False 

1402 

1403 (out, err) = p.communicate() 

1404 r = p.returncode 

1405 if r != 0: 1405 ↛ 1406line 1405 didn't jump to line 1406 because the condition on line 1405 was never true

1406 raise daklib.gpg.GpgException( 

1407 "command failed: %s\nstdout: %r\nstderr: %r\n" % (cmd, out, err) 

1408 ) 

1409 

1410 def import_users_from_ldap( 

1411 self, session: "Session" 

1412 ) -> tuple[dict[str, tuple[int, str]], dict[int, tuple[str, str]]]: 

1413 import ldap # type: ignore 

1414 

1415 from .utils import open_ldap_connection 

1416 

1417 conn = open_ldap_connection() 

1418 cnf = Config() 

1419 LDAPDn = cnf["Import-LDAP-Fingerprints::LDAPDn"] 

1420 Attrs = conn.search_s( 

1421 LDAPDn, 

1422 ldap.SCOPE_ONELEVEL, 

1423 "(&(keyfingerprint=*)(supplementaryGid=%s))" 

1424 % (cnf["Import-Users-From-Passwd::ValidGID"]), 

1425 ["uid", "keyfingerprint", "cn", "mn", "sn"], 

1426 ) 

1427 

1428 byuid: dict[int, tuple[str, str]] = {} 

1429 byname: dict[str, tuple[int, str]] = {} 

1430 

1431 for i in Attrs: 

1432 entry = i[1] 

1433 uid = entry["uid"][0].decode() 

1434 name = get_ldap_name(entry) 

1435 fingerprints = entry["keyFingerPrint"] 

1436 keyid = None 

1437 for f_raw in fingerprints: 

1438 f = f_raw.decode() 

1439 key = self.fpr_lookup.get(f, None) 

1440 if key not in self.keys: 

1441 continue 

1442 self.keys[key]["uid"] = uid 

1443 

1444 if keyid is not None: 

1445 continue 

1446 keyid = get_or_set_uid(uid, session).uid_id 

1447 byuid[keyid] = (uid, name) 

1448 byname[uid] = (keyid, name) 

1449 

1450 return (byname, byuid) 

1451 

1452 def generate_users_from_keyring( 

1453 self, format: str, session: "Session" 

1454 ) -> tuple[dict[str, tuple[int, str]], dict[int, tuple[str, str]]]: 

1455 byuid: dict[int, tuple[str, str]] = {} 

1456 byname: dict[str, tuple[int, str]] = {} 

1457 any_invalid = False 

1458 for x in list(self.keys.keys()): 

1459 if "email" not in self.keys[x]: 1459 ↛ 1460line 1459 didn't jump to line 1460 because the condition on line 1459 was never true

1460 any_invalid = True 

1461 self.keys[x]["uid"] = format % "invalid-uid" 

1462 else: 

1463 uid = format % self.keys[x]["email"] 

1464 keyid = get_or_set_uid(uid, session).uid_id 

1465 byuid[keyid] = (uid, self.keys[x]["name"]) 

1466 byname[uid] = (keyid, self.keys[x]["name"]) 

1467 self.keys[x]["uid"] = uid 

1468 

1469 if any_invalid: 1469 ↛ 1470line 1469 didn't jump to line 1470 because the condition on line 1469 was never true

1470 uid = format % "invalid-uid" 

1471 keyid = get_or_set_uid(uid, session).uid_id 

1472 byuid[keyid] = (uid, "ungeneratable user id") 

1473 byname[uid] = (keyid, "ungeneratable user id") 

1474 

1475 return (byname, byuid) 

1476 

1477 

1478__all__.append("Keyring") 

1479 

1480 

1481@session_wrapper 

1482def get_keyring(keyring: str, session: "Session | None" = None) -> Optional[Keyring]: 

1483 """ 

1484 If `keyring` does not have an entry in the `keyrings` table yet, return None 

1485 If `keyring` already has an entry, simply return the existing :class:`Keyring` 

1486 

1487 :param keyring: the keyring name 

1488 :return: the :class:`Keyring` object for this keyring 

1489 """ 

1490 

1491 assert session is not None 

1492 q = session.query(Keyring).filter_by(keyring_name=keyring) 

1493 return q.one_or_none() 

1494 

1495 

1496__all__.append("get_keyring") 

1497 

1498 

1499@session_wrapper 

1500def get_active_keyring_paths(session: "Session | None" = None) -> list[str]: 

1501 """ 

1502 :return: list of active keyring paths 

1503 """ 

1504 assert session is not None 

1505 return [ 

1506 x.keyring_name 

1507 for x in session.query(Keyring) 

1508 .filter(Keyring.active == True) # noqa:E712 

1509 .order_by(desc(Keyring.priority)) 

1510 .all() 

1511 ] 

1512 

1513 

1514__all__.append("get_active_keyring_paths") 

1515 

1516################################################################################ 

1517 

1518 

1519class DBChange(Base): 

1520 __tablename__ = "changes" 

1521 

1522 change_id: Mapped[int] = mapped_column("id", primary_key=True) 

1523 changesname: Mapped[str] 

1524 seen: Mapped[datetime] = mapped_column( 

1525 DateTime(timezone=True), server_default=func.now() 

1526 ) 

1527 source: Mapped[str] 

1528 binaries: Mapped[str | None] 

1529 architecture: Mapped[str] 

1530 version: Mapped[str] 

1531 distribution: Mapped[str] 

1532 urgency: Mapped[str] 

1533 maintainer: Mapped[str] 

1534 fingerprint: Mapped[str] 

1535 changedby: Mapped[str] 

1536 date: Mapped[str] 

1537 created: Mapped[datetime] = mapped_column( 

1538 DateTime(timezone=True), server_default=func.now() 

1539 ) 

1540 modified: Mapped[datetime] = mapped_column( 

1541 DateTime(timezone=True), server_default=func.now() 

1542 ) 

1543 changelog_id: Mapped[int | None] 

1544 closes: Mapped[list[str] | None] = mapped_column(ARRAY(String())) 

1545 authorized_by_fingerprint: Mapped[str | None] 

1546 

1547 def __init__(self, *args, **kwargs): 

1548 pass 

1549 

1550 @override 

1551 def __repr__(self): 

1552 return "<DBChange %s>" % self.changesname 

1553 

1554 

1555__all__.append("DBChange") 

1556 

1557 

1558@session_wrapper 

1559def get_dbchange(filename: str, session: "Session | None" = None) -> Optional[DBChange]: 

1560 """ 

1561 returns DBChange object for given `filename`. 

1562 

1563 :param filename: the name of the file 

1564 :param session: Optional SQLA session object (a temporary one will be 

1565 generated if not supplied) 

1566 :return: DBChange object for the given filename (:const:`None` if not present) 

1567 """ 

1568 assert session is not None 

1569 q = session.query(DBChange).filter_by(changesname=filename) 

1570 return q.one_or_none() 

1571 

1572 

1573__all__.append("get_dbchange") 

1574 

1575################################################################################ 

1576 

1577 

1578class DBChangelog(Base): 

1579 __tablename__ = "changelogs_text" 

1580 

1581 id: Mapped[int] = mapped_column(primary_key=True) 

1582 changelog: Mapped[str | None] 

1583 

1584 def __init__(self, *args, **kwargs): 

1585 pass 

1586 

1587 @override 

1588 def __repr__(self): 

1589 return "<DBChangelog %s>" % self.id 

1590 

1591 

1592__all__.append("DBChangelog") 

1593 

1594################################################################################ 

1595 

1596 

1597class Maintainer(ORMObject): 

1598 __tablename__ = "maintainer" 

1599 

1600 maintainer_id: Mapped[int] = mapped_column("id", primary_key=True) 

1601 name: Mapped[str] = mapped_column(unique=True) 

1602 created: Mapped[datetime | None] = mapped_column( 

1603 DateTime(timezone=True), server_default=func.now() 

1604 ) 

1605 modified: Mapped[datetime | None] = mapped_column( 

1606 DateTime(timezone=True), server_default=func.now() 

1607 ) 

1608 

1609 maintains_sources: Mapped[list["DBSource"]] = relationship( 

1610 back_populates="maintainer", foreign_keys=lambda: DBSource.maintainer_id 

1611 ) 

1612 changed_sources: Mapped[list["DBSource"]] = relationship( 

1613 back_populates="changedby", foreign_keys=lambda: DBSource.changedby_id 

1614 ) 

1615 

1616 def __init__(self, name=None): 

1617 self.name = name 

1618 

1619 @override 

1620 def properties(self) -> list[str]: 

1621 return ["name", "maintainer_id"] 

1622 

1623 def get_split_maintainer(self) -> tuple[str, str, str, str]: 

1624 if not hasattr(self, "name") or self.name is None: 

1625 return ("", "", "", "") 

1626 

1627 return fix_maintainer(self.name.strip()) 

1628 

1629 

1630__all__.append("Maintainer") 

1631 

1632 

1633@session_wrapper 

1634def get_or_set_maintainer(name: str, session: "Session | None" = None) -> Maintainer: 

1635 """ 

1636 Returns Maintainer object for given maintainer name. 

1637 

1638 If no matching maintainer name is found, a row is inserted. 

1639 

1640 :param name: The maintainer name to add 

1641 :param session: Optional SQL session object (a temporary one will be 

1642 generated if not supplied). If not passed, a commit will be performed at 

1643 the end of the function, otherwise the caller is responsible for commiting. 

1644 A flush will be performed either way. 

1645 :return: the Maintainer object for the given maintainer 

1646 """ 

1647 

1648 assert session is not None 

1649 q = session.query(Maintainer).filter_by(name=name) 

1650 try: 

1651 ret = q.one() 

1652 except NoResultFound: 

1653 maintainer = Maintainer() 

1654 maintainer.name = name 

1655 session.add(maintainer) 

1656 session.commit_or_flush() # type: ignore[attr-defined] 

1657 ret = maintainer 

1658 

1659 return ret 

1660 

1661 

1662__all__.append("get_or_set_maintainer") 

1663 

1664 

1665@session_wrapper 

1666def get_maintainer( 

1667 maintainer_id: int, session: "Session | None" = None 

1668) -> Optional[Maintainer]: 

1669 """ 

1670 Return the name of the maintainer behind `maintainer_id` or :const:`None` 

1671 if that `maintainer_id` is invalid. 

1672 

1673 :param maintainer_id: the id of the maintainer 

1674 :return: the Maintainer with this `maintainer_id` 

1675 """ 

1676 

1677 assert session is not None 

1678 return session.get(Maintainer, maintainer_id) 

1679 

1680 

1681__all__.append("get_maintainer") 

1682 

1683################################################################################ 

1684 

1685 

1686class NewComment(Base): 

1687 __tablename__ = "new_comments" 

1688 

1689 comment_id: Mapped[int] = mapped_column("id", primary_key=True) 

1690 package: Mapped[str] 

1691 version: Mapped[str] 

1692 comment: Mapped[str] 

1693 author: Mapped[str] 

1694 notedate: Mapped[datetime] = mapped_column( 

1695 DateTime(timezone=True), server_default=func.now() 

1696 ) 

1697 trainee: Mapped[bool] = mapped_column(default=False) 

1698 created: Mapped[datetime | None] = mapped_column( 

1699 DateTime(timezone=True), server_default=func.now() 

1700 ) 

1701 modified: Mapped[datetime | None] = mapped_column( 

1702 DateTime(timezone=True), server_default=func.now() 

1703 ) 

1704 policy_queue_id: Mapped[int] = mapped_column(ForeignKey("policy_queue.id")) 

1705 

1706 policy_queue: Mapped["PolicyQueue"] = relationship() 

1707 

1708 def __init__(self, *args, **kwargs): 

1709 pass 

1710 

1711 @override 

1712 def __repr__(self): 

1713 return """<NewComment for '%s %s' (%s)>""" % ( 

1714 self.package, 

1715 self.version, 

1716 self.comment_id, 

1717 ) 

1718 

1719 

1720__all__.append("NewComment") 

1721 

1722 

1723@session_wrapper 

1724def has_new_comment( 

1725 policy_queue: "PolicyQueue", 

1726 package: str, 

1727 version: str, 

1728 session: "Session | None" = None, 

1729) -> bool: 

1730 """ 

1731 Returns :const:`True` if the given combination of `package`, `version` has a comment. 

1732 

1733 :param package: name of the package 

1734 :param version: package version 

1735 :param session: Optional SQLA session object (a temporary one will be 

1736 generated if not supplied) 

1737 """ 

1738 

1739 assert session is not None 

1740 q = session.query(NewComment).filter_by(policy_queue=policy_queue) 

1741 q = q.filter_by(package=package) 

1742 q = q.filter_by(version=version) 

1743 

1744 return bool(q.count() > 0) 

1745 

1746 

1747__all__.append("has_new_comment") 

1748 

1749 

1750@session_wrapper 

1751def get_new_comments( 

1752 policy_queue: "PolicyQueue", 

1753 package: Optional[str] = None, 

1754 version: Optional[str] = None, 

1755 comment_id: Optional[int] = None, 

1756 session: "Session | None" = None, 

1757) -> list[NewComment]: 

1758 """ 

1759 Returns (possibly empty) list of NewComment objects for the given 

1760 parameters 

1761 

1762 :param package: name of the package 

1763 :param version: package version 

1764 :param comment_id: An id of a comment 

1765 :param session: Optional SQLA session object (a temporary one will be 

1766 generated if not supplied) 

1767 :return: A (possibly empty) list of NewComment objects will be returned 

1768 """ 

1769 

1770 assert session is not None 

1771 q = session.query(NewComment).filter_by(policy_queue=policy_queue) 

1772 if package is not None: 1772 ↛ 1774line 1772 didn't jump to line 1774 because the condition on line 1772 was always true

1773 q = q.filter_by(package=package) 

1774 if version is not None: 1774 ↛ 1775line 1774 didn't jump to line 1775 because the condition on line 1774 was never true

1775 q = q.filter_by(version=version) 

1776 if comment_id is not None: 1776 ↛ 1777line 1776 didn't jump to line 1777 because the condition on line 1776 was never true

1777 q = q.filter_by(comment_id=comment_id) 

1778 

1779 return q.all() 

1780 

1781 

1782__all__.append("get_new_comments") 

1783 

1784################################################################################ 

1785 

1786 

1787class Override(ORMObject): 

1788 __tablename__ = "override" 

1789 

1790 suite_id: Mapped[int] = mapped_column( 

1791 "suite", ForeignKey("suite.id"), primary_key=True 

1792 ) 

1793 component_id: Mapped[int] = mapped_column( 

1794 "component", ForeignKey("component.id"), primary_key=True 

1795 ) 

1796 package: Mapped[str] = mapped_column(primary_key=True) 

1797 overridetype_id: Mapped[int] = mapped_column( 

1798 "type", ForeignKey("override_type.id"), primary_key=True 

1799 ) 

1800 priority_id: Mapped[int | None] = mapped_column( 

1801 "priority", ForeignKey("priority.id") 

1802 ) 

1803 section_id: Mapped[int] = mapped_column("section", ForeignKey("section.id")) 

1804 maintainer: Mapped[str | None] 

1805 created: Mapped[datetime | None] = mapped_column( 

1806 DateTime(timezone=True), server_default=func.now() 

1807 ) 

1808 modified: Mapped[datetime | None] = mapped_column( 

1809 DateTime(timezone=True), server_default=func.now() 

1810 ) 

1811 

1812 suite: Mapped["Suite"] = relationship(back_populates="overrides") 

1813 component: Mapped["Component"] = relationship(back_populates="overrides") 

1814 priority: Mapped["Priority"] = relationship(back_populates="overrides") 

1815 section: Mapped["Section"] = relationship(back_populates="overrides") 

1816 overridetype: Mapped["OverrideType"] = relationship(back_populates="overrides") 

1817 

1818 def __init__( 

1819 self, 

1820 package=None, 

1821 suite=None, 

1822 component=None, 

1823 overridetype=None, 

1824 section=None, 

1825 priority=None, 

1826 ): 

1827 self.package = package 

1828 self.suite = suite 

1829 self.component = component 

1830 self.overridetype = overridetype 

1831 self.section = section 

1832 self.priority = priority 

1833 

1834 @override 

1835 def properties(self) -> list[str]: 

1836 return ["package", "suite", "component", "overridetype", "section", "priority"] 

1837 

1838 

1839__all__.append("Override") 

1840 

1841 

1842@session_wrapper 

1843def get_override( 

1844 package: str, 

1845 suite: Union[str, list[str], None] = None, 

1846 component: Union[str, list[str], None] = None, 

1847 overridetype: Union[str, list[str], None] = None, 

1848 session: "Session | None" = None, 

1849) -> list[Override]: 

1850 """ 

1851 Returns Override object for the given parameters 

1852 

1853 :param package: The name of the package 

1854 :param suite: The name of the suite (or suites if a list) to limit to. If 

1855 None, don't limit. Defaults to None. 

1856 :param component: The name of the component (or components if a list) to 

1857 limit to. If None, don't limit. Defaults to None. 

1858 :param overridetype: The name of the overridetype (or overridetypes if a list) to 

1859 limit to. If None, don't limit. Defaults to None. 

1860 :param session: Optional SQLA session object (a temporary one will be 

1861 generated if not supplied) 

1862 :return: A (possibly empty) list of Override objects will be returned 

1863 """ 

1864 

1865 assert session is not None 

1866 

1867 q = session.query(Override) 

1868 q = q.filter_by(package=package) 

1869 

1870 if suite is not None: 

1871 if not isinstance(suite, list): 

1872 suite = [suite] 

1873 q = q.join(Suite).filter(Suite.suite_name.in_(suite)) 

1874 

1875 if component is not None: 

1876 if not isinstance(component, list): 

1877 component = [component] 

1878 q = q.join(Component).filter(Component.component_name.in_(component)) 

1879 

1880 if overridetype is not None: 

1881 if not isinstance(overridetype, list): 

1882 overridetype = [overridetype] 

1883 q = q.join(OverrideType).filter(OverrideType.overridetype.in_(overridetype)) 

1884 

1885 return q.all() 

1886 

1887 

1888__all__.append("get_override") 

1889 

1890 

1891################################################################################ 

1892 

1893 

1894class OverrideType(ORMObject): 

1895 __tablename__ = "override_type" 

1896 

1897 overridetype_id: Mapped[int] = mapped_column("id", primary_key=True) 

1898 overridetype: Mapped[str] = mapped_column("type", unique=True) 

1899 created: Mapped[datetime | None] = mapped_column( 

1900 DateTime(timezone=True), server_default=func.now() 

1901 ) 

1902 modified: Mapped[datetime | None] = mapped_column( 

1903 DateTime(timezone=True), server_default=func.now() 

1904 ) 

1905 

1906 overrides: DynamicMapped["Override"] = relationship(back_populates="overridetype") 

1907 

1908 def __init__(self, overridetype=None): 

1909 self.overridetype = overridetype 

1910 

1911 @override 

1912 def properties(self) -> list[str]: 

1913 return ["overridetype", "overridetype_id", "overrides_count"] 

1914 

1915 

1916__all__.append("OverrideType") 

1917 

1918 

1919@session_wrapper 

1920def get_override_type( 

1921 override_type: str, session: "Session | None" = None 

1922) -> Optional[OverrideType]: 

1923 """ 

1924 Returns OverrideType object for given `override_type`. 

1925 

1926 :param override_type: The name of the override type 

1927 :param session: Optional SQLA session object (a temporary one will be 

1928 generated if not supplied) 

1929 :return: the database id for the given override type 

1930 """ 

1931 

1932 assert session is not None 

1933 q = session.query(OverrideType).filter_by(overridetype=override_type) 

1934 return q.one_or_none() 

1935 

1936 

1937__all__.append("get_override_type") 

1938 

1939################################################################################ 

1940 

1941 

1942class PolicyQueue(Base): 

1943 __tablename__ = "policy_queue" 

1944 

1945 policy_queue_id: Mapped[int] = mapped_column("id", primary_key=True) 

1946 queue_name: Mapped[str] = mapped_column(unique=True) 

1947 path: Mapped[str] 

1948 change_perms: Mapped[str] = mapped_column(CHAR(length=4)) 

1949 generate_metadata: Mapped[bool] = mapped_column(default=False) 

1950 created: Mapped[datetime | None] = mapped_column( 

1951 DateTime(timezone=True), server_default=func.now() 

1952 ) 

1953 modified: Mapped[datetime | None] = mapped_column( 

1954 DateTime(timezone=True), server_default=func.now() 

1955 ) 

1956 send_to_build_queues: Mapped[bool] = mapped_column(default=False) 

1957 suite_id: Mapped[int] = mapped_column(ForeignKey("suite.id")) 

1958 

1959 suite: Mapped["Suite"] = relationship(foreign_keys=[suite_id]) 

1960 uploads: Mapped[list["PolicyQueueUpload"]] = relationship( 

1961 back_populates="policy_queue" 

1962 ) 

1963 

1964 def __init__(self, *args, **kwargs): 

1965 pass 

1966 

1967 @override 

1968 def __repr__(self): 

1969 return "<PolicyQueue %s>" % self.queue_name 

1970 

1971 

1972__all__.append("PolicyQueue") 

1973 

1974 

1975@session_wrapper 

1976def get_policy_queue( 

1977 queuename: str, session: "Session | None" = None 

1978) -> Optional[PolicyQueue]: 

1979 """ 

1980 Returns PolicyQueue object for given `queuename` 

1981 

1982 :param queuename: The name of the queue 

1983 :param session: Optional SQLA session object (a temporary one will be 

1984 generated if not supplied) 

1985 :return: PolicyQueue object for the given queue 

1986 """ 

1987 

1988 assert session is not None 

1989 q = session.query(PolicyQueue).filter_by(queue_name=queuename) 

1990 return q.one_or_none() 

1991 

1992 

1993__all__.append("get_policy_queue") 

1994 

1995################################################################################ 

1996 

1997 

1998@functools.total_ordering 

1999class PolicyQueueUpload(Base): 

2000 __tablename__ = "policy_queue_upload" 

2001 __table_args = ( 

2002 UniqueConstraint( 

2003 "policy_queue_id", 

2004 "target_suite_id", 

2005 "changes_id", 

2006 name="policy_queue_upload_policy_queue_id_target_suite_id_changes_key", 

2007 ), 

2008 ) 

2009 

2010 id: Mapped[int] = mapped_column(primary_key=True) 

2011 policy_queue_id: Mapped[int] = mapped_column(ForeignKey("policy_queue.id")) 

2012 target_suite_id: Mapped[int] = mapped_column(ForeignKey("suite.id")) 

2013 changes_id: Mapped[int] = mapped_column(ForeignKey("changes.id")) 

2014 source_id: Mapped[int | None] = mapped_column(ForeignKey("source.id")) 

2015 

2016 changes: Mapped["DBChange"] = relationship() 

2017 policy_queue: Mapped["PolicyQueue"] = relationship(back_populates="uploads") 

2018 target_suite: Mapped["Suite"] = relationship() 

2019 source: Mapped["DBSource | None"] = relationship() 

2020 binaries: Mapped[list["DBBinary"]] = relationship( 

2021 secondary="policy_queue_upload_binaries_map" 

2022 ) 

2023 byhand: Mapped[list["PolicyQueueByhandFile"]] = relationship( 

2024 back_populates="upload" 

2025 ) 

2026 

2027 def _key(self): 

2028 return ( 

2029 self.changes.source, 

2030 AptVersion(self.changes.version), 

2031 self.source is None, 

2032 self.changes.changesname, 

2033 ) 

2034 

2035 @override 

2036 def __eq__(self, other: Any) -> bool: 

2037 if not isinstance(other, PolicyQueueUpload): 2037 ↛ 2038line 2037 didn't jump to line 2038 because the condition on line 2037 was never true

2038 return NotImplemented 

2039 return self._key() == other._key() 

2040 

2041 def __lt__(self, other): 

2042 return self._key() < other._key() 

2043 

2044 

2045__all__.append("PolicyQueueUpload") 

2046 

2047 

2048class PolicyQueueUploadBinariesMap(Base): 

2049 __tablename__ = "policy_queue_upload_binaries_map" 

2050 

2051 policy_queue_upload_id: Mapped[int] = mapped_column( 

2052 ForeignKey("policy_queue_upload.id", ondelete="CASCADE"), primary_key=True 

2053 ) 

2054 binary_id: Mapped[int] = mapped_column(ForeignKey("binaries.id"), primary_key=True) 

2055 

2056 

2057################################################################################ 

2058 

2059 

2060class PolicyQueueByhandFile(Base): 

2061 __tablename__ = "policy_queue_byhand_file" 

2062 

2063 id: Mapped[int] = mapped_column(primary_key=True) 

2064 upload_id: Mapped[int] = mapped_column(ForeignKey("policy_queue_upload.id")) 

2065 filename: Mapped[str] 

2066 processed: Mapped[bool] = mapped_column(default=False) 

2067 

2068 upload: Mapped[PolicyQueueUpload] = relationship(back_populates="byhand") 

2069 

2070 

2071__all__.append("PolicyQueueByhandFile") 

2072 

2073################################################################################ 

2074 

2075 

2076class Priority(ORMObject): 

2077 __tablename__ = "priority" 

2078 

2079 priority_id: Mapped[int] = mapped_column("id", primary_key=True) 

2080 priority: Mapped[str] = mapped_column(unique=True) 

2081 level: Mapped[int] = mapped_column(unique=True) 

2082 created: Mapped[datetime | None] = mapped_column( 

2083 DateTime(timezone=True), server_default=func.now() 

2084 ) 

2085 modified: Mapped[datetime | None] = mapped_column( 

2086 DateTime(timezone=True), server_default=func.now() 

2087 ) 

2088 

2089 overrides: DynamicMapped["Override"] = relationship(back_populates="priority") 

2090 

2091 def __init__(self, priority=None, level=None): 

2092 self.priority = priority 

2093 self.level = level 

2094 

2095 @override 

2096 def properties(self) -> list[str]: 

2097 return ["priority", "priority_id", "level", "overrides_count"] 

2098 

2099 @override 

2100 def __eq__(self, val: Any): 

2101 if isinstance(val, str): 

2102 warnings.warn( 

2103 "comparison with a `str` is deprecated", 

2104 DeprecationWarning, 

2105 stacklevel=2, 

2106 ) 

2107 return self.priority == val 

2108 # This signals to use the normal comparison operator 

2109 return NotImplemented 

2110 

2111 @override 

2112 def __ne__(self, val: Any): 

2113 if isinstance(val, str): 2113 ↛ 2121line 2113 didn't jump to line 2121 because the condition on line 2113 was always true

2114 warnings.warn( 

2115 "comparison with a `str` is deprecated", 

2116 DeprecationWarning, 

2117 stacklevel=2, 

2118 ) 

2119 return self.priority != val 

2120 # This signals to use the normal comparison operator 

2121 return NotImplemented 

2122 

2123 __hash__ = ORMObject.__hash__ 

2124 

2125 

2126__all__.append("Priority") 

2127 

2128 

2129@session_wrapper 

2130def get_priority(priority: str, session: "Session | None" = None) -> Optional[Priority]: 

2131 """ 

2132 Returns Priority object for given `priority` name. 

2133 

2134 :param priority: The name of the priority 

2135 :param session: Optional SQLA session object (a temporary one will be 

2136 generated if not supplied) 

2137 :return: Priority object for the given priority 

2138 """ 

2139 

2140 assert session is not None 

2141 q = session.query(Priority).filter_by(priority=priority) 

2142 return q.one_or_none() 

2143 

2144 

2145__all__.append("get_priority") 

2146 

2147 

2148@session_wrapper 

2149def get_priorities(session: "Session | None" = None) -> dict[str, int]: 

2150 """ 

2151 Returns dictionary of priority names -> id mappings 

2152 

2153 :param session: Optional SQL session object (a temporary one will be 

2154 generated if not supplied) 

2155 :return: dictionary of priority names -> id mappings 

2156 """ 

2157 

2158 assert session is not None 

2159 ret = {} 

2160 q = session.query(Priority) 

2161 for x in q.all(): 

2162 ret[x.priority] = x.priority_id 

2163 

2164 return ret 

2165 

2166 

2167__all__.append("get_priorities") 

2168 

2169################################################################################ 

2170 

2171 

2172class Section(BaseTimestamp): 

2173 __tablename__ = "section" 

2174 

2175 section_id: Mapped[int] = mapped_column("id", primary_key=True) 

2176 section: Mapped[str] = mapped_column(unique=True) 

2177 

2178 overrides: DynamicMapped["Override"] = relationship(back_populates="section") 

2179 

2180 def __init__(self, section=None): 

2181 self.section = section 

2182 

2183 @override 

2184 def __str__(self): 

2185 return self.section 

2186 

2187 @override 

2188 def __repr__(self): 

2189 return "<{} {}>".format( 

2190 self.__class__.__name__, 

2191 self.section, 

2192 ) 

2193 

2194 @override 

2195 def __eq__(self, val: Any): 

2196 if isinstance(val, str): 

2197 warnings.warn( 

2198 "comparison with a `str` is deprecated", 

2199 DeprecationWarning, 

2200 stacklevel=2, 

2201 ) 

2202 return self.section == val 

2203 # This signals to use the normal comparison operator 

2204 return NotImplemented 

2205 

2206 @override 

2207 def __ne__(self, val: Any): 

2208 if isinstance(val, str): 2208 ↛ 2216line 2208 didn't jump to line 2216 because the condition on line 2208 was always true

2209 warnings.warn( 

2210 "comparison with a `str` is deprecated", 

2211 DeprecationWarning, 

2212 stacklevel=2, 

2213 ) 

2214 return self.section != val 

2215 # This signals to use the normal comparison operator 

2216 return NotImplemented 

2217 

2218 __hash__ = BaseTimestamp.__hash__ 

2219 

2220 

2221__all__.append("Section") 

2222 

2223 

2224@session_wrapper 

2225def get_section(section: str, session: "Session | None" = None) -> Optional[Section]: 

2226 """ 

2227 Returns Section object for given `section` name. 

2228 

2229 :param section: The name of the section 

2230 :param session: Optional SQLA session object (a temporary one will be 

2231 generated if not supplied) 

2232 :return: Section object for the given section name 

2233 """ 

2234 

2235 assert session is not None 

2236 q = session.query(Section).filter_by(section=section) 

2237 return q.one_or_none() 

2238 

2239 

2240__all__.append("get_section") 

2241 

2242 

2243@session_wrapper 

2244def get_sections(session: "Session | None" = None) -> dict[str, int]: 

2245 """ 

2246 Returns dictionary of section names -> id mappings 

2247 

2248 :param session: Optional SQL session object (a temporary one will be 

2249 generated if not supplied) 

2250 :return: dictionary of section names -> id mappings 

2251 """ 

2252 

2253 assert session is not None 

2254 ret = {} 

2255 q = session.query(Section) 

2256 for x in q.all(): 

2257 ret[x.section] = x.section_id 

2258 

2259 return ret 

2260 

2261 

2262__all__.append("get_sections") 

2263 

2264################################################################################ 

2265 

2266 

2267class SignatureHistory(ORMObject): 

2268 __tablename__ = "signature_history" 

2269 

2270 fingerprint: Mapped[str] = mapped_column(primary_key=True) 

2271 signature_timestamp: Mapped[datetime] = mapped_column( 

2272 DateTime(timezone=False), primary_key=True 

2273 ) 

2274 contents_sha1: Mapped[str] = mapped_column(primary_key=True) 

2275 seen: Mapped[datetime] = mapped_column( 

2276 DateTime(timezone=False), server_default=func.now() 

2277 ) 

2278 

2279 @classmethod 

2280 def from_signed_file( 

2281 cls, signed_file: "daklib.gpg.SignedFile" 

2282 ) -> "SignatureHistory": 

2283 """signature history entry from signed file 

2284 

2285 :param signed_file: signed file 

2286 """ 

2287 self = cls() 

2288 self.fingerprint = signed_file.primary_fingerprint 

2289 self.signature_timestamp = signed_file.signature_timestamp 

2290 self.contents_sha1 = signed_file.contents_sha1 

2291 return self 

2292 

2293 def query(self, session): 

2294 return ( 

2295 session.query(SignatureHistory) 

2296 .filter_by( 

2297 fingerprint=self.fingerprint, 

2298 signature_timestamp=self.signature_timestamp, 

2299 contents_sha1=self.contents_sha1, 

2300 ) 

2301 .first() 

2302 ) 

2303 

2304 

2305__all__.append("SignatureHistory") 

2306 

2307################################################################################ 

2308 

2309 

2310class SrcContents(ORMObject): 

2311 __tablename__ = "src_contents" 

2312 

2313 source_id: Mapped[int] = mapped_column( 

2314 ForeignKey("source.id", ondelete="CASCADE"), primary_key=True 

2315 ) 

2316 file: Mapped[str] = mapped_column(primary_key=True) 

2317 

2318 source: Mapped["DBSource"] = relationship(back_populates="contents") 

2319 

2320 def __init__(self, file=None, source=None): 

2321 self.file = file 

2322 self.source = source 

2323 

2324 @override 

2325 def properties(self) -> list[str]: 

2326 return ["file", "source"] 

2327 

2328 

2329__all__.append("SrcContents") 

2330 

2331################################################################################ 

2332 

2333 

2334class DBSource(ORMObject): 

2335 __tablename__ = "source" 

2336 __table_args = (UniqueConstraint("source", "version", name="source_source_key"),) 

2337 

2338 source_id: Mapped[int] = mapped_column("id", primary_key=True) 

2339 source: Mapped[str] 

2340 version: Mapped[str] = mapped_column(DebVersion()) 

2341 maintainer_id: Mapped[int] = mapped_column( 

2342 "maintainer", ForeignKey("maintainer.id") 

2343 ) 

2344 poolfile_id: Mapped[int] = mapped_column( 

2345 "file", ForeignKey("files.id"), unique=True 

2346 ) 

2347 fingerprint_id: Mapped[int | None] = mapped_column( 

2348 "sig_fpr", ForeignKey("fingerprint.id") 

2349 ) 

2350 install_date: Mapped[datetime] = mapped_column( 

2351 DateTime(timezone=True), server_default=func.now() 

2352 ) 

2353 changedby_id: Mapped[int] = mapped_column("changedby", ForeignKey("maintainer.id")) 

2354 dm_upload_allowed: Mapped[bool] = mapped_column(default=False) 

2355 created: Mapped[datetime | None] = mapped_column( 

2356 DateTime(timezone=True), server_default=func.now() 

2357 ) 

2358 modified: Mapped[datetime | None] = mapped_column( 

2359 DateTime(timezone=True), server_default=func.now() 

2360 ) 

2361 stanza: Mapped[str] 

2362 authorized_by_fingerprint_id: Mapped[int | None] = mapped_column( 

2363 ForeignKey("fingerprint.id") 

2364 ) 

2365 

2366 poolfile: Mapped["PoolFile"] = relationship() 

2367 fingerprint: Mapped["Fingerprint | None"] = relationship( 

2368 foreign_keys=[fingerprint_id] 

2369 ) 

2370 authorized_by_fingerprint: Mapped["Fingerprint | None"] = relationship( 

2371 foreign_keys=[authorized_by_fingerprint_id] 

2372 ) 

2373 srcfiles: Mapped[list["DSCFile"]] = relationship(back_populates="source") 

2374 suites: Mapped[list["Suite"]] = relationship( 

2375 secondary="src_associations", back_populates="sources" 

2376 ) 

2377 uploaders: Mapped[list["Maintainer"]] = relationship(secondary="src_uploaders") 

2378 key: Mapped[dict[str, "SourceMetadata"]] = relationship( 

2379 collection_class=attribute_keyed_dict("key"), 

2380 cascade="all", 

2381 back_populates="source", 

2382 ) 

2383 contents: DynamicMapped["SrcContents"] = relationship( 

2384 cascade="all", 

2385 back_populates="source", 

2386 ) 

2387 binaries: Mapped[list["DBBinary"]] = relationship(back_populates="source") 

2388 extra_binary_references: DynamicMapped["DBBinary"] = relationship( 

2389 secondary="extra_src_references", back_populates="extra_sources" 

2390 ) 

2391 maintainer: Mapped["Maintainer"] = relationship( 

2392 back_populates="maintains_sources", foreign_keys=[maintainer_id] 

2393 ) 

2394 changedby: Mapped["Maintainer"] = relationship( 

2395 back_populates="changed_sources", foreign_keys=[changedby_id] 

2396 ) 

2397 

2398 metadata_proxy: AssociationProxy[dict["MetadataKey", str]] = association_proxy( 

2399 "key", "value" 

2400 ) 

2401 

2402 def __init__( 

2403 self, 

2404 source=None, 

2405 version=None, 

2406 maintainer=None, 

2407 changedby=None, 

2408 poolfile=None, 

2409 install_date=None, 

2410 fingerprint=None, 

2411 ): 

2412 self.source = source 

2413 self.version = version 

2414 self.maintainer = maintainer 

2415 self.changedby = changedby 

2416 self.poolfile = poolfile 

2417 self.install_date = install_date 

2418 self.fingerprint = fingerprint 

2419 

2420 @property 

2421 def pkid(self) -> int: 

2422 return self.source_id 

2423 

2424 @property 

2425 def name(self) -> str: 

2426 return self.source 

2427 

2428 @property 

2429 def arch_string(self) -> str: 

2430 return "source" 

2431 

2432 @override 

2433 def properties(self) -> list[str]: 

2434 return [ 

2435 "source", 

2436 "source_id", 

2437 "maintainer", 

2438 "changedby", 

2439 "fingerprint", 

2440 "poolfile", 

2441 "version", 

2442 "suites_count", 

2443 "install_date", 

2444 "binaries_count", 

2445 "uploaders_count", 

2446 ] 

2447 

2448 def read_control_fields(self) -> Deb822: 

2449 """ 

2450 Reads the control information from a dsc 

2451 

2452 :return: fields is the dsc information in a dictionary form 

2453 """ 

2454 with open(self.poolfile.fullpath, "r") as fd: 

2455 fields = Deb822(fd) 

2456 return fields 

2457 

2458 def scan_contents(self) -> set[str]: 

2459 """ 

2460 Returns a set of names for non directories. The path names are 

2461 normalized after converting them from either utf-8 or iso8859-1 

2462 encoding. 

2463 """ 

2464 fullpath = self.poolfile.fullpath 

2465 from daklib.contents import UnpackedSource 

2466 

2467 unpacked = UnpackedSource(fullpath) 

2468 fileset = set() 

2469 for name in unpacked.get_all_filenames(): 

2470 fileset.add(name) 

2471 return fileset 

2472 

2473 @property 

2474 def proxy(self) -> "MetadataProxy": 

2475 session = object_session(self) 

2476 assert session is not None 

2477 query = session.query(SourceMetadata).filter_by(source=self) 

2478 return MetadataProxy(session, query) 

2479 

2480 

2481__all__.append("DBSource") 

2482 

2483 

2484@session_wrapper 

2485def get_suites_source_in(source: str, session=None) -> "list[Suite]": 

2486 """ 

2487 Returns list of Suite objects which given `source` name is in 

2488 

2489 :param source: DBSource package name to search for 

2490 :return: list of Suite objects for the given source 

2491 """ 

2492 

2493 return session.query(Suite).filter(Suite.sources.any(source=source)).all() 

2494 

2495 

2496__all__.append("get_suites_source_in") 

2497 

2498# FIXME: This function fails badly if it finds more than 1 source package and 

2499# its implementation is trivial enough to be inlined. 

2500 

2501 

2502@session_wrapper 

2503def get_source_in_suite( 

2504 source: str, suite_name: Optional[str], session: "Session | None" = None 

2505) -> Optional[DBSource]: 

2506 """ 

2507 Returns a DBSource object for a combination of `source` and `suite_name`. 

2508 

2509 :param source: source package name 

2510 :param suite_name: the suite name 

2511 :return: the version for `source` in `suite` 

2512 """ 

2513 assert session is not None 

2514 if suite_name is None: 2514 ↛ 2515line 2514 didn't jump to line 2515 because the condition on line 2514 was never true

2515 return None 

2516 suite = get_suite(suite_name, session) 

2517 if suite is None: 2517 ↛ 2518line 2517 didn't jump to line 2518 because the condition on line 2517 was never true

2518 return None 

2519 return suite.get_sources(source).one_or_none() 

2520 

2521 

2522__all__.append("get_source_in_suite") 

2523 

2524 

2525@session_wrapper 

2526def import_metadata_into_db( 

2527 obj: Union[DBBinary, DBSource], session: "Session | None" = None 

2528) -> None: 

2529 """ 

2530 This routine works on either DBBinary or DBSource objects and imports 

2531 their metadata into the database 

2532 """ 

2533 assert session is not None 

2534 fields = obj.read_control_fields() 

2535 for k in fields.keys(): 

2536 try: 

2537 # Try raw ASCII 

2538 val = str(fields[k]) 

2539 except UnicodeEncodeError: 

2540 # Fall back to UTF-8 

2541 try: 

2542 val = fields[k].encode("utf-8") 

2543 except UnicodeEncodeError: 

2544 # Finally try iso8859-1 

2545 val = fields[k].encode("iso8859-1") 

2546 # Otherwise we allow the exception to percolate up and we cause 

2547 # a reject as someone is playing silly buggers 

2548 

2549 obj.metadata_proxy[get_or_set_metadatakey(k, session)] = val 

2550 

2551 session.commit_or_flush() # type: ignore[attr-defined] 

2552 

2553 

2554__all__.append("import_metadata_into_db") 

2555 

2556 

2557class SrcAssociations(Base): 

2558 __tablename__ = "src_associations" 

2559 __table_args = ( 

2560 UniqueConstraint("suite", "source", name="src_associations_suite_key"), 

2561 ) 

2562 

2563 id: Mapped[int] = mapped_column(primary_key=True) 

2564 suite: Mapped[int] = mapped_column(ForeignKey("suite.id")) 

2565 source: Mapped[int] = mapped_column(ForeignKey("source.id")) 

2566 created: Mapped[datetime] = mapped_column( 

2567 DateTime(timezone=True), server_default=func.now() 

2568 ) 

2569 modified: Mapped[datetime] = mapped_column( 

2570 DateTime(timezone=True), server_default=func.now() 

2571 ) 

2572 

2573 

2574class SrcUploaders(Base): 

2575 __tablename__ = "src_uploaders" 

2576 __table_args = ( 

2577 UniqueConstraint("source", "maintainer", name="src_uploaders_source_key"), 

2578 ) 

2579 

2580 id: Mapped[int] = mapped_column(primary_key=True) 

2581 source: Mapped[int] = mapped_column(ForeignKey("source.id", ondelete="CASCADE")) 

2582 maintainer: Mapped[int] = mapped_column( 

2583 ForeignKey("maintainer.id", ondelete="CASCADE") 

2584 ) 

2585 created: Mapped[datetime] = mapped_column( 

2586 DateTime(timezone=True), server_default=func.now() 

2587 ) 

2588 modified: Mapped[datetime] = mapped_column( 

2589 DateTime(timezone=True), server_default=func.now() 

2590 ) 

2591 

2592 

2593################################################################################ 

2594 

2595 

2596class SrcFormat(Base): 

2597 __tablename__ = "src_format" 

2598 

2599 src_format_id: Mapped[int] = mapped_column("id", primary_key=True) 

2600 format_name: Mapped[str] = mapped_column(unique=True) 

2601 created: Mapped[datetime] = mapped_column( 

2602 DateTime(timezone=True), server_default=func.now() 

2603 ) 

2604 modified: Mapped[datetime] = mapped_column( 

2605 DateTime(timezone=True), server_default=func.now() 

2606 ) 

2607 

2608 suites: DynamicMapped["Suite"] = relationship( 

2609 secondary="suite_src_formats", back_populates="srcformats" 

2610 ) 

2611 

2612 def __init__(self, *args, **kwargs): 

2613 pass 

2614 

2615 @override 

2616 def __repr__(self): 

2617 return "<SrcFormat %s>" % (self.format_name) 

2618 

2619 

2620__all__.append("SrcFormat") 

2621 

2622################################################################################ 

2623 

2624SUITE_FIELDS = [ 

2625 ("SuiteName", "suite_name"), 

2626 ("SuiteID", "suite_id"), 

2627 ("Version", "version"), 

2628 ("Origin", "origin"), 

2629 ("Label", "label"), 

2630 ("Description", "description"), 

2631 ("Untouchable", "untouchable"), 

2632 ("Announce", "announce"), 

2633 ("Codename", "codename"), 

2634 ("OverrideCodename", "overridecodename"), 

2635 ("ValidTime", "validtime"), 

2636 ("Priority", "priority"), 

2637 ("NotAutomatic", "notautomatic"), 

2638 ("CopyChanges", "copychanges"), 

2639 ("OverrideSuite", "overridesuite"), 

2640] 

2641 

2642# Why the heck don't we have any UNIQUE constraints in table suite? 

2643# TODO: Add UNIQUE constraints for appropriate columns. 

2644 

2645 

2646class Suite(ORMObject): 

2647 __tablename__ = "suite" 

2648 

2649 suite_id: Mapped[int] = mapped_column("id", primary_key=True) 

2650 suite_name: Mapped[str] = mapped_column(unique=True) 

2651 version: Mapped[str | None] 

2652 origin: Mapped[str | None] 

2653 label: Mapped[str | None] 

2654 description: Mapped[str | None] 

2655 untouchable: Mapped[bool] = mapped_column(default=False) 

2656 codename: Mapped[str | None] 

2657 overridecodename: Mapped[str | None] 

2658 validtime: Mapped[int] = mapped_column(default=604800) 

2659 priority: Mapped[int] = mapped_column(default=0) 

2660 notautomatic: Mapped[bool] = mapped_column(default=False) 

2661 copychanges: Mapped[str | None] 

2662 overridesuite: Mapped[str | None] 

2663 policy_queue_id: Mapped[int | None] = mapped_column(ForeignKey("policy_queue.id")) 

2664 created: Mapped[datetime | None] = mapped_column( 

2665 DateTime(timezone=True), server_default=func.now() 

2666 ) 

2667 modified: Mapped[datetime | None] = mapped_column( 

2668 DateTime(timezone=True), server_default=func.now() 

2669 ) 

2670 changelog: Mapped[str | None] 

2671 butautomaticupgrades: Mapped[bool] = mapped_column(default=False) 

2672 signingkeys: Mapped[list[str] | None] = mapped_column(ARRAY(String())) 

2673 announce: Mapped[list[str] | None] = mapped_column(ARRAY(String())) 

2674 include_long_description: Mapped[bool] = mapped_column(default=False) 

2675 overrideprocess: Mapped[bool] = mapped_column(default=False) 

2676 overrideorigin: Mapped[str | None] 

2677 allowcsset: Mapped[bool] = mapped_column(default=False) 

2678 archive_id: Mapped[int] = mapped_column(ForeignKey("archive.id")) 

2679 new_queue_id: Mapped[int | None] = mapped_column(ForeignKey("policy_queue.id")) 

2680 close_bugs: Mapped[bool | None] 

2681 mail_whitelist: Mapped[str | None] 

2682 indices_compression: Mapped[list[str] | None] = mapped_column( 

2683 ARRAY(String()), default=["xz"] 

2684 ) 

2685 i18n_compression: Mapped[list[str] | None] = mapped_column( 

2686 ARRAY(String()), default=["xz"] 

2687 ) 

2688 release_suite: Mapped[str | None] 

2689 debugsuite_id: Mapped[int | None] = mapped_column(ForeignKey("suite.id")) 

2690 changelog_url: Mapped[str | None] 

2691 accept_source_uploads: Mapped[bool | None] = mapped_column(default=True) 

2692 accept_binary_uploads: Mapped[bool | None] = mapped_column(default=True) 

2693 checksums: Mapped[list[str] | None] = mapped_column( 

2694 ARRAY(String()), default=["sha256"] 

2695 ) 

2696 last_changed: Mapped[datetime] = mapped_column( 

2697 DateTime(timezone=False), server_default=func.now() 

2698 ) 

2699 byhash: Mapped[bool | None] = mapped_column(default=True) 

2700 separate_contents_architecture_all: Mapped[bool] = mapped_column(default=False) 

2701 separate_packages_architecture_all: Mapped[bool] = mapped_column(default=False) 

2702 merged_pdiffs: Mapped[bool] = mapped_column(default=True) 

2703 stayofexecution: Mapped[timedelta] = mapped_column(default=timedelta(hours=0)) 

2704 

2705 policy_queue: Mapped["PolicyQueue | None"] = relationship( 

2706 foreign_keys=[policy_queue_id] 

2707 ) 

2708 new_queue: Mapped["PolicyQueue | None"] = relationship(foreign_keys=[new_queue_id]) 

2709 debug_suite: Mapped["Suite | None"] = relationship(remote_side=[suite_id]) 

2710 copy_queues: Mapped[list["BuildQueue"]] = relationship( 

2711 secondary="suite_build_queue_copy" 

2712 ) 

2713 srcformats: Mapped[list["SrcFormat"]] = relationship( 

2714 secondary="suite_src_formats", back_populates="suites" 

2715 ) 

2716 archive: Mapped[Archive] = relationship(back_populates="suites") 

2717 acls: Mapped[set[ACL]] = relationship(secondary="suite_acl_map") 

2718 components: Mapped[list["Component"]] = relationship( 

2719 secondary="component_suite", 

2720 back_populates="suites", 

2721 order_by=lambda: Component.ordering, 

2722 ) 

2723 architectures: Mapped[list[Architecture]] = relationship( 

2724 secondary="suite_architectures", back_populates="suites" 

2725 ) 

2726 binaries: DynamicMapped["DBBinary"] = relationship( 

2727 secondary="bin_associations", 

2728 back_populates="suites", 

2729 ) 

2730 sources: DynamicMapped["DBSource"] = relationship( 

2731 secondary="src_associations", back_populates="suites" 

2732 ) 

2733 overrides: DynamicMapped["Override"] = relationship(back_populates="suite") 

2734 

2735 def __init__(self, suite_name=None, version=None): 

2736 self.suite_name = suite_name 

2737 self.version = version 

2738 

2739 @override 

2740 def properties(self) -> list[str]: 

2741 return [ 

2742 "suite_name", 

2743 "version", 

2744 "sources_count", 

2745 "binaries_count", 

2746 "overrides_count", 

2747 ] 

2748 

2749 @override 

2750 def __eq__(self, val: Any): 

2751 if isinstance(val, str): 2751 ↛ 2752line 2751 didn't jump to line 2752 because the condition on line 2751 was never true

2752 warnings.warn( 

2753 "comparison with a `str` is deprecated", 

2754 DeprecationWarning, 

2755 stacklevel=2, 

2756 ) 

2757 return self.suite_name == val 

2758 # This signals to use the normal comparison operator 

2759 return NotImplemented 

2760 

2761 @override 

2762 def __ne__(self, val: Any): 

2763 if isinstance(val, str): 2763 ↛ 2764line 2763 didn't jump to line 2764 because the condition on line 2763 was never true

2764 warnings.warn( 

2765 "comparison with a `str` is deprecated", 

2766 DeprecationWarning, 

2767 stacklevel=2, 

2768 ) 

2769 return self.suite_name != val 

2770 # This signals to use the normal comparison operator 

2771 return NotImplemented 

2772 

2773 __hash__ = ORMObject.__hash__ 

2774 

2775 def details(self) -> str: 

2776 ret = [] 

2777 for disp, field in SUITE_FIELDS: 

2778 val = getattr(self, field, None) 

2779 if val is not None: 

2780 ret.append("%s: %s" % (disp, val)) 

2781 

2782 return "\n".join(ret) 

2783 

2784 def get_architectures( 

2785 self, skipsrc: bool = False, skipall: bool = False 

2786 ) -> list[Architecture]: 

2787 """ 

2788 Returns list of Architecture objects 

2789 

2790 :param skipsrc: Whether to skip returning the 'source' architecture entry 

2791 :param skipall: Whether to skip returning the 'all' architecture entry 

2792 :return: list of Architecture objects for the given name (may be empty) 

2793 """ 

2794 

2795 session = object_session(self) 

2796 assert session is not None 

2797 q = session.query(Architecture).with_parent(self) 

2798 if skipsrc: 

2799 q = q.filter(Architecture.arch_string != "source") 

2800 if skipall: 

2801 q = q.filter(Architecture.arch_string != "all") 

2802 return q.order_by(Architecture.arch_string).all() 

2803 

2804 def get_sources(self, source: str) -> sqlalchemy.orm.query.Query: 

2805 """ 

2806 Returns a query object representing DBSource that is part of this suite. 

2807 

2808 :param source: source package name 

2809 :return: a query of DBSource 

2810 """ 

2811 

2812 session = object_session(self) 

2813 assert session is not None 

2814 return session.query(DBSource).filter_by(source=source).with_parent(self) 

2815 

2816 def get_overridesuite(self) -> "Suite": 

2817 if self.overridesuite is None: 

2818 return self 

2819 session = object_session(self) 

2820 assert session is not None 

2821 return session.query(Suite).filter_by(suite_name=self.overridesuite).one() 

2822 

2823 def update_last_changed(self) -> None: 

2824 self.last_changed = sqlalchemy.func.now() 

2825 

2826 @property 

2827 def path(self) -> str: 

2828 return os.path.join(self.archive.path, "dists", self.suite_name) 

2829 

2830 @property 

2831 def release_suite_output(self) -> str: 

2832 if self.release_suite is not None: 2832 ↛ 2833line 2832 didn't jump to line 2833 because the condition on line 2832 was never true

2833 return self.release_suite 

2834 return self.suite_name 

2835 

2836 

2837__all__.append("Suite") 

2838 

2839 

2840@session_wrapper 

2841def get_suite(suite: str, session: "Session | None" = None) -> Optional[Suite]: 

2842 """ 

2843 Returns Suite object for given `suite` name. 

2844 

2845 :param suite: The name of the suite 

2846 :param session: Optional SQLA session object (a temporary one will be 

2847 generated if not supplied) 

2848 :return: Suite object for the requested suite name (None if not present) 

2849 """ 

2850 

2851 assert session is not None 

2852 

2853 # Start by looking for the dak internal name 

2854 q = session.query(Suite).filter_by(suite_name=suite) 

2855 try: 

2856 return q.one() 

2857 except NoResultFound: 

2858 pass 

2859 

2860 # Now try codename 

2861 q = session.query(Suite).filter_by(codename=suite) 

2862 try: 

2863 return q.one() 

2864 except NoResultFound: 

2865 pass 

2866 

2867 # Finally give release_suite a try 

2868 q = session.query(Suite).filter_by(release_suite=suite) 

2869 return q.one_or_none() 

2870 

2871 

2872__all__.append("get_suite") 

2873 

2874 

2875class SuiteBuildQueueCopy(Base): 

2876 __tablename__ = "suite_build_queue_copy" 

2877 

2878 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"), primary_key=True) 

2879 build_queue_id: Mapped[int] = mapped_column( 

2880 ForeignKey("build_queue.id"), primary_key=True 

2881 ) 

2882 created: Mapped[datetime] = mapped_column( 

2883 DateTime(timezone=True), server_default=func.now() 

2884 ) 

2885 modified: Mapped[datetime] = mapped_column( 

2886 DateTime(timezone=True), server_default=func.now() 

2887 ) 

2888 

2889 

2890class SuiteSrcFormats(Base): 

2891 __tablename__ = "suite_src_formats" 

2892 

2893 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"), primary_key=True) 

2894 src_format: Mapped[int] = mapped_column( 

2895 ForeignKey("src_format.id"), primary_key=True 

2896 ) 

2897 created: Mapped[datetime] = mapped_column( 

2898 DateTime(timezone=True), server_default=func.now() 

2899 ) 

2900 modified: Mapped[datetime] = mapped_column( 

2901 DateTime(timezone=True), server_default=func.now() 

2902 ) 

2903 

2904 

2905class SuiteAclMap(Base): 

2906 __tablename__ = "suite_acl_map" 

2907 

2908 suite_id: Mapped[int] = mapped_column( 

2909 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True 

2910 ) 

2911 acl_id: Mapped[int] = mapped_column(ForeignKey("acl.id"), primary_key=True) 

2912 

2913 

2914class SuiteArchitectures(Base): 

2915 __tablename__ = "suite_architectures" 

2916 

2917 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"), primary_key=True) 

2918 architecture: Mapped[int] = mapped_column( 

2919 ForeignKey("architecture.id"), primary_key=True 

2920 ) 

2921 created: Mapped[datetime] = mapped_column( 

2922 DateTime(timezone=True), server_default=func.now() 

2923 ) 

2924 modified: Mapped[datetime] = mapped_column( 

2925 DateTime(timezone=True), server_default=func.now() 

2926 ) 

2927 

2928 

2929################################################################################ 

2930 

2931 

2932@session_wrapper 

2933def get_suite_architectures( 

2934 suite_name: str, 

2935 skipsrc: bool = False, 

2936 skipall: bool = False, 

2937 session: "Session | None" = None, 

2938) -> list[Architecture]: 

2939 """ 

2940 Returns list of Architecture objects for given `suite` name. The list is 

2941 empty if `suite` does not exist. 

2942 

2943 :param suite_name: Suite name to search for 

2944 :param skipsrc: Whether to skip returning the 'source' architecture entry 

2945 :param skipall: Whether to skip returning the 'all' architecture entry 

2946 :param session: Optional SQL session object (a temporary one will be 

2947 generated if not supplied) 

2948 :return: list of Architecture objects for the given name (may be empty) 

2949 """ 

2950 

2951 assert session is not None 

2952 suite = get_suite(suite_name, session) 

2953 if suite is None: 2953 ↛ 2954line 2953 didn't jump to line 2954 because the condition on line 2953 was never true

2954 return [] 

2955 return suite.get_architectures(skipsrc, skipall) 

2956 

2957 

2958__all__.append("get_suite_architectures") 

2959 

2960################################################################################ 

2961 

2962 

2963class Uid(ORMObject): 

2964 __tablename__ = "uid" 

2965 

2966 uid_id: Mapped[int] = mapped_column("id", primary_key=True) 

2967 uid: Mapped[str] = mapped_column(unique=True) 

2968 name: Mapped[str | None] 

2969 created: Mapped[datetime | None] = mapped_column( 

2970 DateTime(timezone=True), server_default=func.now() 

2971 ) 

2972 modified: Mapped[datetime | None] = mapped_column( 

2973 DateTime(timezone=True), server_default=func.now() 

2974 ) 

2975 

2976 fingerprint: Mapped[list["Fingerprint"]] = relationship(back_populates="uid") 

2977 

2978 def __init__(self, uid=None, name=None): 

2979 self.uid = uid 

2980 self.name = name 

2981 

2982 @override 

2983 def __eq__(self, val: Any): 

2984 if isinstance(val, str): 

2985 warnings.warn( 

2986 "comparison with a `str` is deprecated", 

2987 DeprecationWarning, 

2988 stacklevel=2, 

2989 ) 

2990 return self.uid == val 

2991 # This signals to use the normal comparison operator 

2992 return NotImplemented 

2993 

2994 @override 

2995 def __ne__(self, val: Any): 

2996 if isinstance(val, str): 

2997 warnings.warn( 

2998 "comparison with a `str` is deprecated", 

2999 DeprecationWarning, 

3000 stacklevel=2, 

3001 ) 

3002 return self.uid != val 

3003 # This signals to use the normal comparison operator 

3004 return NotImplemented 

3005 

3006 __hash__ = ORMObject.__hash__ 

3007 

3008 @override 

3009 def properties(self) -> list[str]: 

3010 return ["uid", "name", "fingerprint"] 

3011 

3012 

3013__all__.append("Uid") 

3014 

3015 

3016@session_wrapper 

3017def get_or_set_uid(uidname: str, session: "Session | None" = None) -> Uid: 

3018 """ 

3019 Returns uid object for given uidname. 

3020 

3021 If no matching uidname is found, a row is inserted. 

3022 

3023 :param uidname: The uid to add 

3024 :param session: Optional SQL session object (a temporary one will be 

3025 generated if not supplied). If not passed, a commit will be performed at 

3026 the end of the function, otherwise the caller is responsible for commiting. 

3027 :return: the uid object for the given uidname 

3028 """ 

3029 

3030 assert session is not None 

3031 q = session.query(Uid).filter_by(uid=uidname) 

3032 

3033 try: 

3034 ret = q.one() 

3035 except NoResultFound: 

3036 uid = Uid() 

3037 uid.uid = uidname 

3038 session.add(uid) 

3039 session.commit_or_flush() # type: ignore[attr-defined] 

3040 ret = uid 

3041 

3042 return ret 

3043 

3044 

3045__all__.append("get_or_set_uid") 

3046 

3047 

3048@session_wrapper 

3049def get_uid_from_fingerprint( 

3050 fpr: str, session: "Session | None" = None 

3051) -> Optional[Uid]: 

3052 assert session is not None 

3053 q = session.query(Uid) 

3054 q = q.join(Fingerprint).filter_by(fingerprint=fpr) 

3055 

3056 return q.one_or_none() 

3057 

3058 

3059__all__.append("get_uid_from_fingerprint") 

3060 

3061################################################################################ 

3062 

3063 

3064class MetadataKey(ORMObject): 

3065 __tablename__ = "metadata_keys" 

3066 

3067 key_id: Mapped[int] = mapped_column(primary_key=True) 

3068 key: Mapped[str] = mapped_column(unique=True) 

3069 ordering: Mapped[int] = mapped_column(default=0) 

3070 

3071 def __init__(self, key=None): 

3072 self.key = key 

3073 

3074 @override 

3075 def properties(self) -> list[str]: 

3076 return ["key"] 

3077 

3078 

3079__all__.append("MetadataKey") 

3080 

3081 

3082@session_wrapper 

3083def get_or_set_metadatakey( 

3084 keyname: str, session: "Session | None" = None 

3085) -> MetadataKey: 

3086 """ 

3087 Returns MetadataKey object for given uidname. 

3088 

3089 If no matching keyname is found, a row is inserted. 

3090 

3091 :param keyname: The keyname to add 

3092 :param session: Optional SQL session object (a temporary one will be 

3093 generated if not supplied). If not passed, a commit will be performed at 

3094 the end of the function, otherwise the caller is responsible for commiting. 

3095 :return: the metadatakey object for the given keyname 

3096 """ 

3097 

3098 assert session is not None 

3099 q = session.query(MetadataKey).filter_by(key=keyname) 

3100 

3101 try: 

3102 ret = q.one() 

3103 except NoResultFound: 

3104 ret = MetadataKey(keyname) 

3105 session.add(ret) 

3106 session.commit_or_flush() # type: ignore[attr-defined] 

3107 

3108 return ret 

3109 

3110 

3111__all__.append("get_or_set_metadatakey") 

3112 

3113################################################################################ 

3114 

3115 

3116class BinaryMetadata(ORMObject): 

3117 __tablename__ = "binaries_metadata" 

3118 

3119 binary_id: Mapped[int] = mapped_column( 

3120 "bin_id", ForeignKey("binaries.id", ondelete="CASCADE"), primary_key=True 

3121 ) 

3122 key_id: Mapped[int] = mapped_column( 

3123 ForeignKey("metadata_keys.key_id"), primary_key=True 

3124 ) 

3125 value: Mapped[str] 

3126 

3127 binary: Mapped["DBBinary"] = relationship(back_populates="key") 

3128 key: Mapped[MetadataKey] = relationship() 

3129 

3130 def __init__(self, key=None, value=None, binary=None): 

3131 self.key = key 

3132 self.value = value 

3133 if binary is not None: 

3134 self.binary = binary 

3135 

3136 @override 

3137 def properties(self) -> list[str]: 

3138 return ["binary", "key", "value"] 

3139 

3140 

3141__all__.append("BinaryMetadata") 

3142 

3143################################################################################ 

3144 

3145 

3146class SourceMetadata(ORMObject): 

3147 __tablename__ = "source_metadata" 

3148 

3149 source_id: Mapped[int] = mapped_column( 

3150 "src_id", ForeignKey("source.id", ondelete="CASCADE"), primary_key=True 

3151 ) 

3152 key_id: Mapped[int] = mapped_column( 

3153 ForeignKey("metadata_keys.key_id"), primary_key=True 

3154 ) 

3155 value: Mapped[str] 

3156 

3157 source: Mapped["DBSource"] = relationship(back_populates="key") 

3158 key: Mapped[MetadataKey] = relationship() 

3159 

3160 def __init__(self, key=None, value=None, source=None): 

3161 self.key = key 

3162 self.value = value 

3163 if source is not None: 

3164 self.source = source 

3165 

3166 @override 

3167 def properties(self) -> list[str]: 

3168 return ["source", "key", "value"] 

3169 

3170 

3171__all__.append("SourceMetadata") 

3172 

3173################################################################################ 

3174 

3175 

3176class MetadataProxy: 

3177 def __init__(self, session, query): 

3178 self.session = session 

3179 self.query = query 

3180 

3181 def _get(self, key): 

3182 metadata_key = self.session.query(MetadataKey).filter_by(key=key).first() 

3183 if metadata_key is None: 

3184 return None 

3185 metadata = self.query.filter_by(key=metadata_key).first() 

3186 return metadata 

3187 

3188 def __contains__(self, key: str) -> bool: 

3189 if self._get(key) is not None: 3189 ↛ 3191line 3189 didn't jump to line 3191 because the condition on line 3189 was always true

3190 return True 

3191 return False 

3192 

3193 def __getitem__(self, key: str) -> str: 

3194 metadata = self._get(key) 

3195 if metadata is None: 

3196 raise KeyError 

3197 return metadata.value 

3198 

3199 @overload 

3200 def get(self, key: str, default: str) -> str: ... # noqa: E704 3200 ↛ exitline 3200 didn't return from function 'get' because

3201 

3202 @overload 

3203 def get(self, key: str, default: None = None) -> str | None: ... # noqa: E704 3203 ↛ exitline 3203 didn't return from function 'get' because

3204 

3205 def get(self, key, default=None): 

3206 try: 

3207 return self[key] 

3208 except KeyError: 

3209 return default 

3210 

3211 

3212################################################################################ 

3213 

3214 

3215class VersionCheck(ORMObject): 

3216 __tablename__ = "version_check" 

3217 

3218 suite_id: Mapped[int] = mapped_column( 

3219 "suite", ForeignKey("suite.id"), primary_key=True 

3220 ) 

3221 check: Mapped[str] = mapped_column(primary_key=True) 

3222 reference_id: Mapped[int] = mapped_column( 

3223 "reference", ForeignKey("suite.id"), primary_key=True 

3224 ) 

3225 

3226 suite: Mapped["Suite"] = relationship(foreign_keys=[suite_id]) 

3227 reference: Mapped["Suite"] = relationship( 

3228 foreign_keys=[reference_id], lazy="joined" 

3229 ) 

3230 

3231 def __init__(self, *args, **kwargs): 

3232 pass 

3233 

3234 @override 

3235 def properties(self) -> list[str]: 

3236 return ["check"] 

3237 

3238 

3239__all__.append("VersionCheck") 

3240 

3241 

3242@session_wrapper 

3243def get_version_checks( 

3244 suite_name: str, check: Optional[str] = None, session: "Session | None" = None 

3245) -> list[VersionCheck]: 

3246 assert session is not None 

3247 suite = get_suite(suite_name, session) 

3248 if not suite: 3248 ↛ 3251line 3248 didn't jump to line 3251 because the condition on line 3248 was never true

3249 # Make sure that what we return is iterable so that list comprehensions 

3250 # involving this don't cause a traceback 

3251 return [] 

3252 q = session.query(VersionCheck).filter_by(suite=suite) 

3253 if check: 3253 ↛ 3255line 3253 didn't jump to line 3255 because the condition on line 3253 was always true

3254 q = q.filter_by(check=check) 

3255 return q.all() 

3256 

3257 

3258__all__.append("get_version_checks") 

3259 

3260################################################################################ 

3261 

3262 

3263class ExternalSignatureRequests(Base): 

3264 __tablename__ = "external_signature_requests" 

3265 

3266 association_id: Mapped[int] = mapped_column( 

3267 ForeignKey("bin_associations.id", ondelete="CASCADE"), primary_key=True 

3268 ) 

3269 suite_id: Mapped[int] = mapped_column( 

3270 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True 

3271 ) 

3272 

3273 

3274################################################################################ 

3275 

3276package_list = Table( 

3277 "package_list", 

3278 Base.metadata, 

3279 Column("package", String()), 

3280 Column("version", DebVersion()), 

3281 Column("source", String()), 

3282 Column("source_version", DebVersion()), 

3283 Column("suite", String()), 

3284 Column("codename", String()), 

3285 Column("archive", String()), 

3286 Column("component", String()), 

3287 Column("display_suite", String()), 

3288 Column("architecture_is_source", Boolean()), 

3289 Column("architecture", String()), 

3290 Column("type", String()), 

3291) 

3292 

3293################################################################################ 

3294 

3295 

3296class DBConn: 

3297 """ 

3298 database module init. 

3299 """ 

3300 

3301 __shared_state: dict[str, Any] = {} 

3302 

3303 def __init__(self, *, debug=False) -> None: 

3304 self.__dict__ = self.__shared_state 

3305 

3306 if not getattr(self, "initialised", False): 

3307 self.initialised = True 

3308 self.debug = debug 

3309 self.__createconn() 

3310 

3311 ## Connection functions 

3312 def __createconn(self) -> None: 

3313 from .config import Config 

3314 

3315 cnf = Config() 

3316 if "DB::Service" in cnf: 3316 ↛ 3317line 3316 didn't jump to line 3317 because the condition on line 3316 was never true

3317 connstr = "postgresql://service=%s" % cnf["DB::Service"] 

3318 elif "DB::Host" in cnf: 

3319 # TCP/IP 

3320 connstr = "postgresql://%s" % cnf["DB::Host"] 

3321 if "DB::Port" in cnf and cnf["DB::Port"] != "-1": 3321 ↛ 3322line 3321 didn't jump to line 3322 because the condition on line 3321 was never true

3322 connstr += ":%s" % cnf["DB::Port"] 

3323 connstr += "/%s" % cnf["DB::Name"] 

3324 else: 

3325 # Unix Socket 

3326 connstr = "postgresql:///%s" % cnf["DB::Name"] 

3327 if "DB::Port" in cnf and cnf["DB::Port"] != "-1": 3327 ↛ 3328line 3327 didn't jump to line 3328 because the condition on line 3327 was never true

3328 connstr += "?port=%s" % cnf["DB::Port"] 

3329 

3330 engine_args: dict[str, object] = {"echo": self.debug} 

3331 if "DB::PoolSize" in cnf: 

3332 engine_args["pool_size"] = int(cnf["DB::PoolSize"]) 

3333 if "DB::MaxOverflow" in cnf: 

3334 engine_args["max_overflow"] = int(cnf["DB::MaxOverflow"]) 

3335 # we don't support non-utf-8 connections 

3336 engine_args["client_encoding"] = "utf-8" 

3337 

3338 # Monkey patch a new dialect in in order to support service= syntax 

3339 import sqlalchemy.dialects.postgresql 

3340 from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2 

3341 

3342 class PGDialect_psycopg2_dak(PGDialect_psycopg2): 

3343 @override 

3344 def create_connect_args(self, url): 

3345 if str(url).startswith("postgresql://service="): 3345 ↛ 3347line 3345 didn't jump to line 3347 because the condition on line 3345 was never true

3346 # Eww 

3347 servicename = str(url)[21:] 

3348 return (["service=%s" % servicename], {}) 

3349 else: 

3350 return PGDialect_psycopg2.create_connect_args(self, url) 

3351 

3352 sqlalchemy.dialects.postgresql.base.dialect = PGDialect_psycopg2_dak # type: ignore[attr-defined] 

3353 

3354 try: 

3355 self.db_pg = create_engine(connstr, **engine_args) 

3356 self.db_smaker = sessionmaker( 

3357 bind=self.db_pg, autoflush=True, autocommit=False 

3358 ) 

3359 except OperationalError as e: 

3360 from . import utils 

3361 

3362 utils.fubar("Cannot connect to database (%s)" % str(e)) 

3363 

3364 self.pid = os.getpid() 

3365 

3366 def session(self, work_mem=0) -> "Session": 

3367 """ 

3368 Returns a new session object. If a work_mem parameter is provided a new 

3369 transaction is started and the work_mem parameter is set for this 

3370 transaction. The work_mem parameter is measured in MB. A default value 

3371 will be used if the parameter is not set. 

3372 """ 

3373 # reinitialize DBConn in new processes 

3374 if self.pid != os.getpid(): 

3375 self.__createconn() 

3376 session = self.db_smaker() 

3377 if work_mem > 0: 

3378 session.execute(sql.text("SET LOCAL work_mem TO '%d MB'" % work_mem)) 

3379 return session 

3380 

3381 

3382__all__.append("DBConn")