Coverage for daklib/dbconn.py: 87%
1553 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1"""DB access class
3@contact: Debian FTPMaster <ftpmaster@debian.org>
4@copyright: 2000, 2001, 2002, 2003, 2004, 2006 James Troup <james@nocrew.org>
5@copyright: 2008-2009 Mark Hymers <mhy@debian.org>
6@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org>
7@copyright: 2009 Mike O'Connor <stew@debian.org>
8@license: GNU General Public License version 2 or later
9"""
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25################################################################################
27# < mhy> I need a funny comment
28# < sgran> two peanuts were walking down a dark street
29# < sgran> one was a-salted
30# * mhy looks up the definition of "funny"
32################################################################################
34import functools
35import inspect
36import os
37import re
38import subprocess
39import warnings
40from collections.abc import Callable, Iterable
41from datetime import datetime, timedelta
42from os.path import normpath
43from tarfile import TarFile
44from typing import TYPE_CHECKING, Any, Optional, TypedDict, Union, overload, override
46import apt_pkg
47import sqlalchemy
48import sqlalchemy.types
49from debian.deb822 import Deb822
50from sqlalchemy import (
51 Column,
52 ForeignKey,
53 Table,
54 UniqueConstraint,
55 create_engine,
56 desc,
57 sql,
58)
59from sqlalchemy.dialects.postgresql import ARRAY
61# Don't remove this, we re-export the exceptions to scripts which import us
62from sqlalchemy.exc import IntegrityError, OperationalError, SAWarning, SQLAlchemyError
63from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy
64from sqlalchemy.orm import (
65 DynamicMapped,
66 Mapped,
67 attribute_keyed_dict,
68 mapped_column,
69 object_session,
70 relationship,
71 sessionmaker,
72)
73from sqlalchemy.orm.exc import NoResultFound
74from sqlalchemy.sql import func
75from sqlalchemy.types import CHAR, BigInteger, Boolean, DateTime, Integer, String, Text
77import daklib.gpg
79from .aptversion import AptVersion
81# Only import Config until Queue stuff is changed to store its config
82# in the database
83from .config import Config
84from .textutils import fix_maintainer
86# suppress some deprecation warnings in squeeze related to sqlalchemy
87warnings.filterwarnings(
88 "ignore", "Predicate of partial index .* ignored during reflection", SAWarning
89)
91# (Debian 12 "bookworm") Silence warning targeted at SQLAlchemy dialect maintainers
92warnings.filterwarnings(
93 "ignore",
94 "Dialect postgresql:psycopg2 will not make use of SQL compilation caching.*",
95 SAWarning,
96)
98from .database.base import Base, BaseTimestamp
100if TYPE_CHECKING:
101 import sqlalchemy.orm.query
102 from sqlalchemy.orm import Session
105################################################################################
108class DebVersion(sqlalchemy.types.UserDefinedType):
109 @override
110 def get_col_spec(self, **kw: Any) -> str:
111 return "DEBVERSION"
113 @override
114 def bind_processor(self, dialect):
115 return None
117 @override
118 def result_processor(self, dialect, coltype):
119 return None
122################################################################################
124__all__ = ["IntegrityError", "SQLAlchemyError", "DebVersion"]
126################################################################################
129def session_wrapper[**P, R](fn: Callable[P, R]) -> Callable[P, R]:
130 """
131 Wrapper around common ".., session=None):" handling. If the wrapped
132 function is called without passing 'session', we create a local one
133 and destroy it when the function ends.
135 Also attaches a commit_or_flush method to the session; if we created a
136 local session, this is a synonym for session.commit(), otherwise it is a
137 synonym for session.flush().
138 """
140 @functools.wraps(fn)
141 def wrapped(*args, **kwargs):
142 private_transaction = False
144 # Find the session object
145 session = kwargs.get("session")
147 if session is None:
148 if len(args) < len(inspect.getfullargspec(fn).args):
149 # No session specified as last argument or in kwargs
150 private_transaction = True
151 kwargs["session"] = session = DBConn().session()
152 else:
153 # Session is last argument in args
154 session = args[-1]
155 if session is None: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true
156 session = DBConn().session()
157 args = (*args[:-1], session)
158 private_transaction = True
160 if private_transaction:
161 session.commit_or_flush = session.commit # type: ignore[union-attr]
162 else:
163 session.commit_or_flush = session.flush # type: ignore[union-attr]
165 try:
166 return fn(*args, **kwargs)
167 finally:
168 if private_transaction:
169 # We created a session; close it.
170 session.close()
172 return wrapped
175__all__.append("session_wrapper")
177################################################################################
180class ORMObject(Base):
181 """
182 ORMObject is a base class for all ORM classes mapped by SQLalchemy. All
183 derived classes must implement the properties() method.
184 """
186 __abstract__ = True
188 def properties(self) -> list[str]:
189 """
190 This method should be implemented by all derived classes and returns a
191 list of the important properties. The properties 'created' and
192 'modified' will be added automatically. A suffix '_count' should be
193 added to properties that are lists or query objects. The most important
194 property name should be returned as the first element in the list
195 because it is used by repr().
196 """
197 return []
199 def classname(self) -> str:
200 """
201 Returns the name of the class.
202 """
203 return type(self).__name__
205 @override
206 def __repr__(self):
207 """
208 Returns a short string representation of the object using the first
209 element from the properties() method.
210 """
211 primary_property = self.properties()[0]
212 value = getattr(self, primary_property)
213 return "<%s %s>" % (self.classname(), str(value))
215 @override
216 def __str__(self):
217 """
218 Returns a human readable form of the object using the properties()
219 method.
220 """
221 return "<%s(...)>" % (self.classname())
223 @classmethod
224 @session_wrapper
225 def get(cls, primary_key, session: "Session | None" = None):
226 """
227 This is a support function that allows getting an object by its primary
228 key.
230 Architecture.get(3[, session])
232 instead of the more verbose
234 session.query(Architecture).get(3)
235 """
236 assert session is not None
237 return session.query(cls).get(primary_key)
239 def session(self) -> "Session | None":
240 """
241 Returns the current session that is associated with the object. May
242 return None is object is in detached state.
243 """
245 return object_session(self)
248__all__.append("ORMObject")
250################################################################################
253class ACL(ORMObject):
254 __tablename__ = "acl"
256 id: Mapped[int] = mapped_column(primary_key=True)
257 name: Mapped[str] = mapped_column() # TODO: add unique=True
258 is_global: Mapped[bool] = mapped_column(default=False)
259 match_fingerprint: Mapped[bool] = mapped_column(default=False)
260 match_keyring_id: Mapped[int | None] = mapped_column(ForeignKey("keyrings.id"))
261 allow_new: Mapped[bool] = mapped_column(default=False)
262 allow_source: Mapped[bool] = mapped_column(default=False)
263 allow_binary: Mapped[bool] = mapped_column(default=False)
264 allow_binary_all: Mapped[bool] = mapped_column(default=False)
265 allow_binary_only: Mapped[bool] = mapped_column(default=False)
266 allow_hijack: Mapped[bool] = mapped_column(default=False)
267 allow_per_source: Mapped[bool] = mapped_column(default=False)
268 deny_per_source: Mapped[bool] = mapped_column(default=False)
270 architectures: Mapped[set["Architecture"]] = relationship(
271 secondary="acl_architecture_map"
272 )
273 fingerprints: Mapped[set["Fingerprint"]] = relationship(
274 secondary="acl_fingerprint_map"
275 )
276 match_keyring: Mapped["Keyring"] = relationship(foreign_keys=[match_keyring_id])
277 per_source: Mapped[set["ACLPerSource"]] = relationship(back_populates="acl")
278 per_suite: Mapped[set["ACLPerSuite"]] = relationship(back_populates="acl")
280 @override
281 def __repr__(self):
282 return "<ACL {0}>".format(self.name)
285__all__.append("ACL")
288class AclArchitectureMap(Base):
289 __tablename__ = "acl_architecture_map"
291 acl_id: Mapped[int] = mapped_column(
292 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True
293 )
294 architecture_id: Mapped[int] = mapped_column(
295 ForeignKey("architecture.id", ondelete="CASCADE"), primary_key=True
296 )
299class AclFingerprintMap(Base):
300 __tablename__ = "acl_fingerprint_map"
302 acl_id: Mapped[int] = mapped_column(
303 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True
304 )
305 fingerprint_id: Mapped[int] = mapped_column(
306 ForeignKey("fingerprint.id", ondelete="CASCADE"), primary_key=True
307 )
310class ACLPerSource(ORMObject):
311 __tablename__ = "acl_per_source"
313 acl_id: Mapped[int] = mapped_column(
314 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True
315 )
316 fingerprint_id: Mapped[int] = mapped_column(
317 ForeignKey("fingerprint.id", ondelete="CASCADE"), primary_key=True
318 )
319 source: Mapped[str] = mapped_column(primary_key=True)
320 reason: Mapped[str | None] = mapped_column(default=None)
321 created_by_id: Mapped[int | None] = mapped_column(ForeignKey("fingerprint.id"))
322 created: Mapped[datetime] = mapped_column(
323 DateTime(timezone=False), server_default=func.now()
324 )
326 acl: Mapped[ACL] = relationship(back_populates="per_source")
327 fingerprint: Mapped["Fingerprint"] = relationship(foreign_keys=[fingerprint_id])
328 created_by: Mapped["Fingerprint"] = relationship(foreign_keys=[created_by_id])
330 @override
331 def __repr__(self):
332 return "<ACLPerSource acl={0} fingerprint={1} source={2} reason={3}>".format(
333 self.acl.name, self.fingerprint.fingerprint, self.source, self.reason
334 )
337__all__.append("ACLPerSource")
340class ACLPerSuite(ORMObject):
341 __tablename__ = "acl_per_suite"
343 acl_id: Mapped[int] = mapped_column(
344 ForeignKey("acl.id", ondelete="CASCADE"), primary_key=True
345 )
346 fingerprint_id: Mapped[int] = mapped_column(
347 ForeignKey("fingerprint.id", ondelete="CASCADE"), primary_key=True
348 )
349 suite_id: Mapped[int] = mapped_column(
350 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True
351 )
352 reason: Mapped[str | None] = mapped_column(default=None)
353 created_by_id: Mapped[int | None] = mapped_column(ForeignKey("fingerprint.id"))
354 created: Mapped[datetime] = mapped_column(
355 DateTime(timezone=False), server_default=func.now()
356 )
358 acl: Mapped[ACL] = relationship(back_populates="per_suite")
359 fingerprint: Mapped["Fingerprint"] = relationship(foreign_keys=[fingerprint_id])
360 suite: Mapped["Suite"] = relationship()
361 created_by: Mapped["Fingerprint"] = relationship(foreign_keys=[created_by_id])
363 @override
364 def __repr__(self):
365 return "<ACLPerSuite acl={0} fingerprint={1} suite={2} reason={3}>".format(
366 self.acl.name,
367 self.fingerprint.fingerprint,
368 self.suite.suite_name,
369 self.reason,
370 )
373__all__.append("ACLPerSuite")
375################################################################################
378class Architecture(BaseTimestamp):
379 __tablename__ = "architecture"
381 arch_id: Mapped[int] = mapped_column("id", Integer(), primary_key=True)
382 arch_string: Mapped[str] = mapped_column(Text(), unique=True)
383 description: Mapped[str]
385 suites: Mapped[list["Suite"]] = relationship(
386 secondary="suite_architectures", back_populates="architectures"
387 )
389 def __init__(self, arch_string=None, description=None):
390 self.arch_string = arch_string
391 self.description = description
393 @override
394 def __str__(self):
395 return self.arch_string
397 @override
398 def __repr__(self):
399 return "<{} {}>".format(
400 self.__class__.__name__,
401 self.arch_string,
402 )
404 @override
405 def __eq__(self, val):
406 if isinstance(val, str):
407 warnings.warn(
408 "comparison with a `str` is deprecated",
409 DeprecationWarning,
410 stacklevel=2,
411 )
412 return self.arch_string == val
413 # This signals to use the normal comparison operator
414 return NotImplemented
416 @override
417 def __ne__(self, val):
418 if isinstance(val, str): 418 ↛ 426line 418 didn't jump to line 426 because the condition on line 418 was always true
419 warnings.warn(
420 "comparison with a `str` is deprecated",
421 DeprecationWarning,
422 stacklevel=2,
423 )
424 return self.arch_string != val
425 # This signals to use the normal comparison operator
426 return NotImplemented
428 __hash__ = BaseTimestamp.__hash__
431__all__.append("Architecture")
434@session_wrapper
435def get_architecture(
436 architecture: str, session: "Session | None" = None
437) -> Optional[Architecture]:
438 """
439 Returns database id for given `architecture`.
441 :param architecture: The name of the architecture
442 :param session: Optional SQLA session object (a temporary one will be
443 generated if not supplied)
444 :return: Architecture object for the given arch (None if not present)
445 """
447 assert session is not None
448 q = session.query(Architecture).filter_by(arch_string=architecture)
449 return q.one_or_none()
452__all__.append("get_architecture")
454################################################################################
457class Archive(Base):
458 __tablename__ = "archive"
460 archive_id: Mapped[int] = mapped_column("id", primary_key=True)
461 archive_name: Mapped[str] = mapped_column("name", unique=True)
462 description: Mapped[str | None] = mapped_column(default=None)
463 created: Mapped[datetime] = mapped_column(
464 DateTime(timezone=True), server_default=func.now()
465 )
466 modified: Mapped[datetime] = mapped_column(
467 DateTime(timezone=True), server_default=func.now()
468 )
469 primary_mirror: Mapped[str | None] = mapped_column(default=None)
470 path: Mapped[str]
471 mode: Mapped[str] = mapped_column(CHAR(length=4), default="0644")
472 tainted: Mapped[bool] = mapped_column(default=False)
473 use_morgue: Mapped[bool] = mapped_column(default=True)
474 stayofexecution: Mapped[timedelta] = mapped_column(default=timedelta(hours=36))
475 changelog: Mapped[str | None]
477 files: Mapped[list["ArchiveFile"]] = relationship(back_populates="archive")
478 suites: Mapped[list["Suite"]] = relationship(back_populates="archive")
480 def __init__(self, *args, **kwargs):
481 pass
483 @override
484 def __repr__(self):
485 return "<Archive %s>" % self.archive_name
488__all__.append("Archive")
491@session_wrapper
492def get_archive(archive: str, session: "Session | None" = None) -> Optional[Archive]:
493 """
494 returns database id for given `archive`.
496 :param archive: the name of the arhive
497 :param session: Optional SQLA session object (a temporary one will be
498 generated if not supplied)
499 :return: Archive object for the given name (None if not present)
500 """
502 assert session is not None
503 archive = archive.lower()
504 q = session.query(Archive).filter_by(archive_name=archive)
505 return q.one_or_none()
508__all__.append("get_archive")
510################################################################################
513class ArchiveFile(Base):
514 __tablename__ = "files_archive_map"
516 file_id: Mapped[int] = mapped_column(ForeignKey("files.id"), primary_key=True)
517 archive_id: Mapped[int] = mapped_column(ForeignKey("archive.id"), primary_key=True)
518 component_id: Mapped[int] = mapped_column(
519 ForeignKey("component.id"), primary_key=True
520 )
521 last_used: Mapped[datetime | None] = mapped_column(
522 DateTime(timezone=False), default=None
523 )
524 created: Mapped[datetime] = mapped_column(
525 DateTime(timezone=False), server_default=func.now()
526 )
528 archive: Mapped[Archive] = relationship(back_populates="files")
529 component: Mapped["Component"] = relationship()
530 file: Mapped["PoolFile"] = relationship(back_populates="archives")
532 def __init__(self, archive=None, component=None, file=None):
533 self.archive = archive
534 self.component = component
535 self.file = file
537 @property
538 def path(self):
539 return os.path.join(
540 self.archive.path, "pool", self.component.component_name, self.file.filename
541 )
544__all__.append("ArchiveFile")
546################################################################################
549class BinContents(ORMObject):
550 __tablename__ = "bin_contents"
552 binary_id: Mapped[int] = mapped_column(
553 ForeignKey("binaries.id", ondelete="CASCADE"), primary_key=True
554 )
555 file: Mapped[str] = mapped_column(primary_key=True)
557 binary: Mapped["DBBinary"] = relationship(back_populates="contents")
559 def __init__(self, file=None, binary=None):
560 self.file = file
561 self.binary = binary
563 @override
564 def properties(self) -> list[str]:
565 return ["file", "binary"]
568__all__.append("BinContents")
570################################################################################
573class DBBinary(ORMObject):
574 __tablename__ = "binaries"
575 __table_args = (
576 UniqueConstraint(
577 "package", "version", "architecture", name="binaries_package_key"
578 ),
579 )
581 binary_id: Mapped[int] = mapped_column("id", primary_key=True)
582 package: Mapped[str]
583 version: Mapped[str] = mapped_column(DebVersion())
584 maintainer_id: Mapped[int] = mapped_column(
585 "maintainer", ForeignKey("maintainer.id")
586 )
587 source_id: Mapped[int] = mapped_column("source", ForeignKey("source.id"))
588 arch_id: Mapped[int] = mapped_column("architecture", ForeignKey("architecture.id"))
589 poolfile_id: Mapped[int] = mapped_column(
590 "file", ForeignKey("files.id"), unique=True
591 )
592 binarytype: Mapped[str] = mapped_column("type")
593 fingerprint_id: Mapped[int | None] = mapped_column(
594 "sig_fpr", ForeignKey("fingerprint.id")
595 )
596 install_date: Mapped[datetime | None] = mapped_column(
597 DateTime(timezone=True), server_default=func.now()
598 )
599 created: Mapped[datetime | None] = mapped_column(
600 DateTime(timezone=True), server_default=func.now()
601 )
602 modified: Mapped[datetime | None] = mapped_column(
603 DateTime(timezone=True), server_default=func.now()
604 )
605 stanza: Mapped[str]
606 authorized_by_fingerprint_id: Mapped[int | None] = mapped_column(
607 ForeignKey("fingerprint.id")
608 )
610 maintainer: Mapped["Maintainer"] = relationship()
611 source: Mapped["DBSource"] = relationship(back_populates="binaries")
612 architecture: Mapped[Architecture] = relationship()
613 poolfile: Mapped["PoolFile"] = relationship()
614 fingerprint: Mapped["Fingerprint"] = relationship(foreign_keys=[fingerprint_id])
615 authorized_by_fingerprint: Mapped["Fingerprint"] = relationship(
616 foreign_keys=[authorized_by_fingerprint_id]
617 )
618 suites: Mapped[list["Suite"]] = relationship(
619 secondary="bin_associations", back_populates="binaries"
620 )
621 extra_sources: Mapped[list["DBSource"]] = relationship(
622 secondary="extra_src_references", back_populates="extra_binary_references"
623 )
624 key: Mapped[dict[str, "BinaryMetadata"]] = relationship(
625 collection_class=attribute_keyed_dict("key"), cascade="all"
626 )
627 contents: DynamicMapped["BinContents"] = relationship(
628 cascade="all", back_populates="binary"
629 )
631 metadata_proxy: AssociationProxy[dict["MetadataKey", str]] = association_proxy(
632 "key", "value"
633 )
635 def __init__(
636 self,
637 package=None,
638 source=None,
639 version=None,
640 maintainer=None,
641 architecture=None,
642 poolfile=None,
643 binarytype="deb",
644 fingerprint=None,
645 ):
646 self.package = package
647 self.source = source
648 self.version = version
649 self.maintainer = maintainer
650 self.architecture = architecture
651 self.poolfile = poolfile
652 self.binarytype = binarytype
653 self.fingerprint = fingerprint
655 @property
656 def pkid(self) -> int:
657 return self.binary_id
659 @property
660 def name(self) -> str:
661 return self.package
663 @property
664 def arch_string(self) -> str:
665 return "%s" % self.architecture
667 @override
668 def properties(self) -> list[str]:
669 return [
670 "package",
671 "version",
672 "maintainer",
673 "source",
674 "architecture",
675 "poolfile",
676 "binarytype",
677 "fingerprint",
678 "install_date",
679 "suites_count",
680 "binary_id",
681 "contents_count",
682 "extra_sources",
683 ]
685 def scan_contents(self) -> Iterable[str]:
686 """
687 Yields the contents of the package. Only regular files are yielded and
688 the path names are normalized after converting them from either utf-8
689 or iso8859-1 encoding. It yields the string ' <EMPTY PACKAGE>' if the
690 package does not contain any regular file.
691 """
692 fullpath = self.poolfile.fullpath
693 dpkg_cmd = ("dpkg-deb", "--fsys-tarfile", fullpath)
694 dpkg = subprocess.Popen(dpkg_cmd, stdout=subprocess.PIPE)
695 tar = TarFile.open(fileobj=dpkg.stdout, mode="r|")
696 for member in tar.getmembers():
697 if not member.isdir():
698 name = normpath(member.name)
699 yield name
700 tar.close()
701 assert dpkg.stdout is not None
702 dpkg.stdout.close()
703 dpkg.wait()
705 def read_control(self) -> bytes:
706 """
707 Reads the control information from a binary.
709 :return: stanza text of the control section.
710 """
711 from . import utils
713 fullpath = self.poolfile.fullpath
714 return utils.deb_extract_control(fullpath)
716 def read_control_fields(self) -> apt_pkg.TagSection:
717 """
718 Reads the control information from a binary and return
719 as a dictionary.
721 :return: fields of the control section as a dictionary.
722 """
723 stanza = self.read_control()
724 return apt_pkg.TagSection(stanza)
726 @property
727 def proxy(self) -> "MetadataProxy":
728 session = object_session(self)
729 assert session is not None
730 query = session.query(BinaryMetadata).filter_by(binary=self)
731 return MetadataProxy(session, query)
734__all__.append("DBBinary")
737@session_wrapper
738def get_suites_binary_in(
739 package: str, session: "Session | None" = None
740) -> "list[Suite]":
741 """
742 Returns list of Suite objects which given `package` name is in
744 :param package: DBBinary package name to search for
745 :return: list of Suite objects for the given package
746 """
748 assert session is not None
749 return (
750 session.query(Suite)
751 .filter(Suite.binaries.any(DBBinary.package == package))
752 .all()
753 )
756__all__.append("get_suites_binary_in")
759@session_wrapper
760def get_component_by_package_suite(
761 package: str,
762 suite_list: list[str],
763 arch_list: Optional[str] = None,
764 session: "Session | None" = None,
765) -> Optional[str]:
766 """
767 Returns the component name of the newest binary package in suite_list or
768 None if no package is found. The result can be optionally filtered by a list
769 of architecture names.
771 :param package: DBBinary package name to search for
772 :param suite_list: list of suite_name items
773 :param arch_list: optional list of arch_string items that defaults to []
774 :return: name of component or None
775 """
777 assert session is not None
778 q = (
779 session.query(DBBinary)
780 .filter_by(package=package)
781 .join(DBBinary.suites)
782 .filter(Suite.suite_name.in_(suite_list))
783 )
784 if arch_list:
785 q = q.join(DBBinary.architecture).filter(
786 Architecture.arch_string.in_(arch_list)
787 )
788 binary = q.order_by(desc(DBBinary.version)).first()
789 if binary is None:
790 return None
791 else:
792 return binary.poolfile.component.component_name
795__all__.append("get_component_by_package_suite")
798class BinAssociations(Base):
799 __tablename__ = "bin_associations"
800 __table_args = (
801 UniqueConstraint("suite", "bin", name="bin_associations_suite_key"),
802 )
804 id: Mapped[int] = mapped_column(primary_key=True)
805 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"))
806 bin: Mapped[int] = mapped_column(ForeignKey("binaries.id"))
807 created: Mapped[datetime] = mapped_column(
808 DateTime(timezone=True), server_default=func.now()
809 )
810 modified: Mapped[datetime] = mapped_column(
811 DateTime(timezone=True), server_default=func.now()
812 )
815class ExtraSrcReferences(Base):
816 __tablename__ = "extra_src_references"
818 bin_id: Mapped[int] = mapped_column(
819 ForeignKey("binaries.id", ondelete="CASCADE"), primary_key=True
820 )
821 src_id: Mapped[int] = mapped_column(ForeignKey("source.id"), primary_key=True)
824################################################################################
827class BuildQueue(Base):
828 __tablename__ = "build_queue"
830 queue_id: Mapped[int] = mapped_column("id", primary_key=True)
831 queue_name: Mapped[str] = mapped_column(unique=True)
832 generate_metadata: Mapped[bool] = mapped_column(default=False)
833 stay_of_execution: Mapped[int] = mapped_column(default=86400)
834 created: Mapped[datetime | None] = mapped_column(
835 DateTime(timezone=True), server_default=func.now()
836 )
837 modified: Mapped[datetime | None] = mapped_column(
838 DateTime(timezone=True), server_default=func.now()
839 )
840 suite_id: Mapped[int] = mapped_column(ForeignKey("suite.id"))
842 suite: Mapped["Suite"] = relationship()
844 def __init__(self, *args, **kwargs):
845 pass
847 @override
848 def __repr__(self):
849 return "<BuildQueue %s>" % self.queue_name
852__all__.append("BuildQueue")
854################################################################################
857class Component(ORMObject):
858 __tablename__ = "component"
860 component_id: Mapped[int] = mapped_column("id", primary_key=True)
861 component_name: Mapped[str] = mapped_column("name", unique=True)
862 description: Mapped[str | None]
863 meets_dfsg: Mapped[bool | None]
864 created: Mapped[datetime | None] = mapped_column(
865 DateTime(timezone=True), server_default=func.now()
866 )
867 modified: Mapped[datetime | None] = mapped_column(
868 DateTime(timezone=True), server_default=func.now()
869 )
870 ordering: Mapped[int] = mapped_column(unique=True)
872 suites: Mapped[list["Suite"]] = relationship(
873 secondary="component_suite", back_populates="components"
874 )
875 overrides: DynamicMapped["Override"] = relationship(back_populates="component")
877 def __init__(self, component_name=None):
878 self.component_name = component_name
880 @override
881 def __eq__(self, val: Any):
882 if isinstance(val, str): 882 ↛ 883line 882 didn't jump to line 883 because the condition on line 882 was never true
883 warnings.warn(
884 "comparison with a `str` is deprecated",
885 DeprecationWarning,
886 stacklevel=2,
887 )
888 return self.component_name == val
889 # This signals to use the normal comparison operator
890 return NotImplemented
892 @override
893 def __ne__(self, val: Any):
894 if isinstance(val, str):
895 warnings.warn(
896 "comparison with a `str` is deprecated",
897 DeprecationWarning,
898 stacklevel=2,
899 )
900 return self.component_name != val
901 # This signals to use the normal comparison operator
902 return NotImplemented
904 __hash__ = ORMObject.__hash__
906 @override
907 def properties(self) -> list[str]:
908 return [
909 "component_name",
910 "component_id",
911 "description",
912 "meets_dfsg",
913 "overrides_count",
914 ]
917__all__.append("Component")
920@session_wrapper
921def get_component(
922 component: str, session: "Session | None" = None
923) -> Optional[Component]:
924 """
925 Returns database id for given `component`.
927 :param component: The name of the override type
928 :return: the database id for the given component
929 """
931 assert session is not None
932 component = component.lower()
933 q = session.query(Component).filter_by(component_name=component)
934 return q.one_or_none()
937__all__.append("get_component")
940def get_mapped_component_name(component_name: str) -> str:
941 cnf = Config()
942 for m in cnf.value_list("ComponentMappings"): 942 ↛ 943line 942 didn't jump to line 943 because the loop on line 942 never started
943 (src, dst) = m.split()
944 if component_name == src:
945 component_name = dst
946 return component_name
949__all__.append("get_mapped_component_name")
952@session_wrapper
953def get_mapped_component(
954 component_name: str, session: "Session | None" = None
955) -> Optional[Component]:
956 """get component after mappings
958 Evaluate component mappings from ComponentMappings in dak.conf for the
959 given component name.
961 .. todo::
963 ansgar wants to get rid of this. It's currently only used for
964 the security archive
966 :param component_name: component name
967 :param session: database session
968 :return: component after applying maps or :const:`None`
969 """
970 assert session is not None
971 component_name = get_mapped_component_name(component_name)
972 component = (
973 session.query(Component).filter_by(component_name=component_name).first()
974 )
975 return component
978__all__.append("get_mapped_component")
981@session_wrapper
982def get_component_names(session: "Session | None" = None) -> list[str]:
983 """
984 Returns list of strings of component names.
986 :return: list of strings of component names
987 """
989 assert session is not None
990 return [x.component_name for x in session.query(Component).all()]
993__all__.append("get_component_names")
996class ComponentSuite(Base):
997 __tablename__ = "component_suite"
999 component_id: Mapped[int] = mapped_column(
1000 ForeignKey("component.id", ondelete="CASCADE"), primary_key=True
1001 )
1002 suite_id: Mapped[int] = mapped_column(
1003 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True
1004 )
1007################################################################################
1010class DBConfig(Base):
1011 __tablename__ = "config"
1013 config_id: Mapped[int] = mapped_column("id", primary_key=True)
1014 name: Mapped[str] = mapped_column(unique=True)
1015 value: Mapped[str | None]
1016 created: Mapped[datetime] = mapped_column(
1017 DateTime(timezone=True), server_default=func.now()
1018 )
1019 modified: Mapped[datetime] = mapped_column(
1020 DateTime(timezone=True), server_default=func.now()
1021 )
1023 def __init__(self, *args, **kwargs):
1024 pass
1026 @override
1027 def __repr__(self):
1028 return "<DBConfig %s>" % self.name
1031__all__.append("DBConfig")
1033################################################################################
1036class DSCFile(Base):
1037 __tablename__ = "dsc_files"
1038 __table_args = (UniqueConstraint("source", "file", name="dsc_files_source_key"),)
1040 dscfile_id: Mapped[int] = mapped_column("id", primary_key=True)
1041 source_id: Mapped[int] = mapped_column("source", ForeignKey("source.id"))
1042 poolfile_id: Mapped[int] = mapped_column("file", ForeignKey("files.id"))
1043 created: Mapped[datetime | None] = mapped_column(
1044 DateTime(timezone=True), server_default=func.now()
1045 )
1046 modified: Mapped[datetime | None] = mapped_column(
1047 DateTime(timezone=True), server_default=func.now()
1048 )
1049 extra_file: Mapped[bool] = mapped_column(default=True)
1051 source: Mapped["DBSource"] = relationship(back_populates="srcfiles")
1052 poolfile: Mapped["PoolFile"] = relationship()
1054 def __init__(self, *args, **kwargs):
1055 pass
1057 @override
1058 def __repr__(self):
1059 return "<DSCFile %s>" % self.dscfile_id
1062__all__.append("DSCFile")
1065@session_wrapper
1066def get_dscfiles(
1067 dscfile_id: Optional[int] = None,
1068 source_id: Optional[int] = None,
1069 poolfile_id: Optional[int] = None,
1070 session: "Session | None" = None,
1071) -> list[DSCFile]:
1072 """
1073 Returns a list of DSCFiles which may be empty
1075 :param dscfile_id: the dscfile_id of the DSCFiles to find
1076 :param source_id: the source id related to the DSCFiles to find
1077 :param poolfile_id: the poolfile id related to the DSCFiles to find
1078 :return: Possibly empty list of DSCFiles
1079 """
1081 assert session is not None
1083 q = session.query(DSCFile)
1084 if dscfile_id is not None:
1085 q = q.filter_by(dscfile_id=dscfile_id)
1086 if source_id is not None:
1087 q = q.filter_by(source_id=source_id)
1088 if poolfile_id is not None:
1089 q = q.filter_by(poolfile_id=poolfile_id)
1090 return q.all()
1093__all__.append("get_dscfiles")
1095################################################################################
1098class ExternalOverride(ORMObject):
1099 __tablename__ = "external_overrides"
1101 suite_id: Mapped[int] = mapped_column(
1102 "suite", ForeignKey("suite.id"), primary_key=True
1103 )
1104 component_id: Mapped[int] = mapped_column(
1105 "component", ForeignKey("component.id"), primary_key=True
1106 )
1107 package: Mapped[str] = mapped_column(primary_key=True)
1108 key: Mapped[str] = mapped_column(primary_key=True)
1109 value: Mapped[str]
1111 suite: Mapped["Suite"] = relationship()
1112 component: Mapped["Component"] = relationship()
1114 def __init__(self, *args, **kwargs):
1115 pass
1117 @override
1118 def __repr__(self):
1119 return "<ExternalOverride %s = %s: %s>" % (self.package, self.key, self.value)
1122__all__.append("ExternalOverride")
1124################################################################################
1127class PoolFile(ORMObject):
1128 __tablename__ = "files"
1130 file_id: Mapped[int] = mapped_column("id", primary_key=True)
1131 filename: Mapped[str] # TODO: add unique=True?
1132 filesize: Mapped[int] = mapped_column("size", BigInteger())
1133 md5sum: Mapped[str]
1134 last_used: Mapped[datetime | None]
1135 sha1sum: Mapped[str]
1136 sha256sum: Mapped[str]
1137 created: Mapped[datetime | None] = mapped_column(
1138 DateTime(timezone=True), server_default=func.now()
1139 )
1140 modified: Mapped[datetime | None] = mapped_column(
1141 DateTime(timezone=True), server_default=func.now()
1142 )
1144 archives: Mapped[list["ArchiveFile"]] = relationship(back_populates="file")
1146 def __init__(self, filename=None, filesize=-1, md5sum=None):
1147 self.filename = filename
1148 self.filesize = filesize
1149 self.md5sum = md5sum
1151 @property
1152 def fullpath(self) -> str:
1153 session = object_session(self)
1154 assert session is not None
1155 af = (
1156 session.query(ArchiveFile)
1157 .join(Archive)
1158 .filter(ArchiveFile.file == self)
1159 .order_by(Archive.tainted.desc())
1160 .first()
1161 )
1162 assert af is not None
1163 return af.path
1165 @property
1166 def component(self) -> Component:
1167 session = object_session(self)
1168 assert session is not None
1169 component_id = (
1170 session.query(ArchiveFile.component_id)
1171 .filter(ArchiveFile.file == self)
1172 .group_by(ArchiveFile.component_id)
1173 .one()
1174 )
1175 return session.get_one(Component, component_id)
1177 @property
1178 def basename(self) -> str:
1179 return os.path.basename(self.filename)
1181 @override
1182 def properties(self) -> list[str]:
1183 return [
1184 "filename",
1185 "file_id",
1186 "filesize",
1187 "md5sum",
1188 "sha1sum",
1189 "sha256sum",
1190 "source",
1191 "binary",
1192 "last_used",
1193 ]
1196__all__.append("PoolFile")
1198################################################################################
1201class Fingerprint(ORMObject):
1202 __tablename__ = "fingerprint"
1204 fingerprint_id: Mapped[int] = mapped_column("id", primary_key=True)
1205 fingerprint: Mapped[str] = mapped_column(unique=True)
1206 uid_id: Mapped[int | None] = mapped_column("uid", ForeignKey("uid.id"))
1207 keyring_id: Mapped[int | None] = mapped_column("keyring", ForeignKey("keyrings.id"))
1208 created: Mapped[datetime | None] = mapped_column(
1209 DateTime(timezone=True), server_default=func.now()
1210 )
1211 modified: Mapped[datetime | None] = mapped_column(
1212 DateTime(timezone=True), server_default=func.now()
1213 )
1214 acl_id: Mapped[int | None] = mapped_column(ForeignKey("acl.id"))
1216 uid: Mapped["Uid | None"] = relationship(back_populates="fingerprint")
1217 keyring: Mapped["Keyring | None"] = relationship()
1218 acl: Mapped["ACL | None"] = relationship()
1220 def __init__(self, fingerprint=None):
1221 self.fingerprint = fingerprint
1223 @override
1224 def properties(self) -> list[str]:
1225 return ["fingerprint", "fingerprint_id", "keyring", "uid", "binary_reject"]
1228__all__.append("Fingerprint")
1231@session_wrapper
1232def get_fingerprint(
1233 fpr: str, session: "Session | None" = None
1234) -> Optional[Fingerprint]:
1235 """
1236 Returns Fingerprint object for given fpr.
1238 :param fpr: The fpr to find / add
1239 :param session: Optional SQL session object (a temporary one will be
1240 generated if not supplied).
1241 :return: the Fingerprint object for the given fpr or None
1242 """
1244 assert session is not None
1245 q = session.query(Fingerprint).filter_by(fingerprint=fpr)
1246 return q.one_or_none()
1249__all__.append("get_fingerprint")
1252@session_wrapper
1253def get_or_set_fingerprint(fpr: str, session: "Session | None" = None) -> Fingerprint:
1254 """
1255 Returns Fingerprint object for given fpr.
1257 If no matching fpr is found, a row is inserted.
1259 :param fpr: The fpr to find / add
1260 :param session: Optional SQL session object (a temporary one will be
1261 generated if not supplied). If not passed, a commit will be performed at
1262 the end of the function, otherwise the caller is responsible for commiting.
1263 A flush will be performed either way.
1264 :return: the Fingerprint object for the given fpr
1265 """
1267 assert session is not None
1268 q = session.query(Fingerprint).filter_by(fingerprint=fpr)
1270 try:
1271 ret = q.one()
1272 except NoResultFound:
1273 fingerprint = Fingerprint()
1274 fingerprint.fingerprint = fpr
1275 session.add(fingerprint)
1276 session.commit_or_flush() # type: ignore[attr-defined]
1277 ret = fingerprint
1279 return ret
1282__all__.append("get_or_set_fingerprint")
1284################################################################################
1286# Helper routine for Keyring class
1289def get_ldap_name(entry) -> str:
1290 name = []
1291 for k in ["cn", "mn", "sn"]:
1292 ret = entry.get(k)
1293 if not ret:
1294 continue
1295 value = ret[0].decode()
1296 if value and value[0] != "-":
1297 name.append(value)
1298 return " ".join(name)
1301################################################################################
1304class _Key(TypedDict, total=False):
1305 email: str
1306 name: str
1307 fingerprints: list[str]
1308 uid: str
1311class Keyring(Base):
1312 __tablename__ = "keyrings"
1314 keyring_id: Mapped[int] = mapped_column("id", primary_key=True)
1315 keyring_name: Mapped[str] = mapped_column("name", unique=True)
1316 priority: Mapped[int] = mapped_column(default=100)
1317 created: Mapped[datetime | None] = mapped_column(
1318 DateTime(timezone=True), server_default=func.now()
1319 )
1320 modified: Mapped[datetime | None] = mapped_column(
1321 DateTime(timezone=True), server_default=func.now()
1322 )
1323 active: Mapped[bool | None] = mapped_column(default=True)
1324 acl_id: Mapped[int | None] = mapped_column(ForeignKey("acl.id"))
1325 tag2upload: Mapped[bool] = mapped_column(default=False)
1327 acl: Mapped["ACL | None"] = relationship(foreign_keys=[acl_id])
1329 keys: dict[str, _Key] = {}
1330 fpr_lookup: dict[str, str] = {}
1332 def __init__(self, *args, **kwargs):
1333 pass
1335 @override
1336 def __repr__(self):
1337 return "<Keyring %s>" % self.keyring_name
1339 def de_escape_gpg_str(self, txt: str) -> str:
1340 esclist = re.split(r"(\\x..)", txt)
1341 for x in range(1, len(esclist), 2): 1341 ↛ 1342line 1341 didn't jump to line 1342 because the loop on line 1341 never started
1342 esclist[x] = "%c" % (int(esclist[x][2:], 16))
1343 return "".join(esclist)
1345 def parse_address(self, uid: str) -> tuple[str, str]:
1346 """parses uid and returns a tuple of real name and email address"""
1347 import email.utils
1349 (name, address) = email.utils.parseaddr(uid)
1350 name = re.sub(r"\s*[(].*[)]", "", name)
1351 name = self.de_escape_gpg_str(name)
1352 if name == "":
1353 name = uid
1354 return (name, address)
1356 def load_keys(self, keyring: str) -> None:
1357 if not self.keyring_id: 1357 ↛ 1358line 1357 didn't jump to line 1358 because the condition on line 1357 was never true
1358 raise Exception("Must be initialized with database information")
1360 cmd = [
1361 "gpg",
1362 "--no-default-keyring",
1363 "--keyring",
1364 keyring,
1365 "--with-colons",
1366 "--fingerprint",
1367 "--fingerprint",
1368 ]
1369 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1371 key = None
1372 need_fingerprint = False
1374 assert p.stdout is not None
1375 for line_raw in p.stdout:
1376 try:
1377 line = line_raw.decode()
1378 except UnicodeDecodeError:
1379 # Some old UIDs might not use UTF-8 encoding. We assume they
1380 # use latin1.
1381 line = line_raw.decode("latin1")
1382 field = line.split(":")
1383 if field[0] == "pub":
1384 key = field[4]
1385 self.keys[key] = {}
1386 (name, addr) = self.parse_address(field[9])
1387 if "@" in addr: 1387 ↛ 1388line 1387 didn't jump to line 1388 because the condition on line 1387 was never true
1388 self.keys[key]["email"] = addr
1389 self.keys[key]["name"] = name
1390 need_fingerprint = True
1391 elif key and field[0] == "uid":
1392 assert key is not None
1393 (name, addr) = self.parse_address(field[9])
1394 if "email" not in self.keys[key] and "@" in addr: 1394 ↛ 1375line 1394 didn't jump to line 1375 because the condition on line 1394 was always true
1395 self.keys[key]["email"] = addr
1396 self.keys[key]["name"] = name
1397 elif need_fingerprint and field[0] == "fpr":
1398 assert key is not None
1399 self.keys[key]["fingerprints"] = [field[9]]
1400 self.fpr_lookup[field[9]] = key
1401 need_fingerprint = False
1403 (out, err) = p.communicate()
1404 r = p.returncode
1405 if r != 0: 1405 ↛ 1406line 1405 didn't jump to line 1406 because the condition on line 1405 was never true
1406 raise daklib.gpg.GpgException(
1407 "command failed: %s\nstdout: %r\nstderr: %r\n" % (cmd, out, err)
1408 )
1410 def import_users_from_ldap(
1411 self, session: "Session"
1412 ) -> tuple[dict[str, tuple[int, str]], dict[int, tuple[str, str]]]:
1413 import ldap # type: ignore
1415 from .utils import open_ldap_connection
1417 conn = open_ldap_connection()
1418 cnf = Config()
1419 LDAPDn = cnf["Import-LDAP-Fingerprints::LDAPDn"]
1420 Attrs = conn.search_s(
1421 LDAPDn,
1422 ldap.SCOPE_ONELEVEL,
1423 "(&(keyfingerprint=*)(supplementaryGid=%s))"
1424 % (cnf["Import-Users-From-Passwd::ValidGID"]),
1425 ["uid", "keyfingerprint", "cn", "mn", "sn"],
1426 )
1428 byuid: dict[int, tuple[str, str]] = {}
1429 byname: dict[str, tuple[int, str]] = {}
1431 for i in Attrs:
1432 entry = i[1]
1433 uid = entry["uid"][0].decode()
1434 name = get_ldap_name(entry)
1435 fingerprints = entry["keyFingerPrint"]
1436 keyid = None
1437 for f_raw in fingerprints:
1438 f = f_raw.decode()
1439 key = self.fpr_lookup.get(f, None)
1440 if key not in self.keys:
1441 continue
1442 self.keys[key]["uid"] = uid
1444 if keyid is not None:
1445 continue
1446 keyid = get_or_set_uid(uid, session).uid_id
1447 byuid[keyid] = (uid, name)
1448 byname[uid] = (keyid, name)
1450 return (byname, byuid)
1452 def generate_users_from_keyring(
1453 self, format: str, session: "Session"
1454 ) -> tuple[dict[str, tuple[int, str]], dict[int, tuple[str, str]]]:
1455 byuid: dict[int, tuple[str, str]] = {}
1456 byname: dict[str, tuple[int, str]] = {}
1457 any_invalid = False
1458 for x in list(self.keys.keys()):
1459 if "email" not in self.keys[x]: 1459 ↛ 1460line 1459 didn't jump to line 1460 because the condition on line 1459 was never true
1460 any_invalid = True
1461 self.keys[x]["uid"] = format % "invalid-uid"
1462 else:
1463 uid = format % self.keys[x]["email"]
1464 keyid = get_or_set_uid(uid, session).uid_id
1465 byuid[keyid] = (uid, self.keys[x]["name"])
1466 byname[uid] = (keyid, self.keys[x]["name"])
1467 self.keys[x]["uid"] = uid
1469 if any_invalid: 1469 ↛ 1470line 1469 didn't jump to line 1470 because the condition on line 1469 was never true
1470 uid = format % "invalid-uid"
1471 keyid = get_or_set_uid(uid, session).uid_id
1472 byuid[keyid] = (uid, "ungeneratable user id")
1473 byname[uid] = (keyid, "ungeneratable user id")
1475 return (byname, byuid)
1478__all__.append("Keyring")
1481@session_wrapper
1482def get_keyring(keyring: str, session: "Session | None" = None) -> Optional[Keyring]:
1483 """
1484 If `keyring` does not have an entry in the `keyrings` table yet, return None
1485 If `keyring` already has an entry, simply return the existing :class:`Keyring`
1487 :param keyring: the keyring name
1488 :return: the :class:`Keyring` object for this keyring
1489 """
1491 assert session is not None
1492 q = session.query(Keyring).filter_by(keyring_name=keyring)
1493 return q.one_or_none()
1496__all__.append("get_keyring")
1499@session_wrapper
1500def get_active_keyring_paths(session: "Session | None" = None) -> list[str]:
1501 """
1502 :return: list of active keyring paths
1503 """
1504 assert session is not None
1505 return [
1506 x.keyring_name
1507 for x in session.query(Keyring)
1508 .filter(Keyring.active == True) # noqa:E712
1509 .order_by(desc(Keyring.priority))
1510 .all()
1511 ]
1514__all__.append("get_active_keyring_paths")
1516################################################################################
1519class DBChange(Base):
1520 __tablename__ = "changes"
1522 change_id: Mapped[int] = mapped_column("id", primary_key=True)
1523 changesname: Mapped[str]
1524 seen: Mapped[datetime] = mapped_column(
1525 DateTime(timezone=True), server_default=func.now()
1526 )
1527 source: Mapped[str]
1528 binaries: Mapped[str | None]
1529 architecture: Mapped[str]
1530 version: Mapped[str]
1531 distribution: Mapped[str]
1532 urgency: Mapped[str]
1533 maintainer: Mapped[str]
1534 fingerprint: Mapped[str]
1535 changedby: Mapped[str]
1536 date: Mapped[str]
1537 created: Mapped[datetime] = mapped_column(
1538 DateTime(timezone=True), server_default=func.now()
1539 )
1540 modified: Mapped[datetime] = mapped_column(
1541 DateTime(timezone=True), server_default=func.now()
1542 )
1543 changelog_id: Mapped[int | None]
1544 closes: Mapped[list[str] | None] = mapped_column(ARRAY(String()))
1545 authorized_by_fingerprint: Mapped[str | None]
1547 def __init__(self, *args, **kwargs):
1548 pass
1550 @override
1551 def __repr__(self):
1552 return "<DBChange %s>" % self.changesname
1555__all__.append("DBChange")
1558@session_wrapper
1559def get_dbchange(filename: str, session: "Session | None" = None) -> Optional[DBChange]:
1560 """
1561 returns DBChange object for given `filename`.
1563 :param filename: the name of the file
1564 :param session: Optional SQLA session object (a temporary one will be
1565 generated if not supplied)
1566 :return: DBChange object for the given filename (:const:`None` if not present)
1567 """
1568 assert session is not None
1569 q = session.query(DBChange).filter_by(changesname=filename)
1570 return q.one_or_none()
1573__all__.append("get_dbchange")
1575################################################################################
1578class DBChangelog(Base):
1579 __tablename__ = "changelogs_text"
1581 id: Mapped[int] = mapped_column(primary_key=True)
1582 changelog: Mapped[str | None]
1584 def __init__(self, *args, **kwargs):
1585 pass
1587 @override
1588 def __repr__(self):
1589 return "<DBChangelog %s>" % self.id
1592__all__.append("DBChangelog")
1594################################################################################
1597class Maintainer(ORMObject):
1598 __tablename__ = "maintainer"
1600 maintainer_id: Mapped[int] = mapped_column("id", primary_key=True)
1601 name: Mapped[str] = mapped_column(unique=True)
1602 created: Mapped[datetime | None] = mapped_column(
1603 DateTime(timezone=True), server_default=func.now()
1604 )
1605 modified: Mapped[datetime | None] = mapped_column(
1606 DateTime(timezone=True), server_default=func.now()
1607 )
1609 maintains_sources: Mapped[list["DBSource"]] = relationship(
1610 back_populates="maintainer", foreign_keys=lambda: DBSource.maintainer_id
1611 )
1612 changed_sources: Mapped[list["DBSource"]] = relationship(
1613 back_populates="changedby", foreign_keys=lambda: DBSource.changedby_id
1614 )
1616 def __init__(self, name=None):
1617 self.name = name
1619 @override
1620 def properties(self) -> list[str]:
1621 return ["name", "maintainer_id"]
1623 def get_split_maintainer(self) -> tuple[str, str, str, str]:
1624 if not hasattr(self, "name") or self.name is None:
1625 return ("", "", "", "")
1627 return fix_maintainer(self.name.strip())
1630__all__.append("Maintainer")
1633@session_wrapper
1634def get_or_set_maintainer(name: str, session: "Session | None" = None) -> Maintainer:
1635 """
1636 Returns Maintainer object for given maintainer name.
1638 If no matching maintainer name is found, a row is inserted.
1640 :param name: The maintainer name to add
1641 :param session: Optional SQL session object (a temporary one will be
1642 generated if not supplied). If not passed, a commit will be performed at
1643 the end of the function, otherwise the caller is responsible for commiting.
1644 A flush will be performed either way.
1645 :return: the Maintainer object for the given maintainer
1646 """
1648 assert session is not None
1649 q = session.query(Maintainer).filter_by(name=name)
1650 try:
1651 ret = q.one()
1652 except NoResultFound:
1653 maintainer = Maintainer()
1654 maintainer.name = name
1655 session.add(maintainer)
1656 session.commit_or_flush() # type: ignore[attr-defined]
1657 ret = maintainer
1659 return ret
1662__all__.append("get_or_set_maintainer")
1665@session_wrapper
1666def get_maintainer(
1667 maintainer_id: int, session: "Session | None" = None
1668) -> Optional[Maintainer]:
1669 """
1670 Return the name of the maintainer behind `maintainer_id` or :const:`None`
1671 if that `maintainer_id` is invalid.
1673 :param maintainer_id: the id of the maintainer
1674 :return: the Maintainer with this `maintainer_id`
1675 """
1677 assert session is not None
1678 return session.get(Maintainer, maintainer_id)
1681__all__.append("get_maintainer")
1683################################################################################
1686class NewComment(Base):
1687 __tablename__ = "new_comments"
1689 comment_id: Mapped[int] = mapped_column("id", primary_key=True)
1690 package: Mapped[str]
1691 version: Mapped[str]
1692 comment: Mapped[str]
1693 author: Mapped[str]
1694 notedate: Mapped[datetime] = mapped_column(
1695 DateTime(timezone=True), server_default=func.now()
1696 )
1697 trainee: Mapped[bool] = mapped_column(default=False)
1698 created: Mapped[datetime | None] = mapped_column(
1699 DateTime(timezone=True), server_default=func.now()
1700 )
1701 modified: Mapped[datetime | None] = mapped_column(
1702 DateTime(timezone=True), server_default=func.now()
1703 )
1704 policy_queue_id: Mapped[int] = mapped_column(ForeignKey("policy_queue.id"))
1706 policy_queue: Mapped["PolicyQueue"] = relationship()
1708 def __init__(self, *args, **kwargs):
1709 pass
1711 @override
1712 def __repr__(self):
1713 return """<NewComment for '%s %s' (%s)>""" % (
1714 self.package,
1715 self.version,
1716 self.comment_id,
1717 )
1720__all__.append("NewComment")
1723@session_wrapper
1724def has_new_comment(
1725 policy_queue: "PolicyQueue",
1726 package: str,
1727 version: str,
1728 session: "Session | None" = None,
1729) -> bool:
1730 """
1731 Returns :const:`True` if the given combination of `package`, `version` has a comment.
1733 :param package: name of the package
1734 :param version: package version
1735 :param session: Optional SQLA session object (a temporary one will be
1736 generated if not supplied)
1737 """
1739 assert session is not None
1740 q = session.query(NewComment).filter_by(policy_queue=policy_queue)
1741 q = q.filter_by(package=package)
1742 q = q.filter_by(version=version)
1744 return bool(q.count() > 0)
1747__all__.append("has_new_comment")
1750@session_wrapper
1751def get_new_comments(
1752 policy_queue: "PolicyQueue",
1753 package: Optional[str] = None,
1754 version: Optional[str] = None,
1755 comment_id: Optional[int] = None,
1756 session: "Session | None" = None,
1757) -> list[NewComment]:
1758 """
1759 Returns (possibly empty) list of NewComment objects for the given
1760 parameters
1762 :param package: name of the package
1763 :param version: package version
1764 :param comment_id: An id of a comment
1765 :param session: Optional SQLA session object (a temporary one will be
1766 generated if not supplied)
1767 :return: A (possibly empty) list of NewComment objects will be returned
1768 """
1770 assert session is not None
1771 q = session.query(NewComment).filter_by(policy_queue=policy_queue)
1772 if package is not None: 1772 ↛ 1774line 1772 didn't jump to line 1774 because the condition on line 1772 was always true
1773 q = q.filter_by(package=package)
1774 if version is not None: 1774 ↛ 1775line 1774 didn't jump to line 1775 because the condition on line 1774 was never true
1775 q = q.filter_by(version=version)
1776 if comment_id is not None: 1776 ↛ 1777line 1776 didn't jump to line 1777 because the condition on line 1776 was never true
1777 q = q.filter_by(comment_id=comment_id)
1779 return q.all()
1782__all__.append("get_new_comments")
1784################################################################################
1787class Override(ORMObject):
1788 __tablename__ = "override"
1790 suite_id: Mapped[int] = mapped_column(
1791 "suite", ForeignKey("suite.id"), primary_key=True
1792 )
1793 component_id: Mapped[int] = mapped_column(
1794 "component", ForeignKey("component.id"), primary_key=True
1795 )
1796 package: Mapped[str] = mapped_column(primary_key=True)
1797 overridetype_id: Mapped[int] = mapped_column(
1798 "type", ForeignKey("override_type.id"), primary_key=True
1799 )
1800 priority_id: Mapped[int | None] = mapped_column(
1801 "priority", ForeignKey("priority.id")
1802 )
1803 section_id: Mapped[int] = mapped_column("section", ForeignKey("section.id"))
1804 maintainer: Mapped[str | None]
1805 created: Mapped[datetime | None] = mapped_column(
1806 DateTime(timezone=True), server_default=func.now()
1807 )
1808 modified: Mapped[datetime | None] = mapped_column(
1809 DateTime(timezone=True), server_default=func.now()
1810 )
1812 suite: Mapped["Suite"] = relationship(back_populates="overrides")
1813 component: Mapped["Component"] = relationship(back_populates="overrides")
1814 priority: Mapped["Priority"] = relationship(back_populates="overrides")
1815 section: Mapped["Section"] = relationship(back_populates="overrides")
1816 overridetype: Mapped["OverrideType"] = relationship(back_populates="overrides")
1818 def __init__(
1819 self,
1820 package=None,
1821 suite=None,
1822 component=None,
1823 overridetype=None,
1824 section=None,
1825 priority=None,
1826 ):
1827 self.package = package
1828 self.suite = suite
1829 self.component = component
1830 self.overridetype = overridetype
1831 self.section = section
1832 self.priority = priority
1834 @override
1835 def properties(self) -> list[str]:
1836 return ["package", "suite", "component", "overridetype", "section", "priority"]
1839__all__.append("Override")
1842@session_wrapper
1843def get_override(
1844 package: str,
1845 suite: Union[str, list[str], None] = None,
1846 component: Union[str, list[str], None] = None,
1847 overridetype: Union[str, list[str], None] = None,
1848 session: "Session | None" = None,
1849) -> list[Override]:
1850 """
1851 Returns Override object for the given parameters
1853 :param package: The name of the package
1854 :param suite: The name of the suite (or suites if a list) to limit to. If
1855 None, don't limit. Defaults to None.
1856 :param component: The name of the component (or components if a list) to
1857 limit to. If None, don't limit. Defaults to None.
1858 :param overridetype: The name of the overridetype (or overridetypes if a list) to
1859 limit to. If None, don't limit. Defaults to None.
1860 :param session: Optional SQLA session object (a temporary one will be
1861 generated if not supplied)
1862 :return: A (possibly empty) list of Override objects will be returned
1863 """
1865 assert session is not None
1867 q = session.query(Override)
1868 q = q.filter_by(package=package)
1870 if suite is not None:
1871 if not isinstance(suite, list):
1872 suite = [suite]
1873 q = q.join(Suite).filter(Suite.suite_name.in_(suite))
1875 if component is not None:
1876 if not isinstance(component, list):
1877 component = [component]
1878 q = q.join(Component).filter(Component.component_name.in_(component))
1880 if overridetype is not None:
1881 if not isinstance(overridetype, list):
1882 overridetype = [overridetype]
1883 q = q.join(OverrideType).filter(OverrideType.overridetype.in_(overridetype))
1885 return q.all()
1888__all__.append("get_override")
1891################################################################################
1894class OverrideType(ORMObject):
1895 __tablename__ = "override_type"
1897 overridetype_id: Mapped[int] = mapped_column("id", primary_key=True)
1898 overridetype: Mapped[str] = mapped_column("type", unique=True)
1899 created: Mapped[datetime | None] = mapped_column(
1900 DateTime(timezone=True), server_default=func.now()
1901 )
1902 modified: Mapped[datetime | None] = mapped_column(
1903 DateTime(timezone=True), server_default=func.now()
1904 )
1906 overrides: DynamicMapped["Override"] = relationship(back_populates="overridetype")
1908 def __init__(self, overridetype=None):
1909 self.overridetype = overridetype
1911 @override
1912 def properties(self) -> list[str]:
1913 return ["overridetype", "overridetype_id", "overrides_count"]
1916__all__.append("OverrideType")
1919@session_wrapper
1920def get_override_type(
1921 override_type: str, session: "Session | None" = None
1922) -> Optional[OverrideType]:
1923 """
1924 Returns OverrideType object for given `override_type`.
1926 :param override_type: The name of the override type
1927 :param session: Optional SQLA session object (a temporary one will be
1928 generated if not supplied)
1929 :return: the database id for the given override type
1930 """
1932 assert session is not None
1933 q = session.query(OverrideType).filter_by(overridetype=override_type)
1934 return q.one_or_none()
1937__all__.append("get_override_type")
1939################################################################################
1942class PolicyQueue(Base):
1943 __tablename__ = "policy_queue"
1945 policy_queue_id: Mapped[int] = mapped_column("id", primary_key=True)
1946 queue_name: Mapped[str] = mapped_column(unique=True)
1947 path: Mapped[str]
1948 change_perms: Mapped[str] = mapped_column(CHAR(length=4))
1949 generate_metadata: Mapped[bool] = mapped_column(default=False)
1950 created: Mapped[datetime | None] = mapped_column(
1951 DateTime(timezone=True), server_default=func.now()
1952 )
1953 modified: Mapped[datetime | None] = mapped_column(
1954 DateTime(timezone=True), server_default=func.now()
1955 )
1956 send_to_build_queues: Mapped[bool] = mapped_column(default=False)
1957 suite_id: Mapped[int] = mapped_column(ForeignKey("suite.id"))
1959 suite: Mapped["Suite"] = relationship(foreign_keys=[suite_id])
1960 uploads: Mapped[list["PolicyQueueUpload"]] = relationship(
1961 back_populates="policy_queue"
1962 )
1964 def __init__(self, *args, **kwargs):
1965 pass
1967 @override
1968 def __repr__(self):
1969 return "<PolicyQueue %s>" % self.queue_name
1972__all__.append("PolicyQueue")
1975@session_wrapper
1976def get_policy_queue(
1977 queuename: str, session: "Session | None" = None
1978) -> Optional[PolicyQueue]:
1979 """
1980 Returns PolicyQueue object for given `queuename`
1982 :param queuename: The name of the queue
1983 :param session: Optional SQLA session object (a temporary one will be
1984 generated if not supplied)
1985 :return: PolicyQueue object for the given queue
1986 """
1988 assert session is not None
1989 q = session.query(PolicyQueue).filter_by(queue_name=queuename)
1990 return q.one_or_none()
1993__all__.append("get_policy_queue")
1995################################################################################
1998@functools.total_ordering
1999class PolicyQueueUpload(Base):
2000 __tablename__ = "policy_queue_upload"
2001 __table_args = (
2002 UniqueConstraint(
2003 "policy_queue_id",
2004 "target_suite_id",
2005 "changes_id",
2006 name="policy_queue_upload_policy_queue_id_target_suite_id_changes_key",
2007 ),
2008 )
2010 id: Mapped[int] = mapped_column(primary_key=True)
2011 policy_queue_id: Mapped[int] = mapped_column(ForeignKey("policy_queue.id"))
2012 target_suite_id: Mapped[int] = mapped_column(ForeignKey("suite.id"))
2013 changes_id: Mapped[int] = mapped_column(ForeignKey("changes.id"))
2014 source_id: Mapped[int | None] = mapped_column(ForeignKey("source.id"))
2016 changes: Mapped["DBChange"] = relationship()
2017 policy_queue: Mapped["PolicyQueue"] = relationship(back_populates="uploads")
2018 target_suite: Mapped["Suite"] = relationship()
2019 source: Mapped["DBSource | None"] = relationship()
2020 binaries: Mapped[list["DBBinary"]] = relationship(
2021 secondary="policy_queue_upload_binaries_map"
2022 )
2023 byhand: Mapped[list["PolicyQueueByhandFile"]] = relationship(
2024 back_populates="upload"
2025 )
2027 def _key(self):
2028 return (
2029 self.changes.source,
2030 AptVersion(self.changes.version),
2031 self.source is None,
2032 self.changes.changesname,
2033 )
2035 @override
2036 def __eq__(self, other: Any) -> bool:
2037 if not isinstance(other, PolicyQueueUpload): 2037 ↛ 2038line 2037 didn't jump to line 2038 because the condition on line 2037 was never true
2038 return NotImplemented
2039 return self._key() == other._key()
2041 def __lt__(self, other):
2042 return self._key() < other._key()
2045__all__.append("PolicyQueueUpload")
2048class PolicyQueueUploadBinariesMap(Base):
2049 __tablename__ = "policy_queue_upload_binaries_map"
2051 policy_queue_upload_id: Mapped[int] = mapped_column(
2052 ForeignKey("policy_queue_upload.id", ondelete="CASCADE"), primary_key=True
2053 )
2054 binary_id: Mapped[int] = mapped_column(ForeignKey("binaries.id"), primary_key=True)
2057################################################################################
2060class PolicyQueueByhandFile(Base):
2061 __tablename__ = "policy_queue_byhand_file"
2063 id: Mapped[int] = mapped_column(primary_key=True)
2064 upload_id: Mapped[int] = mapped_column(ForeignKey("policy_queue_upload.id"))
2065 filename: Mapped[str]
2066 processed: Mapped[bool] = mapped_column(default=False)
2068 upload: Mapped[PolicyQueueUpload] = relationship(back_populates="byhand")
2071__all__.append("PolicyQueueByhandFile")
2073################################################################################
2076class Priority(ORMObject):
2077 __tablename__ = "priority"
2079 priority_id: Mapped[int] = mapped_column("id", primary_key=True)
2080 priority: Mapped[str] = mapped_column(unique=True)
2081 level: Mapped[int] = mapped_column(unique=True)
2082 created: Mapped[datetime | None] = mapped_column(
2083 DateTime(timezone=True), server_default=func.now()
2084 )
2085 modified: Mapped[datetime | None] = mapped_column(
2086 DateTime(timezone=True), server_default=func.now()
2087 )
2089 overrides: DynamicMapped["Override"] = relationship(back_populates="priority")
2091 def __init__(self, priority=None, level=None):
2092 self.priority = priority
2093 self.level = level
2095 @override
2096 def properties(self) -> list[str]:
2097 return ["priority", "priority_id", "level", "overrides_count"]
2099 @override
2100 def __eq__(self, val: Any):
2101 if isinstance(val, str):
2102 warnings.warn(
2103 "comparison with a `str` is deprecated",
2104 DeprecationWarning,
2105 stacklevel=2,
2106 )
2107 return self.priority == val
2108 # This signals to use the normal comparison operator
2109 return NotImplemented
2111 @override
2112 def __ne__(self, val: Any):
2113 if isinstance(val, str): 2113 ↛ 2121line 2113 didn't jump to line 2121 because the condition on line 2113 was always true
2114 warnings.warn(
2115 "comparison with a `str` is deprecated",
2116 DeprecationWarning,
2117 stacklevel=2,
2118 )
2119 return self.priority != val
2120 # This signals to use the normal comparison operator
2121 return NotImplemented
2123 __hash__ = ORMObject.__hash__
2126__all__.append("Priority")
2129@session_wrapper
2130def get_priority(priority: str, session: "Session | None" = None) -> Optional[Priority]:
2131 """
2132 Returns Priority object for given `priority` name.
2134 :param priority: The name of the priority
2135 :param session: Optional SQLA session object (a temporary one will be
2136 generated if not supplied)
2137 :return: Priority object for the given priority
2138 """
2140 assert session is not None
2141 q = session.query(Priority).filter_by(priority=priority)
2142 return q.one_or_none()
2145__all__.append("get_priority")
2148@session_wrapper
2149def get_priorities(session: "Session | None" = None) -> dict[str, int]:
2150 """
2151 Returns dictionary of priority names -> id mappings
2153 :param session: Optional SQL session object (a temporary one will be
2154 generated if not supplied)
2155 :return: dictionary of priority names -> id mappings
2156 """
2158 assert session is not None
2159 ret = {}
2160 q = session.query(Priority)
2161 for x in q.all():
2162 ret[x.priority] = x.priority_id
2164 return ret
2167__all__.append("get_priorities")
2169################################################################################
2172class Section(BaseTimestamp):
2173 __tablename__ = "section"
2175 section_id: Mapped[int] = mapped_column("id", primary_key=True)
2176 section: Mapped[str] = mapped_column(unique=True)
2178 overrides: DynamicMapped["Override"] = relationship(back_populates="section")
2180 def __init__(self, section=None):
2181 self.section = section
2183 @override
2184 def __str__(self):
2185 return self.section
2187 @override
2188 def __repr__(self):
2189 return "<{} {}>".format(
2190 self.__class__.__name__,
2191 self.section,
2192 )
2194 @override
2195 def __eq__(self, val: Any):
2196 if isinstance(val, str):
2197 warnings.warn(
2198 "comparison with a `str` is deprecated",
2199 DeprecationWarning,
2200 stacklevel=2,
2201 )
2202 return self.section == val
2203 # This signals to use the normal comparison operator
2204 return NotImplemented
2206 @override
2207 def __ne__(self, val: Any):
2208 if isinstance(val, str): 2208 ↛ 2216line 2208 didn't jump to line 2216 because the condition on line 2208 was always true
2209 warnings.warn(
2210 "comparison with a `str` is deprecated",
2211 DeprecationWarning,
2212 stacklevel=2,
2213 )
2214 return self.section != val
2215 # This signals to use the normal comparison operator
2216 return NotImplemented
2218 __hash__ = BaseTimestamp.__hash__
2221__all__.append("Section")
2224@session_wrapper
2225def get_section(section: str, session: "Session | None" = None) -> Optional[Section]:
2226 """
2227 Returns Section object for given `section` name.
2229 :param section: The name of the section
2230 :param session: Optional SQLA session object (a temporary one will be
2231 generated if not supplied)
2232 :return: Section object for the given section name
2233 """
2235 assert session is not None
2236 q = session.query(Section).filter_by(section=section)
2237 return q.one_or_none()
2240__all__.append("get_section")
2243@session_wrapper
2244def get_sections(session: "Session | None" = None) -> dict[str, int]:
2245 """
2246 Returns dictionary of section names -> id mappings
2248 :param session: Optional SQL session object (a temporary one will be
2249 generated if not supplied)
2250 :return: dictionary of section names -> id mappings
2251 """
2253 assert session is not None
2254 ret = {}
2255 q = session.query(Section)
2256 for x in q.all():
2257 ret[x.section] = x.section_id
2259 return ret
2262__all__.append("get_sections")
2264################################################################################
2267class SignatureHistory(ORMObject):
2268 __tablename__ = "signature_history"
2270 fingerprint: Mapped[str] = mapped_column(primary_key=True)
2271 signature_timestamp: Mapped[datetime] = mapped_column(
2272 DateTime(timezone=False), primary_key=True
2273 )
2274 contents_sha1: Mapped[str] = mapped_column(primary_key=True)
2275 seen: Mapped[datetime] = mapped_column(
2276 DateTime(timezone=False), server_default=func.now()
2277 )
2279 @classmethod
2280 def from_signed_file(
2281 cls, signed_file: "daklib.gpg.SignedFile"
2282 ) -> "SignatureHistory":
2283 """signature history entry from signed file
2285 :param signed_file: signed file
2286 """
2287 self = cls()
2288 self.fingerprint = signed_file.primary_fingerprint
2289 self.signature_timestamp = signed_file.signature_timestamp
2290 self.contents_sha1 = signed_file.contents_sha1
2291 return self
2293 def query(self, session):
2294 return (
2295 session.query(SignatureHistory)
2296 .filter_by(
2297 fingerprint=self.fingerprint,
2298 signature_timestamp=self.signature_timestamp,
2299 contents_sha1=self.contents_sha1,
2300 )
2301 .first()
2302 )
2305__all__.append("SignatureHistory")
2307################################################################################
2310class SrcContents(ORMObject):
2311 __tablename__ = "src_contents"
2313 source_id: Mapped[int] = mapped_column(
2314 ForeignKey("source.id", ondelete="CASCADE"), primary_key=True
2315 )
2316 file: Mapped[str] = mapped_column(primary_key=True)
2318 source: Mapped["DBSource"] = relationship(back_populates="contents")
2320 def __init__(self, file=None, source=None):
2321 self.file = file
2322 self.source = source
2324 @override
2325 def properties(self) -> list[str]:
2326 return ["file", "source"]
2329__all__.append("SrcContents")
2331################################################################################
2334class DBSource(ORMObject):
2335 __tablename__ = "source"
2336 __table_args = (UniqueConstraint("source", "version", name="source_source_key"),)
2338 source_id: Mapped[int] = mapped_column("id", primary_key=True)
2339 source: Mapped[str]
2340 version: Mapped[str] = mapped_column(DebVersion())
2341 maintainer_id: Mapped[int] = mapped_column(
2342 "maintainer", ForeignKey("maintainer.id")
2343 )
2344 poolfile_id: Mapped[int] = mapped_column(
2345 "file", ForeignKey("files.id"), unique=True
2346 )
2347 fingerprint_id: Mapped[int | None] = mapped_column(
2348 "sig_fpr", ForeignKey("fingerprint.id")
2349 )
2350 install_date: Mapped[datetime] = mapped_column(
2351 DateTime(timezone=True), server_default=func.now()
2352 )
2353 changedby_id: Mapped[int] = mapped_column("changedby", ForeignKey("maintainer.id"))
2354 dm_upload_allowed: Mapped[bool] = mapped_column(default=False)
2355 created: Mapped[datetime | None] = mapped_column(
2356 DateTime(timezone=True), server_default=func.now()
2357 )
2358 modified: Mapped[datetime | None] = mapped_column(
2359 DateTime(timezone=True), server_default=func.now()
2360 )
2361 stanza: Mapped[str]
2362 authorized_by_fingerprint_id: Mapped[int | None] = mapped_column(
2363 ForeignKey("fingerprint.id")
2364 )
2366 poolfile: Mapped["PoolFile"] = relationship()
2367 fingerprint: Mapped["Fingerprint | None"] = relationship(
2368 foreign_keys=[fingerprint_id]
2369 )
2370 authorized_by_fingerprint: Mapped["Fingerprint | None"] = relationship(
2371 foreign_keys=[authorized_by_fingerprint_id]
2372 )
2373 srcfiles: Mapped[list["DSCFile"]] = relationship(back_populates="source")
2374 suites: Mapped[list["Suite"]] = relationship(
2375 secondary="src_associations", back_populates="sources"
2376 )
2377 uploaders: Mapped[list["Maintainer"]] = relationship(secondary="src_uploaders")
2378 key: Mapped[dict[str, "SourceMetadata"]] = relationship(
2379 collection_class=attribute_keyed_dict("key"),
2380 cascade="all",
2381 back_populates="source",
2382 )
2383 contents: DynamicMapped["SrcContents"] = relationship(
2384 cascade="all",
2385 back_populates="source",
2386 )
2387 binaries: Mapped[list["DBBinary"]] = relationship(back_populates="source")
2388 extra_binary_references: DynamicMapped["DBBinary"] = relationship(
2389 secondary="extra_src_references", back_populates="extra_sources"
2390 )
2391 maintainer: Mapped["Maintainer"] = relationship(
2392 back_populates="maintains_sources", foreign_keys=[maintainer_id]
2393 )
2394 changedby: Mapped["Maintainer"] = relationship(
2395 back_populates="changed_sources", foreign_keys=[changedby_id]
2396 )
2398 metadata_proxy: AssociationProxy[dict["MetadataKey", str]] = association_proxy(
2399 "key", "value"
2400 )
2402 def __init__(
2403 self,
2404 source=None,
2405 version=None,
2406 maintainer=None,
2407 changedby=None,
2408 poolfile=None,
2409 install_date=None,
2410 fingerprint=None,
2411 ):
2412 self.source = source
2413 self.version = version
2414 self.maintainer = maintainer
2415 self.changedby = changedby
2416 self.poolfile = poolfile
2417 self.install_date = install_date
2418 self.fingerprint = fingerprint
2420 @property
2421 def pkid(self) -> int:
2422 return self.source_id
2424 @property
2425 def name(self) -> str:
2426 return self.source
2428 @property
2429 def arch_string(self) -> str:
2430 return "source"
2432 @override
2433 def properties(self) -> list[str]:
2434 return [
2435 "source",
2436 "source_id",
2437 "maintainer",
2438 "changedby",
2439 "fingerprint",
2440 "poolfile",
2441 "version",
2442 "suites_count",
2443 "install_date",
2444 "binaries_count",
2445 "uploaders_count",
2446 ]
2448 def read_control_fields(self) -> Deb822:
2449 """
2450 Reads the control information from a dsc
2452 :return: fields is the dsc information in a dictionary form
2453 """
2454 with open(self.poolfile.fullpath, "r") as fd:
2455 fields = Deb822(fd)
2456 return fields
2458 def scan_contents(self) -> set[str]:
2459 """
2460 Returns a set of names for non directories. The path names are
2461 normalized after converting them from either utf-8 or iso8859-1
2462 encoding.
2463 """
2464 fullpath = self.poolfile.fullpath
2465 from daklib.contents import UnpackedSource
2467 unpacked = UnpackedSource(fullpath)
2468 fileset = set()
2469 for name in unpacked.get_all_filenames():
2470 fileset.add(name)
2471 return fileset
2473 @property
2474 def proxy(self) -> "MetadataProxy":
2475 session = object_session(self)
2476 assert session is not None
2477 query = session.query(SourceMetadata).filter_by(source=self)
2478 return MetadataProxy(session, query)
2481__all__.append("DBSource")
2484@session_wrapper
2485def get_suites_source_in(source: str, session=None) -> "list[Suite]":
2486 """
2487 Returns list of Suite objects which given `source` name is in
2489 :param source: DBSource package name to search for
2490 :return: list of Suite objects for the given source
2491 """
2493 return session.query(Suite).filter(Suite.sources.any(source=source)).all()
2496__all__.append("get_suites_source_in")
2498# FIXME: This function fails badly if it finds more than 1 source package and
2499# its implementation is trivial enough to be inlined.
2502@session_wrapper
2503def get_source_in_suite(
2504 source: str, suite_name: Optional[str], session: "Session | None" = None
2505) -> Optional[DBSource]:
2506 """
2507 Returns a DBSource object for a combination of `source` and `suite_name`.
2509 :param source: source package name
2510 :param suite_name: the suite name
2511 :return: the version for `source` in `suite`
2512 """
2513 assert session is not None
2514 if suite_name is None: 2514 ↛ 2515line 2514 didn't jump to line 2515 because the condition on line 2514 was never true
2515 return None
2516 suite = get_suite(suite_name, session)
2517 if suite is None: 2517 ↛ 2518line 2517 didn't jump to line 2518 because the condition on line 2517 was never true
2518 return None
2519 return suite.get_sources(source).one_or_none()
2522__all__.append("get_source_in_suite")
2525@session_wrapper
2526def import_metadata_into_db(
2527 obj: Union[DBBinary, DBSource], session: "Session | None" = None
2528) -> None:
2529 """
2530 This routine works on either DBBinary or DBSource objects and imports
2531 their metadata into the database
2532 """
2533 assert session is not None
2534 fields = obj.read_control_fields()
2535 for k in fields.keys():
2536 try:
2537 # Try raw ASCII
2538 val = str(fields[k])
2539 except UnicodeEncodeError:
2540 # Fall back to UTF-8
2541 try:
2542 val = fields[k].encode("utf-8")
2543 except UnicodeEncodeError:
2544 # Finally try iso8859-1
2545 val = fields[k].encode("iso8859-1")
2546 # Otherwise we allow the exception to percolate up and we cause
2547 # a reject as someone is playing silly buggers
2549 obj.metadata_proxy[get_or_set_metadatakey(k, session)] = val
2551 session.commit_or_flush() # type: ignore[attr-defined]
2554__all__.append("import_metadata_into_db")
2557class SrcAssociations(Base):
2558 __tablename__ = "src_associations"
2559 __table_args = (
2560 UniqueConstraint("suite", "source", name="src_associations_suite_key"),
2561 )
2563 id: Mapped[int] = mapped_column(primary_key=True)
2564 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"))
2565 source: Mapped[int] = mapped_column(ForeignKey("source.id"))
2566 created: Mapped[datetime] = mapped_column(
2567 DateTime(timezone=True), server_default=func.now()
2568 )
2569 modified: Mapped[datetime] = mapped_column(
2570 DateTime(timezone=True), server_default=func.now()
2571 )
2574class SrcUploaders(Base):
2575 __tablename__ = "src_uploaders"
2576 __table_args = (
2577 UniqueConstraint("source", "maintainer", name="src_uploaders_source_key"),
2578 )
2580 id: Mapped[int] = mapped_column(primary_key=True)
2581 source: Mapped[int] = mapped_column(ForeignKey("source.id", ondelete="CASCADE"))
2582 maintainer: Mapped[int] = mapped_column(
2583 ForeignKey("maintainer.id", ondelete="CASCADE")
2584 )
2585 created: Mapped[datetime] = mapped_column(
2586 DateTime(timezone=True), server_default=func.now()
2587 )
2588 modified: Mapped[datetime] = mapped_column(
2589 DateTime(timezone=True), server_default=func.now()
2590 )
2593################################################################################
2596class SrcFormat(Base):
2597 __tablename__ = "src_format"
2599 src_format_id: Mapped[int] = mapped_column("id", primary_key=True)
2600 format_name: Mapped[str] = mapped_column(unique=True)
2601 created: Mapped[datetime] = mapped_column(
2602 DateTime(timezone=True), server_default=func.now()
2603 )
2604 modified: Mapped[datetime] = mapped_column(
2605 DateTime(timezone=True), server_default=func.now()
2606 )
2608 suites: DynamicMapped["Suite"] = relationship(
2609 secondary="suite_src_formats", back_populates="srcformats"
2610 )
2612 def __init__(self, *args, **kwargs):
2613 pass
2615 @override
2616 def __repr__(self):
2617 return "<SrcFormat %s>" % (self.format_name)
2620__all__.append("SrcFormat")
2622################################################################################
2624SUITE_FIELDS = [
2625 ("SuiteName", "suite_name"),
2626 ("SuiteID", "suite_id"),
2627 ("Version", "version"),
2628 ("Origin", "origin"),
2629 ("Label", "label"),
2630 ("Description", "description"),
2631 ("Untouchable", "untouchable"),
2632 ("Announce", "announce"),
2633 ("Codename", "codename"),
2634 ("OverrideCodename", "overridecodename"),
2635 ("ValidTime", "validtime"),
2636 ("Priority", "priority"),
2637 ("NotAutomatic", "notautomatic"),
2638 ("CopyChanges", "copychanges"),
2639 ("OverrideSuite", "overridesuite"),
2640]
2642# Why the heck don't we have any UNIQUE constraints in table suite?
2643# TODO: Add UNIQUE constraints for appropriate columns.
2646class Suite(ORMObject):
2647 __tablename__ = "suite"
2649 suite_id: Mapped[int] = mapped_column("id", primary_key=True)
2650 suite_name: Mapped[str] = mapped_column(unique=True)
2651 version: Mapped[str | None]
2652 origin: Mapped[str | None]
2653 label: Mapped[str | None]
2654 description: Mapped[str | None]
2655 untouchable: Mapped[bool] = mapped_column(default=False)
2656 codename: Mapped[str | None]
2657 overridecodename: Mapped[str | None]
2658 validtime: Mapped[int] = mapped_column(default=604800)
2659 priority: Mapped[int] = mapped_column(default=0)
2660 notautomatic: Mapped[bool] = mapped_column(default=False)
2661 copychanges: Mapped[str | None]
2662 overridesuite: Mapped[str | None]
2663 policy_queue_id: Mapped[int | None] = mapped_column(ForeignKey("policy_queue.id"))
2664 created: Mapped[datetime | None] = mapped_column(
2665 DateTime(timezone=True), server_default=func.now()
2666 )
2667 modified: Mapped[datetime | None] = mapped_column(
2668 DateTime(timezone=True), server_default=func.now()
2669 )
2670 changelog: Mapped[str | None]
2671 butautomaticupgrades: Mapped[bool] = mapped_column(default=False)
2672 signingkeys: Mapped[list[str] | None] = mapped_column(ARRAY(String()))
2673 announce: Mapped[list[str] | None] = mapped_column(ARRAY(String()))
2674 include_long_description: Mapped[bool] = mapped_column(default=False)
2675 overrideprocess: Mapped[bool] = mapped_column(default=False)
2676 overrideorigin: Mapped[str | None]
2677 allowcsset: Mapped[bool] = mapped_column(default=False)
2678 archive_id: Mapped[int] = mapped_column(ForeignKey("archive.id"))
2679 new_queue_id: Mapped[int | None] = mapped_column(ForeignKey("policy_queue.id"))
2680 close_bugs: Mapped[bool | None]
2681 mail_whitelist: Mapped[str | None]
2682 indices_compression: Mapped[list[str] | None] = mapped_column(
2683 ARRAY(String()), default=["xz"]
2684 )
2685 i18n_compression: Mapped[list[str] | None] = mapped_column(
2686 ARRAY(String()), default=["xz"]
2687 )
2688 release_suite: Mapped[str | None]
2689 debugsuite_id: Mapped[int | None] = mapped_column(ForeignKey("suite.id"))
2690 changelog_url: Mapped[str | None]
2691 accept_source_uploads: Mapped[bool | None] = mapped_column(default=True)
2692 accept_binary_uploads: Mapped[bool | None] = mapped_column(default=True)
2693 checksums: Mapped[list[str] | None] = mapped_column(
2694 ARRAY(String()), default=["sha256"]
2695 )
2696 last_changed: Mapped[datetime] = mapped_column(
2697 DateTime(timezone=False), server_default=func.now()
2698 )
2699 byhash: Mapped[bool | None] = mapped_column(default=True)
2700 separate_contents_architecture_all: Mapped[bool] = mapped_column(default=False)
2701 separate_packages_architecture_all: Mapped[bool] = mapped_column(default=False)
2702 merged_pdiffs: Mapped[bool] = mapped_column(default=True)
2703 stayofexecution: Mapped[timedelta] = mapped_column(default=timedelta(hours=0))
2705 policy_queue: Mapped["PolicyQueue | None"] = relationship(
2706 foreign_keys=[policy_queue_id]
2707 )
2708 new_queue: Mapped["PolicyQueue | None"] = relationship(foreign_keys=[new_queue_id])
2709 debug_suite: Mapped["Suite | None"] = relationship(remote_side=[suite_id])
2710 copy_queues: Mapped[list["BuildQueue"]] = relationship(
2711 secondary="suite_build_queue_copy"
2712 )
2713 srcformats: Mapped[list["SrcFormat"]] = relationship(
2714 secondary="suite_src_formats", back_populates="suites"
2715 )
2716 archive: Mapped[Archive] = relationship(back_populates="suites")
2717 acls: Mapped[set[ACL]] = relationship(secondary="suite_acl_map")
2718 components: Mapped[list["Component"]] = relationship(
2719 secondary="component_suite",
2720 back_populates="suites",
2721 order_by=lambda: Component.ordering,
2722 )
2723 architectures: Mapped[list[Architecture]] = relationship(
2724 secondary="suite_architectures", back_populates="suites"
2725 )
2726 binaries: DynamicMapped["DBBinary"] = relationship(
2727 secondary="bin_associations",
2728 back_populates="suites",
2729 )
2730 sources: DynamicMapped["DBSource"] = relationship(
2731 secondary="src_associations", back_populates="suites"
2732 )
2733 overrides: DynamicMapped["Override"] = relationship(back_populates="suite")
2735 def __init__(self, suite_name=None, version=None):
2736 self.suite_name = suite_name
2737 self.version = version
2739 @override
2740 def properties(self) -> list[str]:
2741 return [
2742 "suite_name",
2743 "version",
2744 "sources_count",
2745 "binaries_count",
2746 "overrides_count",
2747 ]
2749 @override
2750 def __eq__(self, val: Any):
2751 if isinstance(val, str): 2751 ↛ 2752line 2751 didn't jump to line 2752 because the condition on line 2751 was never true
2752 warnings.warn(
2753 "comparison with a `str` is deprecated",
2754 DeprecationWarning,
2755 stacklevel=2,
2756 )
2757 return self.suite_name == val
2758 # This signals to use the normal comparison operator
2759 return NotImplemented
2761 @override
2762 def __ne__(self, val: Any):
2763 if isinstance(val, str): 2763 ↛ 2764line 2763 didn't jump to line 2764 because the condition on line 2763 was never true
2764 warnings.warn(
2765 "comparison with a `str` is deprecated",
2766 DeprecationWarning,
2767 stacklevel=2,
2768 )
2769 return self.suite_name != val
2770 # This signals to use the normal comparison operator
2771 return NotImplemented
2773 __hash__ = ORMObject.__hash__
2775 def details(self) -> str:
2776 ret = []
2777 for disp, field in SUITE_FIELDS:
2778 val = getattr(self, field, None)
2779 if val is not None:
2780 ret.append("%s: %s" % (disp, val))
2782 return "\n".join(ret)
2784 def get_architectures(
2785 self, skipsrc: bool = False, skipall: bool = False
2786 ) -> list[Architecture]:
2787 """
2788 Returns list of Architecture objects
2790 :param skipsrc: Whether to skip returning the 'source' architecture entry
2791 :param skipall: Whether to skip returning the 'all' architecture entry
2792 :return: list of Architecture objects for the given name (may be empty)
2793 """
2795 session = object_session(self)
2796 assert session is not None
2797 q = session.query(Architecture).with_parent(self)
2798 if skipsrc:
2799 q = q.filter(Architecture.arch_string != "source")
2800 if skipall:
2801 q = q.filter(Architecture.arch_string != "all")
2802 return q.order_by(Architecture.arch_string).all()
2804 def get_sources(self, source: str) -> sqlalchemy.orm.query.Query:
2805 """
2806 Returns a query object representing DBSource that is part of this suite.
2808 :param source: source package name
2809 :return: a query of DBSource
2810 """
2812 session = object_session(self)
2813 assert session is not None
2814 return session.query(DBSource).filter_by(source=source).with_parent(self)
2816 def get_overridesuite(self) -> "Suite":
2817 if self.overridesuite is None:
2818 return self
2819 session = object_session(self)
2820 assert session is not None
2821 return session.query(Suite).filter_by(suite_name=self.overridesuite).one()
2823 def update_last_changed(self) -> None:
2824 self.last_changed = sqlalchemy.func.now()
2826 @property
2827 def path(self) -> str:
2828 return os.path.join(self.archive.path, "dists", self.suite_name)
2830 @property
2831 def release_suite_output(self) -> str:
2832 if self.release_suite is not None: 2832 ↛ 2833line 2832 didn't jump to line 2833 because the condition on line 2832 was never true
2833 return self.release_suite
2834 return self.suite_name
2837__all__.append("Suite")
2840@session_wrapper
2841def get_suite(suite: str, session: "Session | None" = None) -> Optional[Suite]:
2842 """
2843 Returns Suite object for given `suite` name.
2845 :param suite: The name of the suite
2846 :param session: Optional SQLA session object (a temporary one will be
2847 generated if not supplied)
2848 :return: Suite object for the requested suite name (None if not present)
2849 """
2851 assert session is not None
2853 # Start by looking for the dak internal name
2854 q = session.query(Suite).filter_by(suite_name=suite)
2855 try:
2856 return q.one()
2857 except NoResultFound:
2858 pass
2860 # Now try codename
2861 q = session.query(Suite).filter_by(codename=suite)
2862 try:
2863 return q.one()
2864 except NoResultFound:
2865 pass
2867 # Finally give release_suite a try
2868 q = session.query(Suite).filter_by(release_suite=suite)
2869 return q.one_or_none()
2872__all__.append("get_suite")
2875class SuiteBuildQueueCopy(Base):
2876 __tablename__ = "suite_build_queue_copy"
2878 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"), primary_key=True)
2879 build_queue_id: Mapped[int] = mapped_column(
2880 ForeignKey("build_queue.id"), primary_key=True
2881 )
2882 created: Mapped[datetime] = mapped_column(
2883 DateTime(timezone=True), server_default=func.now()
2884 )
2885 modified: Mapped[datetime] = mapped_column(
2886 DateTime(timezone=True), server_default=func.now()
2887 )
2890class SuiteSrcFormats(Base):
2891 __tablename__ = "suite_src_formats"
2893 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"), primary_key=True)
2894 src_format: Mapped[int] = mapped_column(
2895 ForeignKey("src_format.id"), primary_key=True
2896 )
2897 created: Mapped[datetime] = mapped_column(
2898 DateTime(timezone=True), server_default=func.now()
2899 )
2900 modified: Mapped[datetime] = mapped_column(
2901 DateTime(timezone=True), server_default=func.now()
2902 )
2905class SuiteAclMap(Base):
2906 __tablename__ = "suite_acl_map"
2908 suite_id: Mapped[int] = mapped_column(
2909 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True
2910 )
2911 acl_id: Mapped[int] = mapped_column(ForeignKey("acl.id"), primary_key=True)
2914class SuiteArchitectures(Base):
2915 __tablename__ = "suite_architectures"
2917 suite: Mapped[int] = mapped_column(ForeignKey("suite.id"), primary_key=True)
2918 architecture: Mapped[int] = mapped_column(
2919 ForeignKey("architecture.id"), primary_key=True
2920 )
2921 created: Mapped[datetime] = mapped_column(
2922 DateTime(timezone=True), server_default=func.now()
2923 )
2924 modified: Mapped[datetime] = mapped_column(
2925 DateTime(timezone=True), server_default=func.now()
2926 )
2929################################################################################
2932@session_wrapper
2933def get_suite_architectures(
2934 suite_name: str,
2935 skipsrc: bool = False,
2936 skipall: bool = False,
2937 session: "Session | None" = None,
2938) -> list[Architecture]:
2939 """
2940 Returns list of Architecture objects for given `suite` name. The list is
2941 empty if `suite` does not exist.
2943 :param suite_name: Suite name to search for
2944 :param skipsrc: Whether to skip returning the 'source' architecture entry
2945 :param skipall: Whether to skip returning the 'all' architecture entry
2946 :param session: Optional SQL session object (a temporary one will be
2947 generated if not supplied)
2948 :return: list of Architecture objects for the given name (may be empty)
2949 """
2951 assert session is not None
2952 suite = get_suite(suite_name, session)
2953 if suite is None: 2953 ↛ 2954line 2953 didn't jump to line 2954 because the condition on line 2953 was never true
2954 return []
2955 return suite.get_architectures(skipsrc, skipall)
2958__all__.append("get_suite_architectures")
2960################################################################################
2963class Uid(ORMObject):
2964 __tablename__ = "uid"
2966 uid_id: Mapped[int] = mapped_column("id", primary_key=True)
2967 uid: Mapped[str] = mapped_column(unique=True)
2968 name: Mapped[str | None]
2969 created: Mapped[datetime | None] = mapped_column(
2970 DateTime(timezone=True), server_default=func.now()
2971 )
2972 modified: Mapped[datetime | None] = mapped_column(
2973 DateTime(timezone=True), server_default=func.now()
2974 )
2976 fingerprint: Mapped[list["Fingerprint"]] = relationship(back_populates="uid")
2978 def __init__(self, uid=None, name=None):
2979 self.uid = uid
2980 self.name = name
2982 @override
2983 def __eq__(self, val: Any):
2984 if isinstance(val, str):
2985 warnings.warn(
2986 "comparison with a `str` is deprecated",
2987 DeprecationWarning,
2988 stacklevel=2,
2989 )
2990 return self.uid == val
2991 # This signals to use the normal comparison operator
2992 return NotImplemented
2994 @override
2995 def __ne__(self, val: Any):
2996 if isinstance(val, str):
2997 warnings.warn(
2998 "comparison with a `str` is deprecated",
2999 DeprecationWarning,
3000 stacklevel=2,
3001 )
3002 return self.uid != val
3003 # This signals to use the normal comparison operator
3004 return NotImplemented
3006 __hash__ = ORMObject.__hash__
3008 @override
3009 def properties(self) -> list[str]:
3010 return ["uid", "name", "fingerprint"]
3013__all__.append("Uid")
3016@session_wrapper
3017def get_or_set_uid(uidname: str, session: "Session | None" = None) -> Uid:
3018 """
3019 Returns uid object for given uidname.
3021 If no matching uidname is found, a row is inserted.
3023 :param uidname: The uid to add
3024 :param session: Optional SQL session object (a temporary one will be
3025 generated if not supplied). If not passed, a commit will be performed at
3026 the end of the function, otherwise the caller is responsible for commiting.
3027 :return: the uid object for the given uidname
3028 """
3030 assert session is not None
3031 q = session.query(Uid).filter_by(uid=uidname)
3033 try:
3034 ret = q.one()
3035 except NoResultFound:
3036 uid = Uid()
3037 uid.uid = uidname
3038 session.add(uid)
3039 session.commit_or_flush() # type: ignore[attr-defined]
3040 ret = uid
3042 return ret
3045__all__.append("get_or_set_uid")
3048@session_wrapper
3049def get_uid_from_fingerprint(
3050 fpr: str, session: "Session | None" = None
3051) -> Optional[Uid]:
3052 assert session is not None
3053 q = session.query(Uid)
3054 q = q.join(Fingerprint).filter_by(fingerprint=fpr)
3056 return q.one_or_none()
3059__all__.append("get_uid_from_fingerprint")
3061################################################################################
3064class MetadataKey(ORMObject):
3065 __tablename__ = "metadata_keys"
3067 key_id: Mapped[int] = mapped_column(primary_key=True)
3068 key: Mapped[str] = mapped_column(unique=True)
3069 ordering: Mapped[int] = mapped_column(default=0)
3071 def __init__(self, key=None):
3072 self.key = key
3074 @override
3075 def properties(self) -> list[str]:
3076 return ["key"]
3079__all__.append("MetadataKey")
3082@session_wrapper
3083def get_or_set_metadatakey(
3084 keyname: str, session: "Session | None" = None
3085) -> MetadataKey:
3086 """
3087 Returns MetadataKey object for given uidname.
3089 If no matching keyname is found, a row is inserted.
3091 :param keyname: The keyname to add
3092 :param session: Optional SQL session object (a temporary one will be
3093 generated if not supplied). If not passed, a commit will be performed at
3094 the end of the function, otherwise the caller is responsible for commiting.
3095 :return: the metadatakey object for the given keyname
3096 """
3098 assert session is not None
3099 q = session.query(MetadataKey).filter_by(key=keyname)
3101 try:
3102 ret = q.one()
3103 except NoResultFound:
3104 ret = MetadataKey(keyname)
3105 session.add(ret)
3106 session.commit_or_flush() # type: ignore[attr-defined]
3108 return ret
3111__all__.append("get_or_set_metadatakey")
3113################################################################################
3116class BinaryMetadata(ORMObject):
3117 __tablename__ = "binaries_metadata"
3119 binary_id: Mapped[int] = mapped_column(
3120 "bin_id", ForeignKey("binaries.id", ondelete="CASCADE"), primary_key=True
3121 )
3122 key_id: Mapped[int] = mapped_column(
3123 ForeignKey("metadata_keys.key_id"), primary_key=True
3124 )
3125 value: Mapped[str]
3127 binary: Mapped["DBBinary"] = relationship(back_populates="key")
3128 key: Mapped[MetadataKey] = relationship()
3130 def __init__(self, key=None, value=None, binary=None):
3131 self.key = key
3132 self.value = value
3133 if binary is not None:
3134 self.binary = binary
3136 @override
3137 def properties(self) -> list[str]:
3138 return ["binary", "key", "value"]
3141__all__.append("BinaryMetadata")
3143################################################################################
3146class SourceMetadata(ORMObject):
3147 __tablename__ = "source_metadata"
3149 source_id: Mapped[int] = mapped_column(
3150 "src_id", ForeignKey("source.id", ondelete="CASCADE"), primary_key=True
3151 )
3152 key_id: Mapped[int] = mapped_column(
3153 ForeignKey("metadata_keys.key_id"), primary_key=True
3154 )
3155 value: Mapped[str]
3157 source: Mapped["DBSource"] = relationship(back_populates="key")
3158 key: Mapped[MetadataKey] = relationship()
3160 def __init__(self, key=None, value=None, source=None):
3161 self.key = key
3162 self.value = value
3163 if source is not None:
3164 self.source = source
3166 @override
3167 def properties(self) -> list[str]:
3168 return ["source", "key", "value"]
3171__all__.append("SourceMetadata")
3173################################################################################
3176class MetadataProxy:
3177 def __init__(self, session, query):
3178 self.session = session
3179 self.query = query
3181 def _get(self, key):
3182 metadata_key = self.session.query(MetadataKey).filter_by(key=key).first()
3183 if metadata_key is None:
3184 return None
3185 metadata = self.query.filter_by(key=metadata_key).first()
3186 return metadata
3188 def __contains__(self, key: str) -> bool:
3189 if self._get(key) is not None: 3189 ↛ 3191line 3189 didn't jump to line 3191 because the condition on line 3189 was always true
3190 return True
3191 return False
3193 def __getitem__(self, key: str) -> str:
3194 metadata = self._get(key)
3195 if metadata is None:
3196 raise KeyError
3197 return metadata.value
3199 @overload
3200 def get(self, key: str, default: str) -> str: ... # noqa: E704 3200 ↛ exitline 3200 didn't return from function 'get' because
3202 @overload
3203 def get(self, key: str, default: None = None) -> str | None: ... # noqa: E704 3203 ↛ exitline 3203 didn't return from function 'get' because
3205 def get(self, key, default=None):
3206 try:
3207 return self[key]
3208 except KeyError:
3209 return default
3212################################################################################
3215class VersionCheck(ORMObject):
3216 __tablename__ = "version_check"
3218 suite_id: Mapped[int] = mapped_column(
3219 "suite", ForeignKey("suite.id"), primary_key=True
3220 )
3221 check: Mapped[str] = mapped_column(primary_key=True)
3222 reference_id: Mapped[int] = mapped_column(
3223 "reference", ForeignKey("suite.id"), primary_key=True
3224 )
3226 suite: Mapped["Suite"] = relationship(foreign_keys=[suite_id])
3227 reference: Mapped["Suite"] = relationship(
3228 foreign_keys=[reference_id], lazy="joined"
3229 )
3231 def __init__(self, *args, **kwargs):
3232 pass
3234 @override
3235 def properties(self) -> list[str]:
3236 return ["check"]
3239__all__.append("VersionCheck")
3242@session_wrapper
3243def get_version_checks(
3244 suite_name: str, check: Optional[str] = None, session: "Session | None" = None
3245) -> list[VersionCheck]:
3246 assert session is not None
3247 suite = get_suite(suite_name, session)
3248 if not suite: 3248 ↛ 3251line 3248 didn't jump to line 3251 because the condition on line 3248 was never true
3249 # Make sure that what we return is iterable so that list comprehensions
3250 # involving this don't cause a traceback
3251 return []
3252 q = session.query(VersionCheck).filter_by(suite=suite)
3253 if check: 3253 ↛ 3255line 3253 didn't jump to line 3255 because the condition on line 3253 was always true
3254 q = q.filter_by(check=check)
3255 return q.all()
3258__all__.append("get_version_checks")
3260################################################################################
3263class ExternalSignatureRequests(Base):
3264 __tablename__ = "external_signature_requests"
3266 association_id: Mapped[int] = mapped_column(
3267 ForeignKey("bin_associations.id", ondelete="CASCADE"), primary_key=True
3268 )
3269 suite_id: Mapped[int] = mapped_column(
3270 ForeignKey("suite.id", ondelete="CASCADE"), primary_key=True
3271 )
3274################################################################################
3276package_list = Table(
3277 "package_list",
3278 Base.metadata,
3279 Column("package", String()),
3280 Column("version", DebVersion()),
3281 Column("source", String()),
3282 Column("source_version", DebVersion()),
3283 Column("suite", String()),
3284 Column("codename", String()),
3285 Column("archive", String()),
3286 Column("component", String()),
3287 Column("display_suite", String()),
3288 Column("architecture_is_source", Boolean()),
3289 Column("architecture", String()),
3290 Column("type", String()),
3291)
3293################################################################################
3296class DBConn:
3297 """
3298 database module init.
3299 """
3301 __shared_state: dict[str, Any] = {}
3303 def __init__(self, *, debug=False) -> None:
3304 self.__dict__ = self.__shared_state
3306 if not getattr(self, "initialised", False):
3307 self.initialised = True
3308 self.debug = debug
3309 self.__createconn()
3311 ## Connection functions
3312 def __createconn(self) -> None:
3313 from .config import Config
3315 cnf = Config()
3316 if "DB::Service" in cnf: 3316 ↛ 3317line 3316 didn't jump to line 3317 because the condition on line 3316 was never true
3317 connstr = "postgresql://service=%s" % cnf["DB::Service"]
3318 elif "DB::Host" in cnf:
3319 # TCP/IP
3320 connstr = "postgresql://%s" % cnf["DB::Host"]
3321 if "DB::Port" in cnf and cnf["DB::Port"] != "-1": 3321 ↛ 3322line 3321 didn't jump to line 3322 because the condition on line 3321 was never true
3322 connstr += ":%s" % cnf["DB::Port"]
3323 connstr += "/%s" % cnf["DB::Name"]
3324 else:
3325 # Unix Socket
3326 connstr = "postgresql:///%s" % cnf["DB::Name"]
3327 if "DB::Port" in cnf and cnf["DB::Port"] != "-1": 3327 ↛ 3328line 3327 didn't jump to line 3328 because the condition on line 3327 was never true
3328 connstr += "?port=%s" % cnf["DB::Port"]
3330 engine_args: dict[str, object] = {"echo": self.debug}
3331 if "DB::PoolSize" in cnf:
3332 engine_args["pool_size"] = int(cnf["DB::PoolSize"])
3333 if "DB::MaxOverflow" in cnf:
3334 engine_args["max_overflow"] = int(cnf["DB::MaxOverflow"])
3335 # we don't support non-utf-8 connections
3336 engine_args["client_encoding"] = "utf-8"
3338 # Monkey patch a new dialect in in order to support service= syntax
3339 import sqlalchemy.dialects.postgresql
3340 from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
3342 class PGDialect_psycopg2_dak(PGDialect_psycopg2):
3343 @override
3344 def create_connect_args(self, url):
3345 if str(url).startswith("postgresql://service="): 3345 ↛ 3347line 3345 didn't jump to line 3347 because the condition on line 3345 was never true
3346 # Eww
3347 servicename = str(url)[21:]
3348 return (["service=%s" % servicename], {})
3349 else:
3350 return PGDialect_psycopg2.create_connect_args(self, url)
3352 sqlalchemy.dialects.postgresql.base.dialect = PGDialect_psycopg2_dak # type: ignore[attr-defined]
3354 try:
3355 self.db_pg = create_engine(connstr, **engine_args)
3356 self.db_smaker = sessionmaker(
3357 bind=self.db_pg, autoflush=True, autocommit=False
3358 )
3359 except OperationalError as e:
3360 from . import utils
3362 utils.fubar("Cannot connect to database (%s)" % str(e))
3364 self.pid = os.getpid()
3366 def session(self, work_mem=0) -> "Session":
3367 """
3368 Returns a new session object. If a work_mem parameter is provided a new
3369 transaction is started and the work_mem parameter is set for this
3370 transaction. The work_mem parameter is measured in MB. A default value
3371 will be used if the parameter is not set.
3372 """
3373 # reinitialize DBConn in new processes
3374 if self.pid != os.getpid():
3375 self.__createconn()
3376 session = self.db_smaker()
3377 if work_mem > 0:
3378 session.execute(sql.text("SET LOCAL work_mem TO '%d MB'" % work_mem))
3379 return session
3382__all__.append("DBConn")