1# vim:set et sw=4:
3"""
4Queue utility functions for dak
6@contact: Debian FTP Master <ftpmaster@debian.org>
7@copyright: 2001 - 2006 James Troup <james@nocrew.org>
8@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org>
9@license: GNU General Public License version 2 or later
10"""
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License as published by
14# the Free Software Foundation; either version 2 of the License, or
15# (at your option) any later version.
17# This program is distributed in the hope that it will be useful,
18# but WITHOUT ANY WARRANTY; without even the implied warranty of
19# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20# GNU General Public License for more details.
22# You should have received a copy of the GNU General Public License
23# along with this program; if not, write to the Free Software
24# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26###############################################################################
28from . import utils
30from .regexes import *
31from .config import Config
32from .dbconn import *
34################################################################################
37def check_valid(overrides: list[dict], session) -> bool:
38 """Check if section and priority for new overrides exist in database.
40 Additionally does sanity checks:
41 - debian-installer packages have to be udeb (or source)
42 - non debian-installer packages cannot be udeb
44 :param overrides: list of overrides to check. The overrides need
45 to be given in form of a dict with the following keys:
47 - package: package name
48 - priority
49 - section
50 - component
51 - type: type of requested override ('dsc', 'deb' or 'udeb')
53 All values are strings.
54 :return: :const:`True` if all overrides are valid, :const:`False` if there is any
55 invalid override.
56 """
57 all_valid = True
58 for o in overrides:
59 o['valid'] = True
60 if session.query(Priority).filter_by(priority=o['priority']).first() is None: 60 ↛ 61line 60 didn't jump to line 61, because the condition on line 60 was never true
61 o['valid'] = False
62 if session.query(Section).filter_by(section=o['section']).first() is None: 62 ↛ 63line 62 didn't jump to line 63, because the condition on line 62 was never true
63 o['valid'] = False
64 if get_mapped_component(o['component'], session) is None: 64 ↛ 65line 64 didn't jump to line 65, because the condition on line 64 was never true
65 o['valid'] = False
66 if o['type'] not in ('dsc', 'deb', 'udeb'): 66 ↛ 67line 66 didn't jump to line 67, because the condition on line 66 was never true
67 raise Exception('Unknown override type {0}'.format(o['type']))
68 if o['type'] == 'udeb' and o['section'].split('/', 1)[-1] != 'debian-installer': 68 ↛ 69line 68 didn't jump to line 69, because the condition on line 68 was never true
69 o['valid'] = False
70 if o['section'].split('/', 1)[-1] == 'debian-installer' and o['type'] not in ('dsc', 'udeb'): 70 ↛ 71line 70 didn't jump to line 71, because the condition on line 70 was never true
71 o['valid'] = False
72 all_valid = all_valid and o['valid']
73 return all_valid
75###############################################################################
78def prod_maintainer(notes, upload: PolicyQueueUpload, session, trainee=False):
79 cnf = Config()
80 changes = upload.changes
81 whitelists = [upload.target_suite.mail_whitelist]
83 # Here we prepare an editor and get them ready to prod...
84 prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
85 answer = 'E'
86 while answer == 'E':
87 prod_message = utils.call_editor(prod_message)
88 print("Prod message:")
89 print(utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True))
90 prompt = "[P]rod, Edit, Abandon, Quit ?"
91 answer = "XXX"
92 while prompt.find(answer) == -1:
93 answer = utils.input_or_exit(prompt)
94 m = re_default_answer.search(prompt)
95 if answer == "":
96 answer = m.group(1)
97 answer = answer[:1].upper()
98 if answer == 'A':
99 return
100 elif answer == 'Q':
101 return 0
102 # Otherwise, do the proding...
103 user_email_address = utils.whoami() + " <%s>" % (
104 cnf["Dinstall::MyAdminAddress"])
106 changed_by = changes.changedby or changes.maintainer
107 maintainer = changes.maintainer
108 maintainer_to = utils.mail_addresses_for_upload(maintainer, changed_by, changes.fingerprint)
110 Subst = {
111 '__SOURCE__': upload.changes.source,
112 '__VERSION__': upload.changes.version,
113 '__ARCHITECTURE__': upload.changes.architecture,
114 '__CHANGES_FILENAME__': upload.changes.changesname,
115 '__MAINTAINER_TO__': ", ".join(maintainer_to),
116 }
118 Subst["__FROM_ADDRESS__"] = user_email_address
119 Subst["__PROD_MESSAGE__"] = prod_message
120 Subst["__CC__"] = "Cc: " + cnf["Dinstall::MyEmailAddress"]
122 prod_mail_message = utils.TemplateSubst(
123 Subst, cnf["Dir::Templates"] + "/process-new.prod")
125 # Send the prod mail
126 utils.send_mail(prod_mail_message, whitelists=whitelists)
128 print("Sent prodding message")
130 answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
131 if answer != "n":
132 comment = NewComment()
133 comment.policy_queue = upload.policy_queue
134 comment.package = upload.changes.source
135 comment.version = upload.changes.version
136 comment.comment = prod_mail_message
137 comment.author = utils.whoami()
138 comment.trainee = trainee
139 session.add(comment)
140 session.commit()
142################################################################################
145def edit_note(note, upload, session, trainee=False):
146 newnote = ""
147 answer = 'E'
148 while answer == 'E':
149 newnote = utils.call_editor(newnote).rstrip()
150 print("New Note:")
151 print(utils.prefix_multi_line_string(newnote, " "))
152 empty_note = not newnote.strip()
153 if empty_note:
154 prompt = "Done, Edit, [A]bandon, Quit ?"
155 else:
156 prompt = "[D]one, Edit, Abandon, Quit ?"
157 answer = "XXX"
158 while prompt.find(answer) == -1:
159 answer = utils.input_or_exit(prompt)
160 m = re_default_answer.search(prompt)
161 if answer == "":
162 answer = m.group(1)
163 answer = answer[:1].upper()
164 if answer == 'A':
165 return
166 elif answer == 'Q':
167 return 0
169 comment = NewComment()
170 comment.policy_queue = upload.policy_queue
171 comment.package = upload.changes.source
172 comment.version = upload.changes.version
173 comment.comment = newnote
174 comment.author = utils.whoami()
175 comment.trainee = trainee
176 session.add(comment)
177 session.commit()
179###############################################################################
182def get_suite_version_by_source(source: str, session) -> list[tuple[str, str]]:
183 'returns a list of tuples (suite_name, version) for source package'
184 q = session.query(Suite.suite_name, DBSource.version). \
185 join(Suite.sources).filter_by(source=source)
186 return q.all()
189def get_suite_version_by_package(package: str, arch_string: str, session) -> list[tuple[str, str]]:
190 '''
191 returns a list of tuples (suite_name, version) for binary package and
192 arch_string
193 '''
194 return session.query(Suite.suite_name, DBBinary.version). \
195 join(Suite.binaries).filter_by(package=package). \
196 join(DBBinary.architecture). \
197 filter(Architecture.arch_string.in_([arch_string, 'all'])).all()