"""
functions related to handling uploads via tag2upload
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2025, Ansgar <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
from __future__ import annotations
import base64
import json
import os.path
import re
import subprocess
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Union
from daklib.config import Config
from daklib.gpg import SignedFile
if TYPE_CHECKING:
    import daklib.archive
    import daklib.upload
[docs]@dataclass
class Tag:
    object: str
    type: str
    tag: str
    tagger: str
    message: str 
[docs]def _parse_tag(data: bytes) -> Tag:
    """
    parse a Git `tag`
    """
    header: dict[str, str] = {}
    message = ""
    lines = iter(data.decode("utf-8").split("\n"))
    for line in lines:
        if not line:
            # end of header, optional message follows
            break
        fields = line.split(" ", 1)
        if len(fields) != 2:
            raise ValueError("Invalid header line.")
        if fields[0] in header:
            raise ValueError(f"Repeated header field '{fields[0]}'")
        header[fields[0]] = fields[1]
    # According to man:git-mktag(1), a tag has exactly these header fields:
    if header.keys() != {"object", "type", "tag", "tagger"}:
        raise ValueError("Invalid set of header fields.")
    message = "\n".join(lines)
    return Tag(
        object=header["object"],
        type=header["type"],
        tag=header["tag"],
        tagger=header["tagger"],
        message=message,
    ) 
Metadata = dict[str, Union[str, bool]]
_re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z')
[docs]@dataclass
class TagInfo:
    tag: Tag
    metadata: Metadata
    signed_file: SignedFile 
[docs]def verify_and_parse_git_tag(
    tag: bytes, signature: bytes, *, keyrings: Iterable[str]
) -> TagInfo:
    """
    verify and parse information from Git tag and its detached signature
    Raises:
        daklib.gpg.GpgException: when signature is invalid
        ValueError: when passed information is syntactically invalid
    """
    signed_file = SignedFile(tag, detached_signature=signature, keyrings=keyrings)
    tag = _parse_tag(signed_file.contents)
    metadata = _dgit_metadata(tag.message)
    if metadata.get("please-upload") is not True:
        raise ValueError("tag2upload: tag misses `please-upload`")
    if not (distro := metadata.get("distro")):
        raise ValueError("tag2upload: tag misses `distro`")
    if not metadata.get("source"):
        raise ValueError("tag2upload: tag misses `source`")
    if not (version := metadata.get("version")):
        raise ValueError("tag2upload: tag misses `version`")
    expected_tag = f"{distro}/{mangle_version_dep14(version)}"
    if tag.tag != expected_tag:
        raise ValueError("tag2upload: tag name does not match expected value")
    return TagInfo(tag=tag, metadata=metadata, signed_file=signed_file) 
[docs]@dataclass
class GitTagInfo:
    tag: str
    fp: str 
[docs]def parse_git_tag_info(value: str) -> GitTagInfo:
    """
    parse Git-Tag-Info field from .changes files
    """
    info: dict[str, str] = {}
    for item in value.split():
        kv = item.split("=", 1)
        if len(kv) != 2:
            raise ValueError("Item not in `key=value` form.")
        if kv[0] in info:
            raise ValueError(f"Key '{kv[0]}' seen twice.")
        info[kv[0]] = kv[1]
    if not {"tag", "fp"} <= info.keys():
        raise ValueError("Missing required key.")
    return GitTagInfo(tag=info["tag"], fp=info["fp"]) 
_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)")
[docs]def mangle_version_dep14(version: str) -> str:
    """
    mangle version according to DEP-14
    """
    # https://dep-team.pages.debian.net/deps/dep14/
    version = version.translate({ord(":"): "%", ord("~"): "_"})
    return _re_mangle_dot.sub(".#", version) 
[docs]class HelperException(Exception):
    """
    raised when the helper program returned an error or wrong information
    """ 
[docs]def _call_helper(path: str) -> dict:
    config = Config()
    cmd = config.get("Dinstall::Tag2UploadHelper").split()
    if not cmd:
        raise HelperException("tag2upload helper not configured")
    if unpriv_user := config.get("Dinstall::UnprivUser"):
        cmd = ["sudo", "-H", "-u", unpriv_user, *cmd]
    cmd.extend(["--audit", "/dev/stdin"])
    with open(path, "rb") as fh:
        process = subprocess.run(cmd, stdin=fh, capture_output=True, check=True)
    result = json.loads(process.stdout)
    if not isinstance(result, dict):
        raise HelperException("tag2upload helper did not return a dictionary")
    if result_error := result.get("error"):
        raise HelperException(f"tag2upload helper reported an error: {result_error}")
    result_result = result.get("result")
    if not isinstance(result_result, dict):
        raise HelperException("tag2upload helper did not return a dict in the `result`")
    return result_result 
[docs]def _get_base64(d: Mapping, key: str) -> bytes:
    """
    retrieve `key` from the mapping and decode it as base64
    Raises:
        KeyError: when the requested key is not in the mapping
        ValueError: when the requested entry is not a string
    """
    encoded = d[key]
    if not isinstance(encoded, str):
        raise ValueError("value is not a str")
    return base64.standard_b64decode(encoded) 
[docs]def get_tag2upload_info_for_upload(
    upload: daklib.archive.ArchiveUpload,
) -> tuple[daklib.upload.HashedFile, TagInfo]:
    """
    extract tag2upload information for an upload
    """
    tag2upload_files = upload.changes.source_tag2upload_files
    if not tag2upload_files:
        raise ValueError("tag2upload: upload does not include tag2upload information")
    if len(tag2upload_files) > 1:
        raise ValueError("tag2upload: upload includes multiple tag2upload files")
    filename = tag2upload_files[0].filename
    path = os.path.join(upload.directory, filename)
    result = _call_helper(path)
    tag = _get_base64(result, "tag")
    if not tag:
        raise ValueError(f"{filename}: missing tag")
    signature = _get_base64(result, "signature")
    if not signature:
        raise ValueError(f"{filename}: missing signature")
    return tag2upload_files[0], verify_and_parse_git_tag(
        tag, signature, keyrings=upload.keyrings
    )