1"""DB access class
3@contact: Debian FTPMaster <ftpmaster@debian.org>
4@copyright: 2000, 2001, 2002, 2003, 2004, 2006 James Troup <james@nocrew.org>
5@copyright: 2008-2009 Mark Hymers <mhy@debian.org>
6@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org>
7@copyright: 2009 Mike O'Connor <stew@debian.org>
8@license: GNU General Public License version 2 or later
9"""
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25################################################################################
27# < mhy> I need a funny comment
28# < sgran> two peanuts were walking down a dark street
29# < sgran> one was a-salted
30# * mhy looks up the definition of "funny"
32################################################################################
34import functools
35import inspect
36import os
37import re
38import subprocess
39import warnings
40from collections.abc import Iterable
41from os.path import normpath
42from tarfile import TarFile
43from typing import TYPE_CHECKING, Optional, Union
45import apt_pkg
46import sqlalchemy
47import sqlalchemy.types
48from debian.debfile import Deb822
49from sqlalchemy import Table, create_engine, desc
51# Don't remove this, we re-export the exceptions to scripts which import us
52from sqlalchemy.exc import IntegrityError, OperationalError, SAWarning, SQLAlchemyError
53from sqlalchemy.ext.associationproxy import association_proxy
54from sqlalchemy.orm import (
55 backref,
56 mapper,
57 object_session,
58 relation,
59 sessionmaker,
60)
61from sqlalchemy.orm.collections import attribute_mapped_collection
62from sqlalchemy.orm.exc import NoResultFound
64import daklib.gpg
66from .aptversion import AptVersion
68# Only import Config until Queue stuff is changed to store its config
69# in the database
70from .config import Config
71from .textutils import fix_maintainer
73# suppress some deprecation warnings in squeeze related to sqlalchemy
74warnings.filterwarnings(
75 "ignore", "Predicate of partial index .* ignored during reflection", SAWarning
76)
78# (Debian 12 "bookworm") Silence warning targeted at SQLAlchemy dialect maintainers
79warnings.filterwarnings(
80 "ignore",
81 "Dialect postgresql:psycopg2 will not make use of SQL compilation caching.*",
82 SAWarning,
83)
85from .database.base import Base
87if TYPE_CHECKING: 87 ↛ 88line 87 didn't jump to line 88, because the condition on line 87 was never true
88 import sqlalchemy.orm.query
91################################################################################
93# Patch in support for the debversion field type so that it works during
94# reflection
97class DebVersion(sqlalchemy.types.UserDefinedType):
98 def get_col_spec(self):
99 return "DEBVERSION"
101 def bind_processor(self, dialect):
102 return None
104 def result_processor(self, dialect, coltype):
105 return None
108from sqlalchemy.databases import postgresql
110postgresql.ischema_names["debversion"] = DebVersion
112################################################################################
114__all__ = ["IntegrityError", "SQLAlchemyError", "DebVersion"]
116################################################################################
119def session_wrapper(fn):
120 """
121 Wrapper around common ".., session=None):" handling. If the wrapped
122 function is called without passing 'session', we create a local one
123 and destroy it when the function ends.
125 Also attaches a commit_or_flush method to the session; if we created a
126 local session, this is a synonym for session.commit(), otherwise it is a
127 synonym for session.flush().
128 """
130 @functools.wraps(fn)
131 def wrapped(*args, **kwargs):
132 private_transaction = False
134 # Find the session object
135 session = kwargs.get("session")
137 if session is None:
138 if len(args) < len(inspect.getfullargspec(fn).args):
139 # No session specified as last argument or in kwargs
140 private_transaction = True
141 session = kwargs["session"] = DBConn().session()
142 else:
143 # Session is last argument in args
144 session = args[-1]
145 if session is None: 145 ↛ 146line 145 didn't jump to line 146, because the condition on line 145 was never true
146 args = list(args)
147 session = args[-1] = DBConn().session()
148 private_transaction = True
150 if private_transaction:
151 session.commit_or_flush = session.commit
152 else:
153 session.commit_or_flush = session.flush
155 try:
156 return fn(*args, **kwargs)
157 finally:
158 if private_transaction:
159 # We created a session; close it.
160 session.close()
162 return wrapped
165__all__.append("session_wrapper")
167################################################################################
170class ORMObject:
171 """
172 ORMObject is a base class for all ORM classes mapped by SQLalchemy. All
173 derived classes must implement the properties() method.
174 """
176 def properties(self) -> list[str]:
177 """
178 This method should be implemented by all derived classes and returns a
179 list of the important properties. The properties 'created' and
180 'modified' will be added automatically. A suffix '_count' should be
181 added to properties that are lists or query objects. The most important
182 property name should be returned as the first element in the list
183 because it is used by repr().
184 """
185 return []
187 def classname(self) -> str:
188 """
189 Returns the name of the class.
190 """
191 return type(self).__name__
193 def __repr__(self):
194 """
195 Returns a short string representation of the object using the first
196 element from the properties() method.
197 """
198 primary_property = self.properties()[0]
199 value = getattr(self, primary_property)
200 return "<%s %s>" % (self.classname(), str(value))
202 def __str__(self):
203 """
204 Returns a human readable form of the object using the properties()
205 method.
206 """
207 return "<%s(...)>" % (self.classname())
209 @classmethod
210 @session_wrapper
211 def get(cls, primary_key, session=None):
212 """
213 This is a support function that allows getting an object by its primary
214 key.
216 Architecture.get(3[, session])
218 instead of the more verbose
220 session.query(Architecture).get(3)
221 """
222 return session.query(cls).get(primary_key)
224 def session(self):
225 """
226 Returns the current session that is associated with the object. May
227 return None is object is in detached state.
228 """
230 return object_session(self)
233__all__.append("ORMObject")
235################################################################################
238class ACL(ORMObject):
239 def __repr__(self):
240 return "<ACL {0}>".format(self.name)
243__all__.append("ACL")
246class ACLPerSource(ORMObject):
247 def __repr__(self):
248 return "<ACLPerSource acl={0} fingerprint={1} source={2} reason={3}>".format(
249 self.acl.name, self.fingerprint.fingerprint, self.source, self.reason
250 )
253__all__.append("ACLPerSource")
256class ACLPerSuite(ORMObject):
257 def __repr__(self):
258 return "<ACLPerSuite acl={0} fingerprint={1} suite={2} reason={3}>".format(
259 self.acl.name,
260 self.fingerprint.fingerprint,
261 self.suite.suite_name,
262 self.reason,
263 )
266__all__.append("ACLPerSuite")
268################################################################################
271from .database.architecture import Architecture
273__all__.append("Architecture")
276@session_wrapper
277def get_architecture(architecture: str, session=None) -> Optional[Architecture]:
278 """
279 Returns database id for given `architecture`.
281 :param architecture: The name of the architecture
282 :param session: Optional SQLA session object (a temporary one will be
283 generated if not supplied)
284 :return: Architecture object for the given arch (None if not present)
285 """
287 q = session.query(Architecture).filter_by(arch_string=architecture)
288 return q.one_or_none()
291__all__.append("get_architecture")
293################################################################################
296class Archive:
297 def __init__(self, *args, **kwargs):
298 pass
300 def __repr__(self):
301 return "<Archive %s>" % self.archive_name
304__all__.append("Archive")
307@session_wrapper
308def get_archive(archive: str, session=None) -> Optional[Archive]:
309 """
310 returns database id for given `archive`.
312 :param archive: the name of the arhive
313 :param session: Optional SQLA session object (a temporary one will be
314 generated if not supplied)
315 :return: Archive object for the given name (None if not present)
316 """
317 archive = archive.lower()
319 q = session.query(Archive).filter_by(archive_name=archive)
320 return q.one_or_none()
323__all__.append("get_archive")
325################################################################################
328class ArchiveFile:
329 def __init__(self, archive=None, component=None, file=None):
330 self.archive = archive
331 self.component = component
332 self.file = file
334 @property
335 def path(self):
336 return os.path.join(
337 self.archive.path, "pool", self.component.component_name, self.file.filename
338 )
341__all__.append("ArchiveFile")
343################################################################################
346class BinContents(ORMObject):
347 def __init__(self, file=None, binary=None):
348 self.file = file
349 self.binary = binary
351 def properties(self) -> list[str]:
352 return ["file", "binary"]
355__all__.append("BinContents")
357################################################################################
360class DBBinary(ORMObject):
361 def __init__(
362 self,
363 package=None,
364 source=None,
365 version=None,
366 maintainer=None,
367 architecture=None,
368 poolfile=None,
369 binarytype="deb",
370 fingerprint=None,
371 ):
372 self.package = package
373 self.source = source
374 self.version = version
375 self.maintainer = maintainer
376 self.architecture = architecture
377 self.poolfile = poolfile
378 self.binarytype = binarytype
379 self.fingerprint = fingerprint
381 @property
382 def pkid(self) -> int:
383 return self.binary_id
385 @property
386 def name(self) -> str:
387 return self.package
389 @property
390 def arch_string(self) -> str:
391 return "%s" % self.architecture
393 def properties(self) -> list[str]:
394 return [
395 "package",
396 "version",
397 "maintainer",
398 "source",
399 "architecture",
400 "poolfile",
401 "binarytype",
402 "fingerprint",
403 "install_date",
404 "suites_count",
405 "binary_id",
406 "contents_count",
407 "extra_sources",
408 ]
410 metadata = association_proxy("key", "value")
412 def scan_contents(self) -> Iterable[str]:
413 """
414 Yields the contents of the package. Only regular files are yielded and
415 the path names are normalized after converting them from either utf-8
416 or iso8859-1 encoding. It yields the string ' <EMPTY PACKAGE>' if the
417 package does not contain any regular file.
418 """
419 fullpath = self.poolfile.fullpath
420 dpkg_cmd = ("dpkg-deb", "--fsys-tarfile", fullpath)
421 dpkg = subprocess.Popen(dpkg_cmd, stdout=subprocess.PIPE)
422 tar = TarFile.open(fileobj=dpkg.stdout, mode="r|")
423 for member in tar.getmembers():
424 if not member.isdir():
425 name = normpath(member.name)
426 yield name
427 tar.close()
428 dpkg.stdout.close()
429 dpkg.wait()
431 def read_control(self) -> bytes:
432 """
433 Reads the control information from a binary.
435 :return: stanza text of the control section.
436 """
437 from . import utils
439 fullpath = self.poolfile.fullpath
440 return utils.deb_extract_control(fullpath)
442 def read_control_fields(self) -> apt_pkg.TagSection:
443 """
444 Reads the control information from a binary and return
445 as a dictionary.
447 :return: fields of the control section as a dictionary.
448 """
449 stanza = self.read_control()
450 return apt_pkg.TagSection(stanza)
452 @property
453 def proxy(self) -> "MetadataProxy":
454 session = object_session(self)
455 query = session.query(BinaryMetadata).filter_by(binary=self)
456 return MetadataProxy(session, query)
459__all__.append("DBBinary")
462@session_wrapper
463def get_suites_binary_in(package: str, session=None) -> "list[Suite]":
464 """
465 Returns list of Suite objects which given `package` name is in
467 :param package: DBBinary package name to search for
468 :return: list of Suite objects for the given package
469 """
471 return (
472 session.query(Suite)
473 .filter(Suite.binaries.any(DBBinary.package == package))
474 .all()
475 )
478__all__.append("get_suites_binary_in")
481@session_wrapper
482def get_component_by_package_suite(
483 package: str, suite_list: list[str], arch_list: Optional[str] = None, session=None
484) -> Optional[str]:
485 """
486 Returns the component name of the newest binary package in suite_list or
487 None if no package is found. The result can be optionally filtered by a list
488 of architecture names.
490 :param package: DBBinary package name to search for
491 :param suite_list: list of suite_name items
492 :param arch_list: optional list of arch_string items that defaults to []
493 :return: name of component or None
494 """
496 q = (
497 session.query(DBBinary)
498 .filter_by(package=package)
499 .join(DBBinary.suites)
500 .filter(Suite.suite_name.in_(suite_list))
501 )
502 if arch_list:
503 q = q.join(DBBinary.architecture).filter(
504 Architecture.arch_string.in_(arch_list)
505 )
506 binary = q.order_by(desc(DBBinary.version)).first()
507 if binary is None:
508 return None
509 else:
510 return binary.poolfile.component.component_name
513__all__.append("get_component_by_package_suite")
515################################################################################
518class BuildQueue:
519 def __init__(self, *args, **kwargs):
520 pass
522 def __repr__(self):
523 return "<BuildQueue %s>" % self.queue_name
526__all__.append("BuildQueue")
528################################################################################
531class Component(ORMObject):
532 def __init__(self, component_name=None):
533 self.component_name = component_name
535 def __eq__(self, val):
536 if isinstance(val, str): 536 ↛ 537line 536 didn't jump to line 537, because the condition on line 536 was never true
537 warnings.warn(
538 "comparison with a `str` is deprecated",
539 DeprecationWarning,
540 stacklevel=2,
541 )
542 return self.component_name == val
543 # This signals to use the normal comparison operator
544 return NotImplemented
546 def __ne__(self, val):
547 if isinstance(val, str):
548 warnings.warn(
549 "comparison with a `str` is deprecated",
550 DeprecationWarning,
551 stacklevel=2,
552 )
553 return self.component_name != val
554 # This signals to use the normal comparison operator
555 return NotImplemented
557 __hash__ = ORMObject.__hash__
559 def properties(self) -> list[str]:
560 return [
561 "component_name",
562 "component_id",
563 "description",
564 "meets_dfsg",
565 "overrides_count",
566 ]
569__all__.append("Component")
572@session_wrapper
573def get_component(component: str, session=None) -> Optional[Component]:
574 """
575 Returns database id for given `component`.
577 :param component: The name of the override type
578 :return: the database id for the given component
579 """
580 component = component.lower()
582 q = session.query(Component).filter_by(component_name=component)
584 return q.one_or_none()
587__all__.append("get_component")
590def get_mapped_component_name(component_name):
591 cnf = Config()
592 for m in cnf.value_list("ComponentMappings"): 592 ↛ 593line 592 didn't jump to line 593, because the loop on line 592 never started
593 (src, dst) = m.split()
594 if component_name == src:
595 component_name = dst
596 return component_name
599__all__.append("get_mapped_component_name")
602@session_wrapper
603def get_mapped_component(component_name: str, session=None) -> Optional[Component]:
604 """get component after mappings
606 Evaluate component mappings from ComponentMappings in dak.conf for the
607 given component name.
609 .. todo::
611 ansgar wants to get rid of this. It's currently only used for
612 the security archive
614 :param component_name: component name
615 :param session: database session
616 :return: component after applying maps or :const:`None`
617 """
618 component_name = get_mapped_component_name(component_name)
619 component = (
620 session.query(Component).filter_by(component_name=component_name).first()
621 )
622 return component
625__all__.append("get_mapped_component")
628@session_wrapper
629def get_component_names(session=None) -> list[str]:
630 """
631 Returns list of strings of component names.
633 :return: list of strings of component names
634 """
636 return [x.component_name for x in session.query(Component).all()]
639__all__.append("get_component_names")
641################################################################################
644class DBConfig:
645 def __init__(self, *args, **kwargs):
646 pass
648 def __repr__(self):
649 return "<DBConfig %s>" % self.name
652__all__.append("DBConfig")
654################################################################################
657class DSCFile:
658 def __init__(self, *args, **kwargs):
659 pass
661 def __repr__(self):
662 return "<DSCFile %s>" % self.dscfile_id
665__all__.append("DSCFile")
668@session_wrapper
669def get_dscfiles(
670 dscfile_id: Optional[int] = None,
671 source_id: Optional[int] = None,
672 poolfile_id: Optional[int] = None,
673 session=None,
674) -> list[DSCFile]:
675 """
676 Returns a list of DSCFiles which may be empty
678 :param dscfile_id: the dscfile_id of the DSCFiles to find
679 :param source_id: the source id related to the DSCFiles to find
680 :param poolfile_id: the poolfile id related to the DSCFiles to find
681 :return: Possibly empty list of DSCFiles
682 """
684 q = session.query(DSCFile)
686 if dscfile_id is not None:
687 q = q.filter_by(dscfile_id=dscfile_id)
689 if source_id is not None:
690 q = q.filter_by(source_id=source_id)
692 if poolfile_id is not None:
693 q = q.filter_by(poolfile_id=poolfile_id)
695 return q.all()
698__all__.append("get_dscfiles")
700################################################################################
703class ExternalOverride(ORMObject):
704 def __init__(self, *args, **kwargs):
705 pass
707 def __repr__(self):
708 return "<ExternalOverride %s = %s: %s>" % (self.package, self.key, self.value)
711__all__.append("ExternalOverride")
713################################################################################
716class PoolFile(ORMObject):
717 def __init__(self, filename=None, filesize=-1, md5sum=None):
718 self.filename = filename
719 self.filesize = filesize
720 self.md5sum = md5sum
722 @property
723 def fullpath(self) -> str:
724 session = DBConn().session().object_session(self)
725 af = (
726 session.query(ArchiveFile)
727 .join(Archive)
728 .filter(ArchiveFile.file == self)
729 .order_by(Archive.tainted.desc())
730 .first()
731 )
732 return af.path
734 @property
735 def component(self) -> Component:
736 session = DBConn().session().object_session(self)
737 component_id = (
738 session.query(ArchiveFile.component_id)
739 .filter(ArchiveFile.file == self)
740 .group_by(ArchiveFile.component_id)
741 .one()
742 )
743 return session.query(Component).get(component_id)
745 @property
746 def basename(self) -> str:
747 return os.path.basename(self.filename)
749 def properties(self) -> list[str]:
750 return [
751 "filename",
752 "file_id",
753 "filesize",
754 "md5sum",
755 "sha1sum",
756 "sha256sum",
757 "source",
758 "binary",
759 "last_used",
760 ]
763__all__.append("PoolFile")
765################################################################################
768class Fingerprint(ORMObject):
769 def __init__(self, fingerprint=None):
770 self.fingerprint = fingerprint
772 def properties(self) -> list[str]:
773 return ["fingerprint", "fingerprint_id", "keyring", "uid", "binary_reject"]
776__all__.append("Fingerprint")
779@session_wrapper
780def get_fingerprint(fpr: str, session=None) -> Optional[Fingerprint]:
781 """
782 Returns Fingerprint object for given fpr.
784 :param fpr: The fpr to find / add
785 :param session: Optional SQL session object (a temporary one will be
786 generated if not supplied).
787 :return: the Fingerprint object for the given fpr or None
788 """
790 q = session.query(Fingerprint).filter_by(fingerprint=fpr)
791 return q.one_or_none()
794__all__.append("get_fingerprint")
797@session_wrapper
798def get_or_set_fingerprint(fpr: str, session=None) -> Fingerprint:
799 """
800 Returns Fingerprint object for given fpr.
802 If no matching fpr is found, a row is inserted.
804 :param fpr: The fpr to find / add
805 :param session: Optional SQL session object (a temporary one will be
806 generated if not supplied). If not passed, a commit will be performed at
807 the end of the function, otherwise the caller is responsible for commiting.
808 A flush will be performed either way.
809 :return: the Fingerprint object for the given fpr
810 """
812 q = session.query(Fingerprint).filter_by(fingerprint=fpr)
814 try:
815 ret = q.one()
816 except NoResultFound:
817 fingerprint = Fingerprint()
818 fingerprint.fingerprint = fpr
819 session.add(fingerprint)
820 session.commit_or_flush()
821 ret = fingerprint
823 return ret
826__all__.append("get_or_set_fingerprint")
828################################################################################
830# Helper routine for Keyring class
833def get_ldap_name(entry) -> str:
834 name = []
835 for k in ["cn", "mn", "sn"]:
836 ret = entry.get(k)
837 if not ret:
838 continue
839 value = ret[0].decode()
840 if value and value[0] != "-":
841 name.append(value)
842 return " ".join(name)
845################################################################################
848class Keyring:
849 keys = {}
850 fpr_lookup: dict[str, str] = {}
852 def __init__(self, *args, **kwargs):
853 pass
855 def __repr__(self):
856 return "<Keyring %s>" % self.keyring_name
858 def de_escape_gpg_str(self, txt: str) -> str:
859 esclist = re.split(r"(\\x..)", txt)
860 for x in range(1, len(esclist), 2): 860 ↛ 861line 860 didn't jump to line 861, because the loop on line 860 never started
861 esclist[x] = "%c" % (int(esclist[x][2:], 16))
862 return "".join(esclist)
864 def parse_address(self, uid: str) -> tuple[str, str]:
865 """parses uid and returns a tuple of real name and email address"""
866 import email.utils
868 (name, address) = email.utils.parseaddr(uid)
869 name = re.sub(r"\s*[(].*[)]", "", name)
870 name = self.de_escape_gpg_str(name)
871 if name == "":
872 name = uid
873 return (name, address)
875 def load_keys(self, keyring: str) -> None:
876 if not self.keyring_id: 876 ↛ 877line 876 didn't jump to line 877, because the condition on line 876 was never true
877 raise Exception("Must be initialized with database information")
879 cmd = [
880 "gpg",
881 "--no-default-keyring",
882 "--keyring",
883 keyring,
884 "--with-colons",
885 "--fingerprint",
886 "--fingerprint",
887 ]
888 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
890 key = None
891 need_fingerprint = False
893 for line_raw in p.stdout:
894 try:
895 line = line_raw.decode()
896 except UnicodeDecodeError:
897 # Some old UIDs might not use UTF-8 encoding. We assume they
898 # use latin1.
899 line = line_raw.decode("latin1")
900 field = line.split(":")
901 if field[0] == "pub":
902 key = field[4]
903 self.keys[key] = {}
904 (name, addr) = self.parse_address(field[9])
905 if "@" in addr: 905 ↛ 906line 905 didn't jump to line 906, because the condition on line 905 was never true
906 self.keys[key]["email"] = addr
907 self.keys[key]["name"] = name
908 need_fingerprint = True
909 elif key and field[0] == "uid":
910 (name, addr) = self.parse_address(field[9])
911 if "email" not in self.keys[key] and "@" in addr: 911 ↛ 893line 911 didn't jump to line 893, because the condition on line 911 was never false
912 self.keys[key]["email"] = addr
913 self.keys[key]["name"] = name
914 elif need_fingerprint and field[0] == "fpr":
915 self.keys[key]["fingerprints"] = [field[9]]
916 self.fpr_lookup[field[9]] = key
917 need_fingerprint = False
919 (out, err) = p.communicate()
920 r = p.returncode
921 if r != 0: 921 ↛ 922line 921 didn't jump to line 922, because the condition on line 921 was never true
922 raise daklib.gpg.GpgException(
923 "command failed: %s\nstdout: %s\nstderr: %s\n" % (cmd, out, err)
924 )
926 def import_users_from_ldap(
927 self, session
928 ) -> tuple[dict[str, tuple[int, str]], dict[int, tuple[str, str]]]:
929 import ldap # type: ignore
931 from .utils import open_ldap_connection
933 conn = open_ldap_connection()
934 cnf = Config()
935 LDAPDn = cnf["Import-LDAP-Fingerprints::LDAPDn"]
936 Attrs = conn.search_s(
937 LDAPDn,
938 ldap.SCOPE_ONELEVEL,
939 "(&(keyfingerprint=*)(supplementaryGid=%s))"
940 % (cnf["Import-Users-From-Passwd::ValidGID"]),
941 ["uid", "keyfingerprint", "cn", "mn", "sn"],
942 )
944 byuid: dict[int, tuple[str, str]] = {}
945 byname: dict[str, tuple[int, str]] = {}
947 for i in Attrs:
948 entry = i[1]
949 uid = entry["uid"][0].decode()
950 name = get_ldap_name(entry)
951 fingerprints = entry["keyFingerPrint"]
952 keyid = None
953 for f_raw in fingerprints:
954 f = f_raw.decode()
955 key = self.fpr_lookup.get(f, None)
956 if key not in self.keys:
957 continue
958 self.keys[key]["uid"] = uid
960 if keyid is not None:
961 continue
962 keyid = get_or_set_uid(uid, session).uid_id
963 byuid[keyid] = (uid, name)
964 byname[uid] = (keyid, name)
966 return (byname, byuid)
968 def generate_users_from_keyring(
969 self, format: str, session
970 ) -> tuple[dict[str, tuple[int, str]], dict[int, tuple[str, str]]]:
971 byuid: dict[int, tuple[str, str]] = {}
972 byname: dict[str, tuple[int, str]] = {}
973 any_invalid = False
974 for x in list(self.keys.keys()):
975 if "email" not in self.keys[x]: 975 ↛ 976line 975 didn't jump to line 976, because the condition on line 975 was never true
976 any_invalid = True
977 self.keys[x]["uid"] = format % "invalid-uid"
978 else:
979 uid = format % self.keys[x]["email"]
980 keyid = get_or_set_uid(uid, session).uid_id
981 byuid[keyid] = (uid, self.keys[x]["name"])
982 byname[uid] = (keyid, self.keys[x]["name"])
983 self.keys[x]["uid"] = uid
985 if any_invalid: 985 ↛ 986line 985 didn't jump to line 986, because the condition on line 985 was never true
986 uid = format % "invalid-uid"
987 keyid = get_or_set_uid(uid, session).uid_id
988 byuid[keyid] = (uid, "ungeneratable user id")
989 byname[uid] = (keyid, "ungeneratable user id")
991 return (byname, byuid)
994__all__.append("Keyring")
997@session_wrapper
998def get_keyring(keyring: str, session=None) -> Optional[Keyring]:
999 """
1000 If `keyring` does not have an entry in the `keyrings` table yet, return None
1001 If `keyring` already has an entry, simply return the existing :class:`Keyring`
1003 :param keyring: the keyring name
1004 :return: the :class:`Keyring` object for this keyring
1005 """
1007 q = session.query(Keyring).filter_by(keyring_name=keyring)
1008 return q.one_or_none()
1011__all__.append("get_keyring")
1014@session_wrapper
1015def get_active_keyring_paths(session=None) -> list[str]:
1016 """
1017 :return: list of active keyring paths
1018 """
1019 return [
1020 x.keyring_name
1021 for x in session.query(Keyring)
1022 .filter(Keyring.active == True) # noqa:E712
1023 .order_by(desc(Keyring.priority))
1024 .all()
1025 ]
1028__all__.append("get_active_keyring_paths")
1030################################################################################
1033class DBChange:
1034 def __init__(self, *args, **kwargs):
1035 pass
1037 def __repr__(self):
1038 return "<DBChange %s>" % self.changesname
1041__all__.append("DBChange")
1044@session_wrapper
1045def get_dbchange(filename: str, session=None) -> Optional[DBChange]:
1046 """
1047 returns DBChange object for given `filename`.
1049 :param filename: the name of the file
1050 :param session: Optional SQLA session object (a temporary one will be
1051 generated if not supplied)
1052 :return: DBChange object for the given filename (:const:`None` if not present)
1053 """
1054 q = session.query(DBChange).filter_by(changesname=filename)
1055 return q.one_or_none()
1058__all__.append("get_dbchange")
1060################################################################################
1063class Maintainer(ORMObject):
1064 def __init__(self, name=None):
1065 self.name = name
1067 def properties(self) -> list[str]:
1068 return ["name", "maintainer_id"]
1070 def get_split_maintainer(self) -> tuple[str, str, str, str]:
1071 if not hasattr(self, "name") or self.name is None:
1072 return ("", "", "", "")
1074 return fix_maintainer(self.name.strip())
1077__all__.append("Maintainer")
1080@session_wrapper
1081def get_or_set_maintainer(name: str, session=None) -> Maintainer:
1082 """
1083 Returns Maintainer object for given maintainer name.
1085 If no matching maintainer name is found, a row is inserted.
1087 :param name: The maintainer name to add
1088 :param session: Optional SQL session object (a temporary one will be
1089 generated if not supplied). If not passed, a commit will be performed at
1090 the end of the function, otherwise the caller is responsible for commiting.
1091 A flush will be performed either way.
1092 :return: the Maintainer object for the given maintainer
1093 """
1095 q = session.query(Maintainer).filter_by(name=name)
1096 try:
1097 ret = q.one()
1098 except NoResultFound:
1099 maintainer = Maintainer()
1100 maintainer.name = name
1101 session.add(maintainer)
1102 session.commit_or_flush()
1103 ret = maintainer
1105 return ret
1108__all__.append("get_or_set_maintainer")
1111@session_wrapper
1112def get_maintainer(maintainer_id: int, session=None) -> Optional[Maintainer]:
1113 """
1114 Return the name of the maintainer behind `maintainer_id` or :const:`None`
1115 if that `maintainer_id` is invalid.
1117 :param maintainer_id: the id of the maintainer
1118 :return: the Maintainer with this `maintainer_id`
1119 """
1121 return session.query(Maintainer).get(maintainer_id)
1124__all__.append("get_maintainer")
1126################################################################################
1129class NewComment:
1130 def __init__(self, *args, **kwargs):
1131 pass
1133 def __repr__(self):
1134 return """<NewComment for '%s %s' (%s)>""" % (
1135 self.package,
1136 self.version,
1137 self.comment_id,
1138 )
1141__all__.append("NewComment")
1144@session_wrapper
1145def has_new_comment(
1146 policy_queue: "PolicyQueue", package: str, version: str, session=None
1147) -> bool:
1148 """
1149 Returns :const:`True` if the given combination of `package`, `version` has a comment.
1151 :param package: name of the package
1152 :param version: package version
1153 :param session: Optional SQLA session object (a temporary one will be
1154 generated if not supplied)
1155 """
1157 q = session.query(NewComment).filter_by(policy_queue=policy_queue)
1158 q = q.filter_by(package=package)
1159 q = q.filter_by(version=version)
1161 return bool(q.count() > 0)
1164__all__.append("has_new_comment")
1167@session_wrapper
1168def get_new_comments(
1169 policy_queue: "PolicyQueue",
1170 package: Optional[str] = None,
1171 version: Optional[str] = None,
1172 comment_id: Optional[int] = None,
1173 session=None,
1174) -> list[NewComment]:
1175 """
1176 Returns (possibly empty) list of NewComment objects for the given
1177 parameters
1179 :param package: name of the package
1180 :param version: package version
1181 :param comment_id: An id of a comment
1182 :param session: Optional SQLA session object (a temporary one will be
1183 generated if not supplied)
1184 :return: A (possibly empty) list of NewComment objects will be returned
1185 """
1187 q = session.query(NewComment).filter_by(policy_queue=policy_queue)
1188 if package is not None: 1188 ↛ 1190line 1188 didn't jump to line 1190, because the condition on line 1188 was never false
1189 q = q.filter_by(package=package)
1190 if version is not None: 1190 ↛ 1191line 1190 didn't jump to line 1191, because the condition on line 1190 was never true
1191 q = q.filter_by(version=version)
1192 if comment_id is not None: 1192 ↛ 1193line 1192 didn't jump to line 1193, because the condition on line 1192 was never true
1193 q = q.filter_by(comment_id=comment_id)
1195 return q.all()
1198__all__.append("get_new_comments")
1200################################################################################
1203class Override(ORMObject):
1204 def __init__(
1205 self,
1206 package=None,
1207 suite=None,
1208 component=None,
1209 overridetype=None,
1210 section=None,
1211 priority=None,
1212 ):
1213 self.package = package
1214 self.suite = suite
1215 self.component = component
1216 self.overridetype = overridetype
1217 self.section = section
1218 self.priority = priority
1220 def properties(self) -> list[str]:
1221 return ["package", "suite", "component", "overridetype", "section", "priority"]
1224__all__.append("Override")
1227@session_wrapper
1228def get_override(
1229 package: str,
1230 suite: Union[str, list[str], None] = None,
1231 component: Union[str, list[str], None] = None,
1232 overridetype: Union[str, list[str], None] = None,
1233 session=None,
1234) -> list[Override]:
1235 """
1236 Returns Override object for the given parameters
1238 :param package: The name of the package
1239 :param suite: The name of the suite (or suites if a list) to limit to. If
1240 None, don't limit. Defaults to None.
1241 :param component: The name of the component (or components if a list) to
1242 limit to. If None, don't limit. Defaults to None.
1243 :param overridetype: The name of the overridetype (or overridetypes if a list) to
1244 limit to. If None, don't limit. Defaults to None.
1245 :param session: Optional SQLA session object (a temporary one will be
1246 generated if not supplied)
1247 :return: A (possibly empty) list of Override objects will be returned
1248 """
1250 q = session.query(Override)
1251 q = q.filter_by(package=package)
1253 if suite is not None:
1254 if not isinstance(suite, list):
1255 suite = [suite]
1256 q = q.join(Suite).filter(Suite.suite_name.in_(suite))
1258 if component is not None:
1259 if not isinstance(component, list):
1260 component = [component]
1261 q = q.join(Component).filter(Component.component_name.in_(component))
1263 if overridetype is not None:
1264 if not isinstance(overridetype, list):
1265 overridetype = [overridetype]
1266 q = q.join(OverrideType).filter(OverrideType.overridetype.in_(overridetype))
1268 return q.all()
1271__all__.append("get_override")
1274################################################################################
1277class OverrideType(ORMObject):
1278 def __init__(self, overridetype=None):
1279 self.overridetype = overridetype
1281 def properties(self) -> list[str]:
1282 return ["overridetype", "overridetype_id", "overrides_count"]
1285__all__.append("OverrideType")
1288@session_wrapper
1289def get_override_type(override_type: str, session=None) -> Optional[OverrideType]:
1290 """
1291 Returns OverrideType object for given `override_type`.
1293 :param override_type: The name of the override type
1294 :param session: Optional SQLA session object (a temporary one will be
1295 generated if not supplied)
1296 :return: the database id for the given override type
1297 """
1299 q = session.query(OverrideType).filter_by(overridetype=override_type)
1300 return q.one_or_none()
1303__all__.append("get_override_type")
1305################################################################################
1308class PolicyQueue:
1309 def __init__(self, *args, **kwargs):
1310 pass
1312 def __repr__(self):
1313 return "<PolicyQueue %s>" % self.queue_name
1316__all__.append("PolicyQueue")
1319@session_wrapper
1320def get_policy_queue(queuename: str, session=None) -> Optional[PolicyQueue]:
1321 """
1322 Returns PolicyQueue object for given `queuename`
1324 :param queuename: The name of the queue
1325 :param session: Optional SQLA session object (a temporary one will be
1326 generated if not supplied)
1327 :return: PolicyQueue object for the given queue
1328 """
1330 q = session.query(PolicyQueue).filter_by(queue_name=queuename)
1331 return q.one_or_none()
1334__all__.append("get_policy_queue")
1336################################################################################
1339@functools.total_ordering
1340class PolicyQueueUpload:
1341 def _key(self):
1342 return (
1343 self.changes.source,
1344 AptVersion(self.changes.version),
1345 self.source is None,
1346 self.changes.changesname,
1347 )
1349 def __eq__(self, other: object) -> bool:
1350 if not isinstance(other, PolicyQueueUpload): 1350 ↛ 1351line 1350 didn't jump to line 1351, because the condition on line 1350 was never true
1351 return NotImplemented
1352 return self._key() == other._key()
1354 def __lt__(self, other):
1355 return self._key() < other._key()
1358__all__.append("PolicyQueueUpload")
1360################################################################################
1363class PolicyQueueByhandFile:
1364 pass
1367__all__.append("PolicyQueueByhandFile")
1369################################################################################
1372class Priority(ORMObject):
1373 def __init__(self, priority=None, level=None):
1374 self.priority = priority
1375 self.level = level
1377 def properties(self) -> list[str]:
1378 return ["priority", "priority_id", "level", "overrides_count"]
1380 def __eq__(self, val):
1381 if isinstance(val, str):
1382 warnings.warn(
1383 "comparison with a `str` is deprecated",
1384 DeprecationWarning,
1385 stacklevel=2,
1386 )
1387 return self.priority == val
1388 # This signals to use the normal comparison operator
1389 return NotImplemented
1391 def __ne__(self, val):
1392 if isinstance(val, str): 1392 ↛ 1400line 1392 didn't jump to line 1400, because the condition on line 1392 was never false
1393 warnings.warn(
1394 "comparison with a `str` is deprecated",
1395 DeprecationWarning,
1396 stacklevel=2,
1397 )
1398 return self.priority != val
1399 # This signals to use the normal comparison operator
1400 return NotImplemented
1402 __hash__ = ORMObject.__hash__
1405__all__.append("Priority")
1408@session_wrapper
1409def get_priority(priority: str, session=None) -> Optional[Priority]:
1410 """
1411 Returns Priority object for given `priority` name.
1413 :param priority: The name of the priority
1414 :param session: Optional SQLA session object (a temporary one will be
1415 generated if not supplied)
1416 :return: Priority object for the given priority
1417 """
1419 q = session.query(Priority).filter_by(priority=priority)
1420 return q.one_or_none()
1423__all__.append("get_priority")
1426@session_wrapper
1427def get_priorities(session=None) -> dict[str, int]:
1428 """
1429 Returns dictionary of priority names -> id mappings
1431 :param session: Optional SQL session object (a temporary one will be
1432 generated if not supplied)
1433 :return: dictionary of priority names -> id mappings
1434 """
1436 ret = {}
1437 q = session.query(Priority)
1438 for x in q.all():
1439 ret[x.priority] = x.priority_id
1441 return ret
1444__all__.append("get_priorities")
1446################################################################################
1449from .database.section import Section
1451__all__.append("Section")
1454@session_wrapper
1455def get_section(section: str, session=None) -> Optional[Section]:
1456 """
1457 Returns Section object for given `section` name.
1459 :param section: The name of the section
1460 :param session: Optional SQLA session object (a temporary one will be
1461 generated if not supplied)
1462 :return: Section object for the given section name
1463 """
1465 q = session.query(Section).filter_by(section=section)
1466 return q.one_or_none()
1469__all__.append("get_section")
1472@session_wrapper
1473def get_sections(session=None) -> dict[str, int]:
1474 """
1475 Returns dictionary of section names -> id mappings
1477 :param session: Optional SQL session object (a temporary one will be
1478 generated if not supplied)
1479 :return: dictionary of section names -> id mappings
1480 """
1482 ret = {}
1483 q = session.query(Section)
1484 for x in q.all():
1485 ret[x.section] = x.section_id
1487 return ret
1490__all__.append("get_sections")
1492################################################################################
1495class SignatureHistory(ORMObject):
1496 @classmethod
1497 def from_signed_file(
1498 cls, signed_file: "daklib.gpg.SignedFile"
1499 ) -> "SignatureHistory":
1500 """signature history entry from signed file
1502 :param signed_file: signed file
1503 """
1504 self = cls()
1505 self.fingerprint = signed_file.primary_fingerprint
1506 self.signature_timestamp = signed_file.signature_timestamp
1507 self.contents_sha1 = signed_file.contents_sha1
1508 return self
1510 def query(self, session):
1511 return (
1512 session.query(SignatureHistory)
1513 .filter_by(
1514 fingerprint=self.fingerprint,
1515 signature_timestamp=self.signature_timestamp,
1516 contents_sha1=self.contents_sha1,
1517 )
1518 .first()
1519 )
1522__all__.append("SignatureHistory")
1524################################################################################
1527class SrcContents(ORMObject):
1528 def __init__(self, file=None, source=None):
1529 self.file = file
1530 self.source = source
1532 def properties(self) -> list[str]:
1533 return ["file", "source"]
1536__all__.append("SrcContents")
1538################################################################################
1541class DBSource(ORMObject):
1542 def __init__(
1543 self,
1544 source=None,
1545 version=None,
1546 maintainer=None,
1547 changedby=None,
1548 poolfile=None,
1549 install_date=None,
1550 fingerprint=None,
1551 ):
1552 self.source = source
1553 self.version = version
1554 self.maintainer = maintainer
1555 self.changedby = changedby
1556 self.poolfile = poolfile
1557 self.install_date = install_date
1558 self.fingerprint = fingerprint
1560 @property
1561 def pkid(self) -> int:
1562 return self.source_id
1564 @property
1565 def name(self) -> str:
1566 return self.source
1568 @property
1569 def arch_string(self) -> str:
1570 return "source"
1572 def properties(self) -> list[str]:
1573 return [
1574 "source",
1575 "source_id",
1576 "maintainer",
1577 "changedby",
1578 "fingerprint",
1579 "poolfile",
1580 "version",
1581 "suites_count",
1582 "install_date",
1583 "binaries_count",
1584 "uploaders_count",
1585 ]
1587 def read_control_fields(self) -> Deb822:
1588 """
1589 Reads the control information from a dsc
1591 :return: fields is the dsc information in a dictionary form
1592 """
1593 with open(self.poolfile.fullpath, "r") as fd:
1594 fields = Deb822(fd)
1595 return fields
1597 metadata = association_proxy("key", "value")
1599 def scan_contents(self) -> set[str]:
1600 """
1601 Returns a set of names for non directories. The path names are
1602 normalized after converting them from either utf-8 or iso8859-1
1603 encoding.
1604 """
1605 fullpath = self.poolfile.fullpath
1606 from daklib.contents import UnpackedSource
1608 unpacked = UnpackedSource(fullpath)
1609 fileset = set()
1610 for name in unpacked.get_all_filenames():
1611 fileset.add(name)
1612 return fileset
1614 @property
1615 def proxy(self) -> "MetadataProxy":
1616 session = object_session(self)
1617 query = session.query(SourceMetadata).filter_by(source=self)
1618 return MetadataProxy(session, query)
1621__all__.append("DBSource")
1624@session_wrapper
1625def get_suites_source_in(source: str, session=None) -> "list[Suite]":
1626 """
1627 Returns list of Suite objects which given `source` name is in
1629 :param source: DBSource package name to search for
1630 :return: list of Suite objects for the given source
1631 """
1633 return session.query(Suite).filter(Suite.sources.any(source=source)).all()
1636__all__.append("get_suites_source_in")
1638# FIXME: This function fails badly if it finds more than 1 source package and
1639# its implementation is trivial enough to be inlined.
1642@session_wrapper
1643def get_source_in_suite(
1644 source: str, suite_name: Optional[str], session=None
1645) -> Optional[DBSource]:
1646 """
1647 Returns a DBSource object for a combination of `source` and `suite_name`.
1649 :param source: source package name
1650 :param suite_name: the suite name
1651 :return: the version for `source` in `suite`
1652 """
1653 suite = get_suite(suite_name, session)
1654 if suite is None: 1654 ↛ 1655line 1654 didn't jump to line 1655, because the condition on line 1654 was never true
1655 return None
1656 return suite.get_sources(source).one_or_none()
1659__all__.append("get_source_in_suite")
1662@session_wrapper
1663def import_metadata_into_db(obj: Union[DBBinary, DBSource], session=None) -> None:
1664 """
1665 This routine works on either DBBinary or DBSource objects and imports
1666 their metadata into the database
1667 """
1668 fields = obj.read_control_fields()
1669 for k in fields.keys():
1670 try:
1671 # Try raw ASCII
1672 val = str(fields[k])
1673 except UnicodeEncodeError:
1674 # Fall back to UTF-8
1675 try:
1676 val = fields[k].encode("utf-8")
1677 except UnicodeEncodeError:
1678 # Finally try iso8859-1
1679 val = fields[k].encode("iso8859-1")
1680 # Otherwise we allow the exception to percolate up and we cause
1681 # a reject as someone is playing silly buggers
1683 obj.metadata[get_or_set_metadatakey(k, session)] = val
1685 session.commit_or_flush()
1688__all__.append("import_metadata_into_db")
1690################################################################################
1693class SrcFormat:
1694 def __init__(self, *args, **kwargs):
1695 pass
1697 def __repr__(self):
1698 return "<SrcFormat %s>" % (self.format_name)
1701__all__.append("SrcFormat")
1703################################################################################
1705SUITE_FIELDS = [
1706 ("SuiteName", "suite_name"),
1707 ("SuiteID", "suite_id"),
1708 ("Version", "version"),
1709 ("Origin", "origin"),
1710 ("Label", "label"),
1711 ("Description", "description"),
1712 ("Untouchable", "untouchable"),
1713 ("Announce", "announce"),
1714 ("Codename", "codename"),
1715 ("OverrideCodename", "overridecodename"),
1716 ("ValidTime", "validtime"),
1717 ("Priority", "priority"),
1718 ("NotAutomatic", "notautomatic"),
1719 ("CopyChanges", "copychanges"),
1720 ("OverrideSuite", "overridesuite"),
1721]
1723# Why the heck don't we have any UNIQUE constraints in table suite?
1724# TODO: Add UNIQUE constraints for appropriate columns.
1727class Suite(ORMObject):
1728 def __init__(self, suite_name=None, version=None):
1729 self.suite_name = suite_name
1730 self.version = version
1732 def properties(self) -> list[str]:
1733 return [
1734 "suite_name",
1735 "version",
1736 "sources_count",
1737 "binaries_count",
1738 "overrides_count",
1739 ]
1741 def __eq__(self, val):
1742 if isinstance(val, str): 1742 ↛ 1743line 1742 didn't jump to line 1743, because the condition on line 1742 was never true
1743 warnings.warn(
1744 "comparison with a `str` is deprecated",
1745 DeprecationWarning,
1746 stacklevel=2,
1747 )
1748 return self.suite_name == val
1749 # This signals to use the normal comparison operator
1750 return NotImplemented
1752 def __ne__(self, val):
1753 if isinstance(val, str): 1753 ↛ 1754line 1753 didn't jump to line 1754, because the condition on line 1753 was never true
1754 warnings.warn(
1755 "comparison with a `str` is deprecated",
1756 DeprecationWarning,
1757 stacklevel=2,
1758 )
1759 return self.suite_name != val
1760 # This signals to use the normal comparison operator
1761 return NotImplemented
1763 __hash__ = ORMObject.__hash__
1765 def details(self) -> str:
1766 ret = []
1767 for disp, field in SUITE_FIELDS:
1768 val = getattr(self, field, None)
1769 if val is not None:
1770 ret.append("%s: %s" % (disp, val))
1772 return "\n".join(ret)
1774 def get_architectures(
1775 self, skipsrc: bool = False, skipall: bool = False
1776 ) -> list[Architecture]:
1777 """
1778 Returns list of Architecture objects
1780 :param skipsrc: Whether to skip returning the 'source' architecture entry
1781 :param skipall: Whether to skip returning the 'all' architecture entry
1782 :return: list of Architecture objects for the given name (may be empty)
1783 """
1785 q = object_session(self).query(Architecture).with_parent(self)
1786 if skipsrc:
1787 q = q.filter(Architecture.arch_string != "source")
1788 if skipall:
1789 q = q.filter(Architecture.arch_string != "all")
1790 return q.order_by(Architecture.arch_string).all()
1792 def get_sources(self, source: str) -> sqlalchemy.orm.query.Query:
1793 """
1794 Returns a query object representing DBSource that is part of this suite.
1796 :param source: source package name
1797 :return: a query of DBSource
1798 """
1800 session = object_session(self)
1801 return session.query(DBSource).filter_by(source=source).with_parent(self)
1803 def get_overridesuite(self) -> "Suite":
1804 if self.overridesuite is None:
1805 return self
1806 else:
1807 return (
1808 object_session(self)
1809 .query(Suite)
1810 .filter_by(suite_name=self.overridesuite)
1811 .one()
1812 )
1814 def update_last_changed(self) -> None:
1815 self.last_changed = sqlalchemy.func.now()
1817 @property
1818 def path(self) -> str:
1819 return os.path.join(self.archive.path, "dists", self.suite_name)
1821 @property
1822 def release_suite_output(self) -> str:
1823 if self.release_suite is not None: 1823 ↛ 1824line 1823 didn't jump to line 1824, because the condition on line 1823 was never true
1824 return self.release_suite
1825 return self.suite_name
1828__all__.append("Suite")
1831@session_wrapper
1832def get_suite(suite: str, session=None) -> Optional[Suite]:
1833 """
1834 Returns Suite object for given `suite` name.
1836 :param suite: The name of the suite
1837 :param session: Optional SQLA session object (a temporary one will be
1838 generated if not supplied)
1839 :return: Suite object for the requested suite name (None if not present)
1840 """
1842 # Start by looking for the dak internal name
1843 q = session.query(Suite).filter_by(suite_name=suite)
1844 try:
1845 return q.one()
1846 except NoResultFound:
1847 pass
1849 # Now try codename
1850 q = session.query(Suite).filter_by(codename=suite)
1851 try:
1852 return q.one()
1853 except NoResultFound:
1854 pass
1856 # Finally give release_suite a try
1857 q = session.query(Suite).filter_by(release_suite=suite)
1858 return q.one_or_none()
1861__all__.append("get_suite")
1863################################################################################
1866@session_wrapper
1867def get_suite_architectures(
1868 suite: str, skipsrc: bool = False, skipall: bool = False, session=None
1869) -> list[Architecture]:
1870 """
1871 Returns list of Architecture objects for given `suite` name. The list is
1872 empty if `suite` does not exist.
1874 :param suite: Suite name to search for
1875 :param skipsrc: Whether to skip returning the 'source' architecture entry
1876 :param skipall: Whether to skip returning the 'all' architecture entry
1877 :param session: Optional SQL session object (a temporary one will be
1878 generated if not supplied)
1879 :return: list of Architecture objects for the given name (may be empty)
1880 """
1882 try:
1883 return get_suite(suite, session).get_architectures(skipsrc, skipall)
1884 except AttributeError:
1885 return []
1888__all__.append("get_suite_architectures")
1890################################################################################
1893class Uid(ORMObject):
1894 def __init__(self, uid=None, name=None):
1895 self.uid = uid
1896 self.name = name
1898 def __eq__(self, val):
1899 if isinstance(val, str):
1900 warnings.warn(
1901 "comparison with a `str` is deprecated",
1902 DeprecationWarning,
1903 stacklevel=2,
1904 )
1905 return self.uid == val
1906 # This signals to use the normal comparison operator
1907 return NotImplemented
1909 def __ne__(self, val):
1910 if isinstance(val, str):
1911 warnings.warn(
1912 "comparison with a `str` is deprecated",
1913 DeprecationWarning,
1914 stacklevel=2,
1915 )
1916 return self.uid != val
1917 # This signals to use the normal comparison operator
1918 return NotImplemented
1920 __hash__ = ORMObject.__hash__
1922 def properties(self) -> list[str]:
1923 return ["uid", "name", "fingerprint"]
1926__all__.append("Uid")
1929@session_wrapper
1930def get_or_set_uid(uidname: str, session=None) -> Uid:
1931 """
1932 Returns uid object for given uidname.
1934 If no matching uidname is found, a row is inserted.
1936 :param uidname: The uid to add
1937 :param session: Optional SQL session object (a temporary one will be
1938 generated if not supplied). If not passed, a commit will be performed at
1939 the end of the function, otherwise the caller is responsible for commiting.
1940 :return: the uid object for the given uidname
1941 """
1943 q = session.query(Uid).filter_by(uid=uidname)
1945 try:
1946 ret = q.one()
1947 except NoResultFound:
1948 uid = Uid()
1949 uid.uid = uidname
1950 session.add(uid)
1951 session.commit_or_flush()
1952 ret = uid
1954 return ret
1957__all__.append("get_or_set_uid")
1960@session_wrapper
1961def get_uid_from_fingerprint(fpr: str, session=None) -> Optional[Uid]:
1962 q = session.query(Uid)
1963 q = q.join(Fingerprint).filter_by(fingerprint=fpr)
1965 return q.one_or_none()
1968__all__.append("get_uid_from_fingerprint")
1970################################################################################
1973class MetadataKey(ORMObject):
1974 def __init__(self, key=None):
1975 self.key = key
1977 def properties(self) -> list[str]:
1978 return ["key"]
1981__all__.append("MetadataKey")
1984@session_wrapper
1985def get_or_set_metadatakey(keyname: str, session=None) -> MetadataKey:
1986 """
1987 Returns MetadataKey object for given uidname.
1989 If no matching keyname is found, a row is inserted.
1991 :param keyname: The keyname to add
1992 :param session: Optional SQL session object (a temporary one will be
1993 generated if not supplied). If not passed, a commit will be performed at
1994 the end of the function, otherwise the caller is responsible for commiting.
1995 :return: the metadatakey object for the given keyname
1996 """
1998 q = session.query(MetadataKey).filter_by(key=keyname)
2000 try:
2001 ret = q.one()
2002 except NoResultFound:
2003 ret = MetadataKey(keyname)
2004 session.add(ret)
2005 session.commit_or_flush()
2007 return ret
2010__all__.append("get_or_set_metadatakey")
2012################################################################################
2015class BinaryMetadata(ORMObject):
2016 def __init__(self, key=None, value=None, binary=None):
2017 self.key = key
2018 self.value = value
2019 if binary is not None:
2020 self.binary = binary
2022 def properties(self) -> list[str]:
2023 return ["binary", "key", "value"]
2026__all__.append("BinaryMetadata")
2028################################################################################
2031class SourceMetadata(ORMObject):
2032 def __init__(self, key=None, value=None, source=None):
2033 self.key = key
2034 self.value = value
2035 if source is not None:
2036 self.source = source
2038 def properties(self) -> list[str]:
2039 return ["source", "key", "value"]
2042__all__.append("SourceMetadata")
2044################################################################################
2047class MetadataProxy:
2048 def __init__(self, session, query):
2049 self.session = session
2050 self.query = query
2052 def _get(self, key):
2053 metadata_key = self.session.query(MetadataKey).filter_by(key=key).first()
2054 if metadata_key is None:
2055 return None
2056 metadata = self.query.filter_by(key=metadata_key).first()
2057 return metadata
2059 def __contains__(self, key: str) -> bool:
2060 if self._get(key) is not None: 2060 ↛ 2062line 2060 didn't jump to line 2062, because the condition on line 2060 was never false
2061 return True
2062 return False
2064 def __getitem__(self, key: str) -> str:
2065 metadata = self._get(key)
2066 if metadata is None:
2067 raise KeyError
2068 return metadata.value
2070 def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
2071 try:
2072 return self[key]
2073 except KeyError:
2074 return default
2077################################################################################
2080class VersionCheck(ORMObject):
2081 def __init__(self, *args, **kwargs):
2082 pass
2084 def properties(self) -> list[str]:
2085 return ["check"]
2088__all__.append("VersionCheck")
2091@session_wrapper
2092def get_version_checks(
2093 suite_name: str, check: Optional[str] = None, session=None
2094) -> list[VersionCheck]:
2095 suite = get_suite(suite_name, session)
2096 if not suite: 2096 ↛ 2099line 2096 didn't jump to line 2099, because the condition on line 2096 was never true
2097 # Make sure that what we return is iterable so that list comprehensions
2098 # involving this don't cause a traceback
2099 return []
2100 q = session.query(VersionCheck).filter_by(suite=suite)
2101 if check: 2101 ↛ 2103line 2101 didn't jump to line 2103, because the condition on line 2101 was never false
2102 q = q.filter_by(check=check)
2103 return q.all()
2106__all__.append("get_version_checks")
2108################################################################################
2111class DBConn:
2112 """
2113 database module init.
2114 """
2116 __shared_state = {}
2118 db_meta = None
2120 tbl_architecture = Architecture.__table__
2122 tables = (
2123 "acl",
2124 "acl_architecture_map",
2125 "acl_fingerprint_map",
2126 "acl_per_source",
2127 "acl_per_suite",
2128 "archive",
2129 "bin_associations",
2130 "bin_contents",
2131 "binaries",
2132 "binaries_metadata",
2133 "build_queue",
2134 "changelogs_text",
2135 "changes",
2136 "component",
2137 "component_suite",
2138 "config",
2139 "dsc_files",
2140 "external_files",
2141 "external_overrides",
2142 "external_signature_requests",
2143 "extra_src_references",
2144 "files",
2145 "files_archive_map",
2146 "fingerprint",
2147 "hashfile",
2148 "keyrings",
2149 "maintainer",
2150 "metadata_keys",
2151 "new_comments",
2152 # TODO: the maintainer column in table override should be removed.
2153 "override",
2154 "override_type",
2155 "policy_queue",
2156 "policy_queue_upload",
2157 "policy_queue_upload_binaries_map",
2158 "policy_queue_byhand_file",
2159 "priority",
2160 "signature_history",
2161 "source",
2162 "source_metadata",
2163 "src_associations",
2164 "src_contents",
2165 "src_format",
2166 "src_uploaders",
2167 "suite",
2168 "suite_acl_map",
2169 "suite_architectures",
2170 "suite_build_queue_copy",
2171 "suite_permission",
2172 "suite_src_formats",
2173 "uid",
2174 "version_check",
2175 )
2177 views = (
2178 "bin_associations_binaries",
2179 "changelogs",
2180 "newest_source",
2181 "newest_src_association",
2182 "package_list",
2183 "source_suite",
2184 "src_associations_src",
2185 )
2187 def __init__(self, *args, **kwargs):
2188 self.__dict__ = self.__shared_state
2190 if not getattr(self, "initialised", False):
2191 self.initialised = True
2192 self.debug = "debug" in kwargs
2193 self.__createconn()
2195 def __setuptables(self):
2196 for table_name in self.tables:
2197 table = Table(table_name, self.db_meta, autoload=True, extend_existing=True)
2198 setattr(self, "tbl_%s" % table_name, table)
2200 for view_name in self.views:
2201 view = Table(view_name, self.db_meta, autoload=True)
2202 setattr(self, "view_%s" % view_name, view)
2204 def __setupmappers(self):
2205 mapper(
2206 ACL,
2207 self.tbl_acl,
2208 properties=dict(
2209 architectures=relation(
2210 Architecture,
2211 secondary=self.tbl_acl_architecture_map,
2212 collection_class=set,
2213 ),
2214 fingerprints=relation(
2215 Fingerprint,
2216 secondary=self.tbl_acl_fingerprint_map,
2217 collection_class=set,
2218 ),
2219 match_keyring=relation(
2220 Keyring,
2221 primaryjoin=(
2222 self.tbl_acl.c.match_keyring_id == self.tbl_keyrings.c.id
2223 ),
2224 ),
2225 per_source=relation(
2226 ACLPerSource, collection_class=set, back_populates="acl"
2227 ),
2228 per_suite=relation(
2229 ACLPerSuite, collection_class=set, back_populates="acl"
2230 ),
2231 ),
2232 )
2234 mapper(
2235 ACLPerSource,
2236 self.tbl_acl_per_source,
2237 properties=dict(
2238 acl=relation(ACL, back_populates="per_source"),
2239 fingerprint=relation(
2240 Fingerprint,
2241 primaryjoin=(
2242 self.tbl_acl_per_source.c.fingerprint_id
2243 == self.tbl_fingerprint.c.id
2244 ),
2245 ),
2246 created_by=relation(
2247 Fingerprint,
2248 primaryjoin=(
2249 self.tbl_acl_per_source.c.created_by_id
2250 == self.tbl_fingerprint.c.id
2251 ),
2252 ),
2253 ),
2254 )
2256 mapper(
2257 ACLPerSuite,
2258 self.tbl_acl_per_suite,
2259 properties=dict(
2260 acl=relation(ACL, back_populates="per_suite"),
2261 fingerprint=relation(
2262 Fingerprint,
2263 primaryjoin=(
2264 self.tbl_acl_per_suite.c.fingerprint_id
2265 == self.tbl_fingerprint.c.id
2266 ),
2267 ),
2268 suite=relation(
2269 Suite,
2270 primaryjoin=(
2271 self.tbl_acl_per_suite.c.suite_id == self.tbl_suite.c.id
2272 ),
2273 ),
2274 created_by=relation(
2275 Fingerprint,
2276 primaryjoin=(
2277 self.tbl_acl_per_suite.c.created_by_id
2278 == self.tbl_fingerprint.c.id
2279 ),
2280 ),
2281 ),
2282 )
2284 mapper(
2285 Archive,
2286 self.tbl_archive,
2287 properties=dict(
2288 archive_id=self.tbl_archive.c.id, archive_name=self.tbl_archive.c.name
2289 ),
2290 )
2292 mapper(
2293 ArchiveFile,
2294 self.tbl_files_archive_map,
2295 properties=dict(
2296 archive=relation(Archive, backref="files"),
2297 component=relation(Component),
2298 file=relation(PoolFile, backref="archives"),
2299 ),
2300 )
2302 mapper(
2303 BuildQueue,
2304 self.tbl_build_queue,
2305 properties=dict(
2306 queue_id=self.tbl_build_queue.c.id,
2307 suite=relation(
2308 Suite,
2309 primaryjoin=(
2310 self.tbl_build_queue.c.suite_id == self.tbl_suite.c.id
2311 ),
2312 ),
2313 ),
2314 )
2316 mapper(
2317 DBBinary,
2318 self.tbl_binaries,
2319 properties=dict(
2320 binary_id=self.tbl_binaries.c.id,
2321 package=self.tbl_binaries.c.package,
2322 version=self.tbl_binaries.c.version,
2323 maintainer_id=self.tbl_binaries.c.maintainer,
2324 maintainer=relation(Maintainer),
2325 source_id=self.tbl_binaries.c.source,
2326 source=relation(DBSource, backref="binaries"),
2327 arch_id=self.tbl_binaries.c.architecture,
2328 architecture=relation(Architecture),
2329 poolfile_id=self.tbl_binaries.c.file,
2330 poolfile=relation(PoolFile),
2331 binarytype=self.tbl_binaries.c.type,
2332 fingerprint_id=self.tbl_binaries.c.sig_fpr,
2333 fingerprint=relation(Fingerprint),
2334 install_date=self.tbl_binaries.c.install_date,
2335 suites=relation(
2336 Suite,
2337 secondary=self.tbl_bin_associations,
2338 backref=backref("binaries", lazy="dynamic"),
2339 ),
2340 extra_sources=relation(
2341 DBSource,
2342 secondary=self.tbl_extra_src_references,
2343 backref=backref("extra_binary_references", lazy="dynamic"),
2344 ),
2345 key=relation(
2346 BinaryMetadata,
2347 cascade="all",
2348 collection_class=attribute_mapped_collection("key"),
2349 back_populates="binary",
2350 ),
2351 ),
2352 )
2354 mapper(
2355 Component,
2356 self.tbl_component,
2357 properties=dict(
2358 component_id=self.tbl_component.c.id,
2359 component_name=self.tbl_component.c.name,
2360 ),
2361 )
2363 mapper(
2364 DBConfig, self.tbl_config, properties=dict(config_id=self.tbl_config.c.id)
2365 )
2367 mapper(
2368 DSCFile,
2369 self.tbl_dsc_files,
2370 properties=dict(
2371 dscfile_id=self.tbl_dsc_files.c.id,
2372 source_id=self.tbl_dsc_files.c.source,
2373 source=relation(DBSource, back_populates="srcfiles"),
2374 poolfile_id=self.tbl_dsc_files.c.file,
2375 poolfile=relation(PoolFile),
2376 ),
2377 )
2379 mapper(
2380 ExternalOverride,
2381 self.tbl_external_overrides,
2382 properties=dict(
2383 suite_id=self.tbl_external_overrides.c.suite,
2384 suite=relation(Suite),
2385 component_id=self.tbl_external_overrides.c.component,
2386 component=relation(Component),
2387 ),
2388 )
2390 mapper(
2391 PoolFile,
2392 self.tbl_files,
2393 properties=dict(
2394 file_id=self.tbl_files.c.id, filesize=self.tbl_files.c.size
2395 ),
2396 )
2398 mapper(
2399 Fingerprint,
2400 self.tbl_fingerprint,
2401 properties=dict(
2402 fingerprint_id=self.tbl_fingerprint.c.id,
2403 uid_id=self.tbl_fingerprint.c.uid,
2404 uid=relation(Uid, back_populates="fingerprint"),
2405 keyring_id=self.tbl_fingerprint.c.keyring,
2406 keyring=relation(Keyring),
2407 acl=relation(ACL),
2408 ),
2409 )
2411 mapper(
2412 Keyring,
2413 self.tbl_keyrings,
2414 properties=dict(
2415 keyring_name=self.tbl_keyrings.c.name,
2416 keyring_id=self.tbl_keyrings.c.id,
2417 acl=relation(
2418 ACL, primaryjoin=(self.tbl_keyrings.c.acl_id == self.tbl_acl.c.id)
2419 ),
2420 ),
2421 ),
2423 mapper(
2424 DBChange,
2425 self.tbl_changes,
2426 properties=dict(
2427 change_id=self.tbl_changes.c.id,
2428 seen=self.tbl_changes.c.seen,
2429 source=self.tbl_changes.c.source,
2430 binaries=self.tbl_changes.c.binaries,
2431 architecture=self.tbl_changes.c.architecture,
2432 distribution=self.tbl_changes.c.distribution,
2433 urgency=self.tbl_changes.c.urgency,
2434 maintainer=self.tbl_changes.c.maintainer,
2435 changedby=self.tbl_changes.c.changedby,
2436 date=self.tbl_changes.c.date,
2437 version=self.tbl_changes.c.version,
2438 ),
2439 )
2441 mapper(
2442 Maintainer,
2443 self.tbl_maintainer,
2444 properties=dict(
2445 maintainer_id=self.tbl_maintainer.c.id,
2446 maintains_sources=relation(
2447 DBSource,
2448 backref="maintainer",
2449 primaryjoin=(
2450 self.tbl_maintainer.c.id == self.tbl_source.c.maintainer
2451 ),
2452 ),
2453 changed_sources=relation(
2454 DBSource,
2455 backref="changedby",
2456 primaryjoin=(
2457 self.tbl_maintainer.c.id == self.tbl_source.c.changedby
2458 ),
2459 ),
2460 ),
2461 )
2463 mapper(
2464 NewComment,
2465 self.tbl_new_comments,
2466 properties=dict(
2467 comment_id=self.tbl_new_comments.c.id,
2468 policy_queue=relation(PolicyQueue),
2469 ),
2470 )
2472 mapper(
2473 Override,
2474 self.tbl_override,
2475 properties=dict(
2476 suite_id=self.tbl_override.c.suite,
2477 suite=relation(Suite, backref=backref("overrides", lazy="dynamic")),
2478 package=self.tbl_override.c.package,
2479 component_id=self.tbl_override.c.component,
2480 component=relation(
2481 Component, backref=backref("overrides", lazy="dynamic")
2482 ),
2483 priority_id=self.tbl_override.c.priority,
2484 priority=relation(
2485 Priority, backref=backref("overrides", lazy="dynamic")
2486 ),
2487 section_id=self.tbl_override.c.section,
2488 section=relation(Section, backref=backref("overrides", lazy="dynamic")),
2489 overridetype_id=self.tbl_override.c.type,
2490 overridetype=relation(
2491 OverrideType, backref=backref("overrides", lazy="dynamic")
2492 ),
2493 ),
2494 )
2496 mapper(
2497 OverrideType,
2498 self.tbl_override_type,
2499 properties=dict(
2500 overridetype=self.tbl_override_type.c.type,
2501 overridetype_id=self.tbl_override_type.c.id,
2502 ),
2503 )
2505 mapper(
2506 PolicyQueue,
2507 self.tbl_policy_queue,
2508 properties=dict(
2509 policy_queue_id=self.tbl_policy_queue.c.id,
2510 suite=relation(
2511 Suite,
2512 primaryjoin=(
2513 self.tbl_policy_queue.c.suite_id == self.tbl_suite.c.id
2514 ),
2515 ),
2516 ),
2517 )
2519 mapper(
2520 PolicyQueueUpload,
2521 self.tbl_policy_queue_upload,
2522 properties=dict(
2523 changes=relation(DBChange),
2524 policy_queue=relation(PolicyQueue, backref="uploads"),
2525 target_suite=relation(Suite),
2526 source=relation(DBSource),
2527 binaries=relation(
2528 DBBinary, secondary=self.tbl_policy_queue_upload_binaries_map
2529 ),
2530 ),
2531 )
2533 mapper(
2534 PolicyQueueByhandFile,
2535 self.tbl_policy_queue_byhand_file,
2536 properties=dict(
2537 upload=relation(PolicyQueueUpload, backref="byhand"),
2538 ),
2539 )
2541 mapper(
2542 Priority,
2543 self.tbl_priority,
2544 properties=dict(priority_id=self.tbl_priority.c.id),
2545 )
2547 mapper(SignatureHistory, self.tbl_signature_history)
2549 mapper(
2550 DBSource,
2551 self.tbl_source,
2552 properties=dict(
2553 source_id=self.tbl_source.c.id,
2554 version=self.tbl_source.c.version,
2555 maintainer_id=self.tbl_source.c.maintainer,
2556 poolfile_id=self.tbl_source.c.file,
2557 poolfile=relation(PoolFile),
2558 fingerprint_id=self.tbl_source.c.sig_fpr,
2559 fingerprint=relation(Fingerprint),
2560 changedby_id=self.tbl_source.c.changedby,
2561 srcfiles=relation(
2562 DSCFile,
2563 primaryjoin=(self.tbl_source.c.id == self.tbl_dsc_files.c.source),
2564 back_populates="source",
2565 ),
2566 suites=relation(
2567 Suite,
2568 secondary=self.tbl_src_associations,
2569 backref=backref("sources", lazy="dynamic"),
2570 ),
2571 uploaders=relation(Maintainer, secondary=self.tbl_src_uploaders),
2572 key=relation(
2573 SourceMetadata,
2574 cascade="all",
2575 collection_class=attribute_mapped_collection("key"),
2576 back_populates="source",
2577 ),
2578 ),
2579 )
2581 mapper(
2582 SrcFormat,
2583 self.tbl_src_format,
2584 properties=dict(
2585 src_format_id=self.tbl_src_format.c.id,
2586 format_name=self.tbl_src_format.c.format_name,
2587 ),
2588 )
2590 mapper(
2591 Suite,
2592 self.tbl_suite,
2593 properties=dict(
2594 suite_id=self.tbl_suite.c.id,
2595 policy_queue=relation(
2596 PolicyQueue,
2597 primaryjoin=(
2598 self.tbl_suite.c.policy_queue_id == self.tbl_policy_queue.c.id
2599 ),
2600 ),
2601 new_queue=relation(
2602 PolicyQueue,
2603 primaryjoin=(
2604 self.tbl_suite.c.new_queue_id == self.tbl_policy_queue.c.id
2605 ),
2606 ),
2607 debug_suite=relation(Suite, remote_side=[self.tbl_suite.c.id]),
2608 copy_queues=relation(
2609 BuildQueue, secondary=self.tbl_suite_build_queue_copy
2610 ),
2611 srcformats=relation(
2612 SrcFormat,
2613 secondary=self.tbl_suite_src_formats,
2614 backref=backref("suites", lazy="dynamic"),
2615 ),
2616 archive=relation(Archive, backref="suites"),
2617 acls=relation(
2618 ACL, secondary=self.tbl_suite_acl_map, collection_class=set
2619 ),
2620 components=relation(
2621 Component,
2622 secondary=self.tbl_component_suite,
2623 order_by=self.tbl_component.c.ordering,
2624 backref=backref("suites"),
2625 ),
2626 architectures=relation(
2627 Architecture,
2628 secondary=self.tbl_suite_architectures,
2629 backref=backref("suites"),
2630 ),
2631 ),
2632 )
2634 mapper(
2635 Uid,
2636 self.tbl_uid,
2637 properties=dict(
2638 uid_id=self.tbl_uid.c.id,
2639 fingerprint=relation(Fingerprint, back_populates="uid"),
2640 ),
2641 )
2643 mapper(
2644 BinContents,
2645 self.tbl_bin_contents,
2646 properties=dict(
2647 binary=relation(
2648 DBBinary, backref=backref("contents", lazy="dynamic", cascade="all")
2649 ),
2650 file=self.tbl_bin_contents.c.file,
2651 ),
2652 )
2654 mapper(
2655 SrcContents,
2656 self.tbl_src_contents,
2657 properties=dict(
2658 source=relation(
2659 DBSource, backref=backref("contents", lazy="dynamic", cascade="all")
2660 ),
2661 file=self.tbl_src_contents.c.file,
2662 ),
2663 )
2665 mapper(
2666 MetadataKey,
2667 self.tbl_metadata_keys,
2668 properties=dict(
2669 key_id=self.tbl_metadata_keys.c.key_id, key=self.tbl_metadata_keys.c.key
2670 ),
2671 )
2673 mapper(
2674 BinaryMetadata,
2675 self.tbl_binaries_metadata,
2676 properties=dict(
2677 binary_id=self.tbl_binaries_metadata.c.bin_id,
2678 binary=relation(DBBinary, back_populates="key"),
2679 key_id=self.tbl_binaries_metadata.c.key_id,
2680 key=relation(MetadataKey),
2681 value=self.tbl_binaries_metadata.c.value,
2682 ),
2683 )
2685 mapper(
2686 SourceMetadata,
2687 self.tbl_source_metadata,
2688 properties=dict(
2689 source_id=self.tbl_source_metadata.c.src_id,
2690 source=relation(DBSource, back_populates="key"),
2691 key_id=self.tbl_source_metadata.c.key_id,
2692 key=relation(MetadataKey),
2693 value=self.tbl_source_metadata.c.value,
2694 ),
2695 )
2697 mapper(
2698 VersionCheck,
2699 self.tbl_version_check,
2700 properties=dict(
2701 suite_id=self.tbl_version_check.c.suite,
2702 suite=relation(
2703 Suite,
2704 primaryjoin=self.tbl_version_check.c.suite == self.tbl_suite.c.id,
2705 ),
2706 reference_id=self.tbl_version_check.c.reference,
2707 reference=relation(
2708 Suite,
2709 primaryjoin=self.tbl_version_check.c.reference
2710 == self.tbl_suite.c.id,
2711 lazy="joined",
2712 ),
2713 ),
2714 )
2716 ## Connection functions
2717 def __createconn(self):
2718 from .config import Config
2720 cnf = Config()
2721 if "DB::Service" in cnf: 2721 ↛ 2722line 2721 didn't jump to line 2722, because the condition on line 2721 was never true
2722 connstr = "postgresql://service=%s" % cnf["DB::Service"]
2723 elif "DB::Host" in cnf:
2724 # TCP/IP
2725 connstr = "postgresql://%s" % cnf["DB::Host"]
2726 if "DB::Port" in cnf and cnf["DB::Port"] != "-1": 2726 ↛ 2727line 2726 didn't jump to line 2727, because the condition on line 2726 was never true
2727 connstr += ":%s" % cnf["DB::Port"]
2728 connstr += "/%s" % cnf["DB::Name"]
2729 else:
2730 # Unix Socket
2731 connstr = "postgresql:///%s" % cnf["DB::Name"]
2732 if "DB::Port" in cnf and cnf["DB::Port"] != "-1": 2732 ↛ 2733line 2732 didn't jump to line 2733, because the condition on line 2732 was never true
2733 connstr += "?port=%s" % cnf["DB::Port"]
2735 engine_args = {"echo": self.debug}
2736 if "DB::PoolSize" in cnf:
2737 engine_args["pool_size"] = int(cnf["DB::PoolSize"])
2738 if "DB::MaxOverflow" in cnf:
2739 engine_args["max_overflow"] = int(cnf["DB::MaxOverflow"])
2740 # we don't support non-utf-8 connections
2741 engine_args["client_encoding"] = "utf-8"
2743 # Monkey patch a new dialect in in order to support service= syntax
2744 import sqlalchemy.dialects.postgresql
2745 from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
2747 class PGDialect_psycopg2_dak(PGDialect_psycopg2):
2748 def create_connect_args(self, url):
2749 if str(url).startswith("postgresql://service="): 2749 ↛ 2751line 2749 didn't jump to line 2751, because the condition on line 2749 was never true
2750 # Eww
2751 servicename = str(url)[21:]
2752 return (["service=%s" % servicename], {})
2753 else:
2754 return PGDialect_psycopg2.create_connect_args(self, url)
2756 sqlalchemy.dialects.postgresql.base.dialect = PGDialect_psycopg2_dak
2758 try:
2759 self.db_pg = create_engine(connstr, **engine_args)
2760 self.db_smaker = sessionmaker(
2761 bind=self.db_pg, autoflush=True, autocommit=False
2762 )
2764 if self.db_meta is None:
2765 self.__class__.db_meta = Base.metadata
2766 self.__class__.db_meta.bind = self.db_pg
2767 self.__setuptables()
2768 self.__setupmappers()
2770 except OperationalError as e:
2771 from . import utils
2773 utils.fubar("Cannot connect to database (%s)" % str(e))
2775 self.pid = os.getpid()
2777 def session(self, work_mem=0):
2778 """
2779 Returns a new session object. If a work_mem parameter is provided a new
2780 transaction is started and the work_mem parameter is set for this
2781 transaction. The work_mem parameter is measured in MB. A default value
2782 will be used if the parameter is not set.
2783 """
2784 # reinitialize DBConn in new processes
2785 if self.pid != os.getpid():
2786 self.__createconn()
2787 session = self.db_smaker()
2788 if work_mem > 0:
2789 session.execute("SET LOCAL work_mem TO '%d MB'" % work_mem)
2790 return session
2793__all__.append("DBConn")