# vim:set et sw=4:
"""
Queue utility functions for dak
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2001 - 2006 James Troup <james@nocrew.org>
@copyright: 2009, 2010  Joerg Jaspert <joerg@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
###############################################################################
from . import utils
from .config import Config
from .dbconn import (
    Architecture,
    DBBinary,
    DBSource,
    NewComment,
    PolicyQueueUpload,
    Priority,
    Section,
    Suite,
    get_mapped_component,
)
from .regexes import re_default_answer
################################################################################
[docs]def check_valid(overrides: list[dict], session) -> bool:
    """Check if section and priority for new overrides exist in database.
    Additionally does sanity checks:
      - debian-installer packages have to be udeb (or source)
      - non debian-installer packages cannot be udeb
    :param overrides: list of overrides to check. The overrides need
                      to be given in form of a dict with the following keys:
                      - package: package name
                      - priority
                      - section
                      - component
                      - type: type of requested override ('dsc', 'deb' or 'udeb')
                      All values are strings.
    :return: :const:`True` if all overrides are valid, :const:`False` if there is any
             invalid override.
    """
    all_valid = True
    for o in overrides:
        o["valid"] = True
        if session.query(Priority).filter_by(priority=o["priority"]).first() is None:
            o["valid"] = False
        if session.query(Section).filter_by(section=o["section"]).first() is None:
            o["valid"] = False
        if get_mapped_component(o["component"], session) is None:
            o["valid"] = False
        if o["type"] not in ("dsc", "deb", "udeb"):
            raise Exception("Unknown override type {0}".format(o["type"]))
        if o["type"] == "udeb" and o["section"].split("/", 1)[-1] != "debian-installer":
            o["valid"] = False
        if o["section"].split("/", 1)[-1] == "debian-installer" and o["type"] not in (
            "dsc",
            "udeb",
        ):
            o["valid"] = False
        all_valid = all_valid and o["valid"]
    return all_valid 
###############################################################################
[docs]def prod_maintainer(notes, upload: PolicyQueueUpload, session, trainee=False):
    cnf = Config()
    changes = upload.changes
    whitelists = [upload.target_suite.mail_whitelist]
    # Here we prepare an editor and get them ready to prod...
    prod_message = "\n\n=====\n\n".join([note.comment for note in notes])
    answer = "E"
    while answer == "E":
        prod_message = utils.call_editor(prod_message)
        print("Prod message:")
        print(
            utils.prefix_multi_line_string(prod_message, "  ", include_blank_lines=True)
        )
        prompt = "[P]rod, Bug, Edit, Abandon, Quit ?"
        answer = "XXX"
        while prompt.find(answer) == -1:
            answer = utils.input_or_exit(prompt)
            m = re_default_answer.search(prompt)
            if answer == "":
                answer = m.group(1)
            answer = answer[:1].upper()
    if answer == "A":
        return
    elif answer == "Q":
        return 0
    # Otherwise, do the proding...
    user_email_address = utils.whoami() + " <%s>" % (cnf["Dinstall::MyAdminAddress"])
    is_bug = answer == "B"
    changed_by = changes.changedby or changes.maintainer
    maintainer = changes.maintainer
    maintainer_to = utils.mail_addresses_for_upload(
        maintainer, changed_by, changes.fingerprint, changes.authorized_by_fingerprint
    )
    Subst = {
        "__SOURCE__": upload.changes.source,
        "__VERSION__": upload.changes.version,
        "__ARCHITECTURE__": upload.changes.architecture,
        "__CHANGES_FILENAME__": upload.changes.changesname,
        "__MAINTAINER_TO__": ", ".join(maintainer_to),
    }
    Subst["__FROM_ADDRESS__"] = user_email_address
    Subst["__PROD_MESSAGE__"] = prod_message
    Subst["__CC__"] = "Cc: " + cnf["Dinstall::MyEmailAddress"]
    if is_bug:
        Subst["__DEBBUGS_CC__"] = ", ".join(
            maintainer_to + [cnf["Dinstall::MyEmailAddress"]]
        )
        prod_mail_message = utils.TemplateSubst(
            Subst, cnf["Dir::Templates"] + "/process-new.bug"
        )
    else:
        prod_mail_message = utils.TemplateSubst(
            Subst, cnf["Dir::Templates"] + "/process-new.prod"
        )
    # Send the prod mail
    utils.send_mail(prod_mail_message, whitelists=whitelists)
    if is_bug:
        print("Filed bug against source package")
    else:
        print("Sent prodding message")
    answer = utils.input_or_exit("Store prod message as note? (Y/n)?").lower()
    if answer != "n":
        comment = NewComment()
        comment.policy_queue = upload.policy_queue
        comment.package = upload.changes.source
        comment.version = upload.changes.version
        comment.comment = prod_mail_message
        comment.author = utils.whoami()
        comment.trainee = trainee
        session.add(comment)
        session.commit() 
################################################################################
[docs]def edit_note(note, upload, session, trainee=False):
    newnote = ""
    answer = "E"
    while answer == "E":
        newnote = utils.call_editor(newnote).rstrip()
        print("New Note:")
        print(utils.prefix_multi_line_string(newnote, "  "))
        empty_note = not newnote.strip()
        if empty_note:
            prompt = "Done, Edit, [A]bandon, Quit ?"
        else:
            prompt = "[D]one, Edit, Abandon, Quit ?"
        answer = "XXX"
        while prompt.find(answer) == -1:
            answer = utils.input_or_exit(prompt)
            m = re_default_answer.search(prompt)
            if answer == "":
                answer = m.group(1)
            answer = answer[:1].upper()
    if answer == "A":
        return
    elif answer == "Q":
        return 0
    comment = NewComment()
    comment.policy_queue = upload.policy_queue
    comment.package = upload.changes.source
    comment.version = upload.changes.version
    comment.comment = newnote
    comment.author = utils.whoami()
    comment.trainee = trainee
    session.add(comment)
    session.commit() 
###############################################################################
[docs]def get_suite_version_by_source(source: str, session) -> list[tuple[str, str]]:
    "returns a list of tuples (suite_name, version) for source package"
    q = (
        session.query(Suite.suite_name, DBSource.version)
        .join(Suite.sources)
        .filter_by(source=source)
    )
    return q.all() 
[docs]def get_suite_version_by_package(
    package: str, arch_string: str, session
) -> list[tuple[str, str]]:
    """
    returns a list of tuples (suite_name, version) for binary package and
    arch_string
    """
    return (
        session.query(Suite.suite_name, DBBinary.version)
        .join(Suite.binaries)
        .filter_by(package=package)
        .join(DBBinary.architecture)
        .filter(Architecture.arch_string.in_([arch_string, "all"]))
        .all()
    )