Coverage for daklib/queue.py: 21%
114 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1# vim:set et sw=4:
3"""
4Queue utility functions for dak
6@contact: Debian FTP Master <ftpmaster@debian.org>
7@copyright: 2001 - 2006 James Troup <james@nocrew.org>
8@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org>
9@license: GNU General Public License version 2 or later
10"""
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License as published by
14# the Free Software Foundation; either version 2 of the License, or
15# (at your option) any later version.
17# This program is distributed in the hope that it will be useful,
18# but WITHOUT ANY WARRANTY; without even the implied warranty of
19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20# GNU General Public License for more details.
22# You should have received a copy of the GNU General Public License
23# along with this program; if not, write to the Free Software
24# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26###############################################################################
28from collections.abc import Iterable
29from typing import TYPE_CHECKING, Literal
31from . import utils
32from .config import Config
33from .dbconn import (
34 Architecture,
35 DBBinary,
36 DBSource,
37 NewComment,
38 PolicyQueueUpload,
39 Priority,
40 Section,
41 Suite,
42 get_mapped_component,
43)
44from .regexes import re_default_answer
46if TYPE_CHECKING:
47 from sqlalchemy.engine import Row
48 from sqlalchemy.orm import Session
50 from .policy import MissingOverride
52################################################################################
55def check_valid(overrides: list["MissingOverride"], session: "Session") -> bool:
56 """Check if section and priority for new overrides exist in database.
58 Additionally does sanity checks:
59 - debian-installer packages have to be udeb (or source)
60 - non debian-installer packages cannot be udeb
62 :param overrides: list of overrides to check. The overrides need
63 to be given in form of a dict with the following keys:
65 - package: package name
66 - priority
67 - section
68 - component
69 - type: type of requested override ('dsc', 'deb' or 'udeb')
71 All values are strings.
72 :return: :const:`True` if all overrides are valid, :const:`False` if there is any
73 invalid override.
74 """
75 all_valid = True
76 for o in overrides:
77 o["valid"] = True
78 if session.query(Priority).filter_by(priority=o["priority"]).first() is None: 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true
79 o["valid"] = False
80 if session.query(Section).filter_by(section=o["section"]).first() is None: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true
81 o["valid"] = False
82 if get_mapped_component(o["component"], session) is None: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true
83 o["valid"] = False
84 if o["type"] not in ("dsc", "deb", "udeb"): 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 raise Exception("Unknown override type {0}".format(o["type"]))
86 if o["type"] == "udeb" and o["section"].split("/", 1)[-1] != "debian-installer": 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 o["valid"] = False
88 if o["section"].split("/", 1)[-1] == "debian-installer" and o["type"] not in ( 88 ↛ 92line 88 didn't jump to line 92 because the condition on line 88 was never true
89 "dsc",
90 "udeb",
91 ):
92 o["valid"] = False
93 all_valid = all_valid and o["valid"]
94 return all_valid
97###############################################################################
100def prod_maintainer(
101 notes: Iterable[NewComment],
102 upload: PolicyQueueUpload,
103 session: "Session",
104 trainee=False,
105) -> Literal[0] | None:
106 cnf = Config()
107 changes = upload.changes
108 whitelists = [upload.target_suite.mail_whitelist]
110 # Here we prepare an editor and get them ready to prod...
111 prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
112 answer = "E"
113 while answer == "E":
114 prod_message = utils.call_editor(prod_message)
115 print("Prod message:")
116 print(
117 utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True)
118 )
119 prompt = "[P]rod, Bug, Edit, Abandon, Quit ?"
120 answer = "XXX"
121 while prompt.find(answer) == -1:
122 answer = utils.input_or_exit(prompt)
123 m = re_default_answer.search(prompt)
124 if answer == "":
125 assert m is not None
126 answer = m.group(1)
127 answer = answer[:1].upper()
128 if answer == "A":
129 return None
130 elif answer == "Q":
131 return 0
132 # Otherwise, do the proding...
133 user_email_address = utils.whoami() + " <%s>" % (cnf["Dinstall::MyAdminAddress"])
135 is_bug = answer == "B"
137 changed_by = changes.changedby or changes.maintainer
138 maintainer = changes.maintainer
139 maintainer_to = utils.mail_addresses_for_upload(
140 maintainer, changed_by, changes.fingerprint, changes.authorized_by_fingerprint
141 )
143 Subst = {
144 "__SOURCE__": upload.changes.source,
145 "__VERSION__": upload.changes.version,
146 "__ARCHITECTURE__": upload.changes.architecture,
147 "__CHANGES_FILENAME__": upload.changes.changesname,
148 "__MAINTAINER_TO__": ", ".join(maintainer_to),
149 }
151 Subst["__FROM_ADDRESS__"] = user_email_address
152 Subst["__PROD_MESSAGE__"] = prod_message
153 Subst["__CC__"] = "Cc: " + cnf["Dinstall::MyEmailAddress"]
155 if is_bug:
156 Subst["__DEBBUGS_CC__"] = ", ".join(
157 maintainer_to + [cnf["Dinstall::MyEmailAddress"]]
158 )
159 prod_mail_message = utils.TemplateSubst(
160 Subst, cnf["Dir::Templates"] + "/process-new.bug"
161 )
162 else:
163 prod_mail_message = utils.TemplateSubst(
164 Subst, cnf["Dir::Templates"] + "/process-new.prod"
165 )
167 # Send the prod mail
168 utils.send_mail(prod_mail_message, whitelists=whitelists)
170 if is_bug:
171 print("Filed bug against source package")
172 else:
173 print("Sent prodding message")
175 answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
176 if answer != "n":
177 comment = NewComment()
178 comment.policy_queue = upload.policy_queue
179 comment.package = upload.changes.source
180 comment.version = upload.changes.version
181 comment.comment = prod_mail_message
182 comment.author = utils.whoami()
183 comment.trainee = trainee
184 session.add(comment)
185 session.commit()
187 return None
190################################################################################
193def edit_note(
194 upload: PolicyQueueUpload, session: "Session", trainee=False
195) -> Literal[0] | None:
196 newnote = ""
197 answer = "E"
198 while answer == "E":
199 newnote = utils.call_editor(newnote).rstrip()
200 print("New Note:")
201 print(utils.prefix_multi_line_string(newnote, " "))
202 empty_note = not newnote.strip()
203 if empty_note:
204 prompt = "Done, Edit, [A]bandon, Quit ?"
205 else:
206 prompt = "[D]one, Edit, Abandon, Quit ?"
207 answer = "XXX"
208 while prompt.find(answer) == -1:
209 answer = utils.input_or_exit(prompt)
210 m = re_default_answer.search(prompt)
211 if answer == "":
212 assert m is not None
213 answer = m.group(1)
214 answer = answer[:1].upper()
215 if answer == "A":
216 return None
217 elif answer == "Q":
218 return 0
220 comment = NewComment()
221 comment.policy_queue = upload.policy_queue
222 comment.package = upload.changes.source
223 comment.version = upload.changes.version
224 comment.comment = newnote
225 comment.author = utils.whoami()
226 comment.trainee = trainee
227 session.add(comment)
228 session.commit()
230 return None
233###############################################################################
236def get_suite_version_by_source(
237 source: str, session: "Session"
238) -> "list[Row[tuple[str, str]]]":
239 "returns a list of tuples (suite_name, version) for source package"
240 q = (
241 session.query(Suite.suite_name, DBSource.version)
242 .join(Suite.sources)
243 .filter_by(source=source)
244 )
245 return q.all()
248def get_suite_version_by_package(
249 package: str, arch_string: str, session: "Session"
250) -> "list[Row[tuple[str, str]]]":
251 """
252 returns a list of tuples (suite_name, version) for binary package and
253 arch_string
254 """
255 return (
256 session.query(Suite.suite_name, DBBinary.version)
257 .join(Suite.binaries)
258 .filter_by(package=package)
259 .join(DBBinary.architecture)
260 .filter(Architecture.arch_string.in_([arch_string, "all"]))
261 .all()
262 )