# vim:set et sw=4:
"""
Queue utility functions for dak
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2001 - 2006 James Troup <james@nocrew.org>
@copyright: 2009, 2010 Joerg Jaspert <joerg@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
###############################################################################
from . import utils
from .config import Config
from .dbconn import (
Architecture,
DBBinary,
DBSource,
NewComment,
PolicyQueueUpload,
Priority,
Section,
Suite,
get_mapped_component,
)
from .regexes import re_default_answer
################################################################################
[docs]def check_valid(overrides: list[dict], session) -> bool:
"""Check if section and priority for new overrides exist in database.
Additionally does sanity checks:
- debian-installer packages have to be udeb (or source)
- non debian-installer packages cannot be udeb
:param overrides: list of overrides to check. The overrides need
to be given in form of a dict with the following keys:
- package: package name
- priority
- section
- component
- type: type of requested override ('dsc', 'deb' or 'udeb')
All values are strings.
:return: :const:`True` if all overrides are valid, :const:`False` if there is any
invalid override.
"""
all_valid = True
for o in overrides:
o["valid"] = True
if session.query(Priority).filter_by(priority=o["priority"]).first() is None:
o["valid"] = False
if session.query(Section).filter_by(section=o["section"]).first() is None:
o["valid"] = False
if get_mapped_component(o["component"], session) is None:
o["valid"] = False
if o["type"] not in ("dsc", "deb", "udeb"):
raise Exception("Unknown override type {0}".format(o["type"]))
if o["type"] == "udeb" and o["section"].split("/", 1)[-1] != "debian-installer":
o["valid"] = False
if o["section"].split("/", 1)[-1] == "debian-installer" and o["type"] not in (
"dsc",
"udeb",
):
o["valid"] = False
all_valid = all_valid and o["valid"]
return all_valid
###############################################################################
[docs]def prod_maintainer(notes, upload: PolicyQueueUpload, session, trainee=False):
cnf = Config()
changes = upload.changes
whitelists = [upload.target_suite.mail_whitelist]
# Here we prepare an editor and get them ready to prod...
prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
answer = "E"
while answer == "E":
prod_message = utils.call_editor(prod_message)
print("Prod message:")
print(
utils.prefix_multi_line_string(prod_message, " ", include_blank_lines=True)
)
prompt = "[P]rod, Bug, Edit, Abandon, Quit ?"
answer = "XXX"
while prompt.find(answer) == -1:
answer = utils.input_or_exit(prompt)
m = re_default_answer.search(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
if answer == "A":
return
elif answer == "Q":
return 0
# Otherwise, do the proding...
user_email_address = utils.whoami() + " <%s>" % (cnf["Dinstall::MyAdminAddress"])
is_bug = answer == "B"
changed_by = changes.changedby or changes.maintainer
maintainer = changes.maintainer
maintainer_to = utils.mail_addresses_for_upload(
maintainer, changed_by, changes.fingerprint
)
Subst = {
"__SOURCE__": upload.changes.source,
"__VERSION__": upload.changes.version,
"__ARCHITECTURE__": upload.changes.architecture,
"__CHANGES_FILENAME__": upload.changes.changesname,
"__MAINTAINER_TO__": ", ".join(maintainer_to),
}
Subst["__FROM_ADDRESS__"] = user_email_address
Subst["__PROD_MESSAGE__"] = prod_message
Subst["__CC__"] = "Cc: " + cnf["Dinstall::MyEmailAddress"]
if is_bug:
Subst["__DEBBUGS_CC__"] = ", ".join(
maintainer_to + [cnf["Dinstall::MyEmailAddress"]]
)
prod_mail_message = utils.TemplateSubst(
Subst, cnf["Dir::Templates"] + "/process-new.bug"
)
else:
prod_mail_message = utils.TemplateSubst(
Subst, cnf["Dir::Templates"] + "/process-new.prod"
)
# Send the prod mail
utils.send_mail(prod_mail_message, whitelists=whitelists)
if is_bug:
print("Filed bug against source package")
else:
print("Sent prodding message")
answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
if answer != "n":
comment = NewComment()
comment.policy_queue = upload.policy_queue
comment.package = upload.changes.source
comment.version = upload.changes.version
comment.comment = prod_mail_message
comment.author = utils.whoami()
comment.trainee = trainee
session.add(comment)
session.commit()
################################################################################
[docs]def edit_note(note, upload, session, trainee=False):
newnote = ""
answer = "E"
while answer == "E":
newnote = utils.call_editor(newnote).rstrip()
print("New Note:")
print(utils.prefix_multi_line_string(newnote, " "))
empty_note = not newnote.strip()
if empty_note:
prompt = "Done, Edit, [A]bandon, Quit ?"
else:
prompt = "[D]one, Edit, Abandon, Quit ?"
answer = "XXX"
while prompt.find(answer) == -1:
answer = utils.input_or_exit(prompt)
m = re_default_answer.search(prompt)
if answer == "":
answer = m.group(1)
answer = answer[:1].upper()
if answer == "A":
return
elif answer == "Q":
return 0
comment = NewComment()
comment.policy_queue = upload.policy_queue
comment.package = upload.changes.source
comment.version = upload.changes.version
comment.comment = newnote
comment.author = utils.whoami()
comment.trainee = trainee
session.add(comment)
session.commit()
###############################################################################
[docs]def get_suite_version_by_source(source: str, session) -> list[tuple[str, str]]:
"returns a list of tuples (suite_name, version) for source package"
q = (
session.query(Suite.suite_name, DBSource.version)
.join(Suite.sources)
.filter_by(source=source)
)
return q.all()
[docs]def get_suite_version_by_package(
package: str, arch_string: str, session
) -> list[tuple[str, str]]:
"""
returns a list of tuples (suite_name, version) for binary package and
arch_string
"""
return (
session.query(Suite.suite_name, DBBinary.version)
.join(Suite.binaries)
.filter_by(package=package)
.join(DBBinary.architecture)
.filter(Architecture.arch_string.in_([arch_string, "all"]))
.all()
)