# vim:set et sw=4:

"""
Queue utility functions for dak

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2001 - 2006 James Troup <james@nocrew.org>
@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26###############################################################################
28from . import utils
29from .config import Config
30from .dbconn import (
31 Architecture,
32 DBBinary,
33 DBSource,
34 NewComment,
35 PolicyQueueUpload,
36 Priority,
37 Section,
38 Suite,
39 get_mapped_component,
40)
41from .regexes import re_default_answer
43################################################################################
def check_valid(overrides: list[dict], session) -> bool:
    """Validate requested overrides against the database and sanity rules.

    For every override the priority, section and component are looked up
    through ``session``.  Two consistency rules are applied on top:

    - an override of type ``udeb`` must live in a ``debian-installer``
      section
    - a ``debian-installer`` section only accepts types ``dsc`` and ``udeb``

    The result for each override is written back into the dict under the
    ``"valid"`` key.

    :param overrides: list of dicts, each with the string-valued keys
                      ``package``, ``priority``, ``section``, ``component``
                      and ``type`` (one of ``'dsc'``, ``'deb'``, ``'udeb'``)
    :param session: database session used for the lookups
    :return: :const:`True` if every override is valid, :const:`False`
             as soon as one of them is not
    :raises Exception: if an override has a type other than ``dsc``,
                       ``deb`` or ``udeb``
    """
    every_override_ok = True

    for entry in overrides:
        entry["valid"] = True

        # All three lookups are always performed, even if an earlier one
        # already marked the override as invalid.
        priority_row = (
            session.query(Priority).filter_by(priority=entry["priority"]).first()
        )
        if priority_row is None:
            entry["valid"] = False

        section_row = (
            session.query(Section).filter_by(section=entry["section"]).first()
        )
        if section_row is None:
            entry["valid"] = False

        if get_mapped_component(entry["component"], session) is None:
            entry["valid"] = False

        kind = entry["type"]
        if kind not in ("dsc", "deb", "udeb"):
            raise Exception("Unknown override type {0}".format(kind))

        # Sections may carry a "<component>/" prefix; only the last part
        # decides whether this is a debian-installer section.
        in_installer_section = (
            entry["section"].split("/", 1)[-1] == "debian-installer"
        )
        if in_installer_section:
            # kind is one of dsc/deb/udeb here, so "deb" is the only misfit
            if kind == "deb":
                entry["valid"] = False
        elif kind == "udeb":
            entry["valid"] = False

        if not entry["valid"]:
            every_override_ok = False

    return every_override_ok
88###############################################################################
def prod_maintainer(notes, upload: PolicyQueueUpload, session, trainee=False):
    """Interactively write and send a prod (or bug) mail for an upload.

    The comments of all ``notes`` are joined into an initial message that
    the user edits with ``utils.call_editor``.  The user then chooses to
    send it as a prod, send it as a bug, edit again, abandon or quit.
    After the mail has been sent, the user is asked whether the mail should
    also be stored as a :class:`NewComment` for the upload.

    :param notes: iterable of objects with a ``comment`` attribute, used to
                  pre-fill the message
    :param upload: upload the message is about
    :param session: database session used to store the message as a note
    :param trainee: value stored in the ``trainee`` field of the saved note
    :return: ``0`` if the user chose Quit; :const:`None` if the user chose
             Abandon or after the mail has been sent
    """
    cnf = Config()
    changes = upload.changes
    whitelists = [upload.target_suite.mail_whitelist]

    prompt = "[P]rod, Bug, Edit, Abandon, Quit ?"
    # Only the initial letters of the offered options are acceptable.  A
    # plain substring test against the prompt would also let through a
    # space, ",", "?", "[" or "]" and then fall through to sending the mail.
    valid_answers = ("P", "B", "E", "A", "Q")
    default_match = re_default_answer.search(prompt)

    # Here we prepare an editor and get them ready to prod...
    prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
    answer = "E"
    while answer == "E":
        prod_message = utils.call_editor(prod_message)
        print("Prod message:")
        print(
            utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True)
        )
        answer = "XXX"
        while answer not in valid_answers:
            answer = utils.input_or_exit(prompt)
            if answer == "":
                # Empty input selects the bracketed default of the prompt.
                answer = default_match.group(1)
            answer = answer[:1].upper()
        if answer == "A":
            return
        elif answer == "Q":
            # Callers can tell "quit" (0) apart from "abandon" (None).
            return 0

    # Otherwise, do the proding...
    user_email_address = utils.whoami() + " <%s>" % (cnf["Dinstall::MyAdminAddress"])

    is_bug = answer == "B"

    changed_by = changes.changedby or changes.maintainer
    maintainer = changes.maintainer
    maintainer_to = utils.mail_addresses_for_upload(
        maintainer, changed_by, changes.fingerprint
    )

    subst = {
        "__SOURCE__": changes.source,
        "__VERSION__": changes.version,
        "__ARCHITECTURE__": changes.architecture,
        "__CHANGES_FILENAME__": changes.changesname,
        "__MAINTAINER_TO__": ", ".join(maintainer_to),
        "__FROM_ADDRESS__": user_email_address,
        "__PROD_MESSAGE__": prod_message,
        "__CC__": "Cc: " + cnf["Dinstall::MyEmailAddress"],
    }

    if is_bug:
        subst["__DEBBUGS_CC__"] = ", ".join(
            maintainer_to + [cnf["Dinstall::MyEmailAddress"]]
        )
        template = cnf["Dir::Templates"] + "/process-new.bug"
    else:
        template = cnf["Dir::Templates"] + "/process-new.prod"
    prod_mail_message = utils.TemplateSubst(subst, template)

    # Send the prod mail
    utils.send_mail(prod_mail_message, whitelists=whitelists)

    if is_bug:
        print("Filed bug against source package")
    else:
        print("Sent prodding message")

    answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
    if answer != "n":
        # The complete rendered mail, not just the edited text, is stored.
        comment = NewComment()
        comment.policy_queue = upload.policy_queue
        comment.package = changes.source
        comment.version = changes.version
        comment.comment = prod_mail_message
        comment.author = utils.whoami()
        comment.trainee = trainee
        session.add(comment)
        session.commit()
173################################################################################
def edit_note(note, upload, session, trainee=False):
    """Interactively write a new note for an upload and store it.

    The user writes the note with ``utils.call_editor`` and then chooses to
    finish, edit again, abandon or quit.  If the note is empty the default
    answer is Abandon, otherwise it is Done.  A finished note is stored as
    a :class:`NewComment` and committed.

    :param note: not used by this function; the editor always starts empty
    :param upload: upload the note is attached to
    :param session: database session used to store the note
    :param trainee: value stored in the ``trainee`` field of the note
    :return: ``0`` if the user chose Quit; :const:`None` if the user chose
             Abandon or after the note has been stored
    """
    # Only the initial letters of the offered options are acceptable.  A
    # plain substring test against the prompt would also let through a
    # space, ",", "?", "[" or "]" and then store the note.
    valid_answers = ("D", "E", "A", "Q")

    newnote = ""
    answer = "E"
    while answer == "E":
        newnote = utils.call_editor(newnote).rstrip()
        print("New Note:")
        print(utils.prefix_multi_line_string(newnote, " "))
        empty_note = not newnote.strip()
        if empty_note:
            prompt = "Done, Edit, [A]bandon, Quit ?"
        else:
            prompt = "[D]one, Edit, Abandon, Quit ?"
        default_match = re_default_answer.search(prompt)
        answer = "XXX"
        while answer not in valid_answers:
            answer = utils.input_or_exit(prompt)
            if answer == "":
                # Empty input selects the bracketed default of the prompt.
                answer = default_match.group(1)
            answer = answer[:1].upper()
        if answer == "A":
            return
        elif answer == "Q":
            # Callers can tell "quit" (0) apart from "abandon" (None).
            return 0

    comment = NewComment()
    comment.policy_queue = upload.policy_queue
    comment.package = upload.changes.source
    comment.version = upload.changes.version
    comment.comment = newnote
    comment.author = utils.whoami()
    comment.trainee = trainee
    session.add(comment)
    session.commit()
211###############################################################################
def get_suite_version_by_source(source: str, session) -> list[tuple[str, str]]:
    """Return ``(suite_name, version)`` tuples for a source package.

    :param source: name of the source package
    :param session: database session used for the query
    :return: one tuple for every row of the suite/source join that matches
             ``source``
    """
    return (
        session.query(Suite.suite_name, DBSource.version)
        .join(Suite.sources)
        .filter_by(source=source)
        .all()
    )
def get_suite_version_by_package(
    package: str, arch_string: str, session
) -> list[tuple[str, str]]:
    """Return ``(suite_name, version)`` tuples for a binary package.

    Binaries are matched if their architecture is either ``arch_string``
    or ``"all"``.

    :param package: name of the binary package
    :param arch_string: architecture to look for, in addition to ``"all"``
    :param session: database session used for the query
    :return: one tuple for every matching row of the suite/binary join
    """
    wanted_architectures = [arch_string, "all"]

    query = session.query(Suite.suite_name, DBBinary.version)
    query = query.join(Suite.binaries).filter_by(package=package)
    query = query.join(DBBinary.architecture)
    query = query.filter(Architecture.arch_string.in_(wanted_architectures))
    return query.all()