"""
functions related to handling uploads via tag2upload
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2025, Ansgar <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from __future__ import annotations
import base64
import json
import os.path
import re
import subprocess
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Union
from daklib.config import Config
from daklib.gpg import SignedFile
if TYPE_CHECKING:
import daklib.archive
import daklib.upload
@dataclass
class Tag:
    """
    header fields and message of a Git `tag` object
    """

    # value of the `object` header field
    object: str
    # value of the `type` header field
    type: str
    # value of the `tag` header field
    tag: str
    # value of the `tagger` header field
    tagger: str
    # text following the header
    message: str
def _parse_tag(data: bytes) -> Tag:
    """
    parse a Git `tag`

    The header consists of `name value` lines and ends at the first empty
    line; everything after that line is the message.

    Raises:
        ValueError: when `data` is not valid UTF-8 (`UnicodeDecodeError` is
            a subclass of `ValueError`), a header line has no space
            separating name and value, a header field is repeated, or the
            set of header fields is not exactly `object`, `type`, `tag`
            and `tagger`
    """
    header: dict[str, str] = {}

    # One shared iterator: the loop consumes the header lines and the empty
    # separator line, whatever is left afterwards belongs to the message.
    lines = iter(data.decode("utf-8").split("\n"))
    for line in lines:
        if not line:
            # end of header, optional message follows
            break
        name, separator, value = line.partition(" ")
        if not separator:
            raise ValueError("Invalid header line.")
        if name in header:
            raise ValueError(f"Repeated header field '{name}'")
        header[name] = value

    # According to man:git-mktag(1), a tag has exactly these header fields:
    if header.keys() != {"object", "type", "tag", "tagger"}:
        raise ValueError("Invalid set of header fields.")

    return Tag(
        object=header["object"],
        type=header["type"],
        tag=header["tag"],
        tagger=header["tagger"],
        message="\n".join(lines),
    )
# Mapping with string keys whose values are either strings or booleans; this
# is the type of the metadata obtained from a tag message.
Metadata = dict[str, Union[str, bool]]
# Matches a complete string of the form `[dgit ...]`.  The text between
# `[dgit ` and the final `]` is captured as group 1; it must be non-empty and
# must not start with a double quote.
# NOTE(review): presumably consumed by `_dgit_metadata`, which is not part of
# this excerpt -- confirm.
_re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z')
@dataclass
class TagInfo:
    """
    result of verifying and parsing a signed Git tag
    """

    # the parsed tag
    tag: Tag
    # metadata extracted from the tag message
    metadata: Metadata
    # `SignedFile` created from the tag and its detached signature
    signed_file: SignedFile
def verify_and_parse_git_tag(
    tag: bytes, signature: bytes, *, keyrings: Iterable[str]
) -> TagInfo:
    """
    verify and parse information from Git tag and its detached signature

    The tag's metadata must carry the `please-upload` flag and string
    values for `distro`, `source` and `version`.  In addition the tag name
    must be `<distro>/<version>` with the version mangled according to
    DEP-14.

    Raises:
        daklib.gpg.GpgException: when signature is invalid
        ValueError: when passed information is syntactically invalid
    """
    signed_file = SignedFile(tag, detached_signature=signature, keyrings=keyrings)
    # Keep the parsed tag under its own name rather than rebinding the
    # `bytes` parameter `tag` to a `Tag` instance.
    parsed_tag = _parse_tag(signed_file.contents)
    metadata = _dgit_metadata(parsed_tag.message)

    # `please-upload` has to be a flag, i.e. exactly `True`.
    if metadata.get("please-upload") is not True:
        raise ValueError("tag2upload: tag misses `please-upload`")

    # Metadata values are either `str` or `bool`.  The items below need a
    # real string value: a boolean would otherwise end up in the expected
    # tag name or be handed to `mangle_version_dep14`, which only works on
    # strings.  Such items are reported the same way as absent ones.
    distro = metadata.get("distro")
    if not (isinstance(distro, str) and distro):
        raise ValueError("tag2upload: tag misses `distro`")

    source = metadata.get("source")
    if not (isinstance(source, str) and source):
        raise ValueError("tag2upload: tag misses `source`")

    version = metadata.get("version")
    if not (isinstance(version, str) and version):
        raise ValueError("tag2upload: tag misses `version`")

    expected_tag = f"{distro}/{mangle_version_dep14(version)}"
    if parsed_tag.tag != expected_tag:
        raise ValueError("tag2upload: tag name does not match expected value")

    return TagInfo(tag=parsed_tag, metadata=metadata, signed_file=signed_file)
@dataclass
class GitTagInfo:
    """
    required items of the Git-Tag-Info field from .changes files
    """

    # value of the `tag` item
    tag: str
    # value of the `fp` item
    fp: str
def parse_git_tag_info(value: str) -> GitTagInfo:
    """
    parse Git-Tag-Info field from .changes files

    The field is a whitespace-separated list of `key=value` items.  The
    keys `tag` and `fp` are required; items with other keys are accepted
    but not part of the result.

    Raises:
        ValueError: when an item is not in `key=value` form, a key occurs
            more than once, or a required key is missing
    """
    info: dict[str, str] = {}

    for item in value.split():
        # Only the first `=` separates key and value; later ones belong to
        # the value.
        key, separator, item_value = item.partition("=")
        if not separator:
            raise ValueError("Item not in `key=value` form.")
        if key in info:
            raise ValueError(f"Key '{key}' seen twice.")
        info[key] = item_value

    if not {"tag", "fp"} <= info.keys():
        raise ValueError("Missing required key.")

    return GitTagInfo(tag=info["tag"], fp=info["fp"])
# A dot that is followed by another dot, that ends the string, or that is
# followed by `lock` at the end of the string.
_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)")
# Single-character replacements, built once instead of on every call.
_dep14_translation = str.maketrans({":": "%", "~": "_"})


def mangle_version_dep14(version: str) -> str:
    """
    mangle version according to DEP-14

    `:` is replaced by `%` and `~` by `_`.  Afterwards a `#` is inserted
    after every dot matched by `_re_mangle_dot`.
    """
    # https://dep-team.pages.debian.net/deps/dep14/
    return _re_mangle_dot.sub(".#", version.translate(_dep14_translation))
class HelperException(Exception):
    """
    raised when the helper program returned an error or wrong information
    """
def _call_helper(path: str) -> dict:
    """
    run the tag2upload helper on the file at `path` and return its result

    The helper command is read from `Dinstall::Tag2UploadHelper`.  When
    `Dinstall::UnprivUser` is set, the helper is run as that user via
    `sudo`.  The file is passed to the helper on standard input and the
    helper's standard output is parsed as JSON.

    Returns:
        the dictionary stored under `result` in the helper's output

    Raises:
        HelperException: when no helper is configured, the output is not
            a dictionary, the output reports an `error`, or `result` is
            not a dictionary
        subprocess.CalledProcessError: when the helper exits with a
            non-zero status
        json.JSONDecodeError: when the helper's output is not valid JSON
    """
    config = Config()

    # NOTE(review): `or ""` guards against `Config.get` reporting an unset
    # option as `None` instead of an empty string; confirm against
    # `daklib.config` and drop the guard if it can never happen.
    cmd = (config.get("Dinstall::Tag2UploadHelper") or "").split()
    if not cmd:
        raise HelperException("tag2upload helper not configured")

    if unpriv_user := config.get("Dinstall::UnprivUser"):
        cmd = ["sudo", "-H", "-u", unpriv_user, *cmd]

    # The helper reads the file from its standard input, so the file only
    # has to be readable by this process.
    cmd.extend(["--audit", "/dev/stdin"])
    with open(path, "rb") as fh:
        process = subprocess.run(cmd, stdin=fh, capture_output=True, check=True)

    result = json.loads(process.stdout)
    if not isinstance(result, dict):
        raise HelperException("tag2upload helper did not return a dictionary")

    if result_error := result.get("error"):
        raise HelperException(f"tag2upload helper reported an error: {result_error}")

    result_result = result.get("result")
    if not isinstance(result_result, dict):
        raise HelperException("tag2upload helper did not return a dict in the `result`")

    return result_result
[docs]def _get_base64(d: Mapping, key: str) -> bytes:
"""
retrieve `key` from the mapping and decode it as base64
Raises:
KeyError: when the requested key is not in the mapping
ValueError: when the requested entry is not a string
"""
encoded = d[key]
if not isinstance(encoded, str):
raise ValueError("value is not a str")
return base64.standard_b64decode(encoded)
def get_tag2upload_info_for_upload(
    upload: daklib.archive.ArchiveUpload,
) -> tuple[daklib.upload.HashedFile, TagInfo]:
    """
    extract tag2upload information for an upload

    The upload must include exactly one tag2upload file.  That file is
    handed to the tag2upload helper; the tag and signature returned by the
    helper are then verified against the upload's keyrings and parsed.

    Returns:
        the tag2upload file of the upload and the information from the
        verified tag

    Raises:
        ValueError: when the upload includes no or more than one
            tag2upload file, or the helper returned an empty tag or
            signature; also raised by `verify_and_parse_git_tag`
        KeyError: when the helper's result has no `tag` or `signature`
            entry
        HelperException: when the helper failed, see `_call_helper`
    """
    tag2upload_files = upload.changes.source_tag2upload_files
    if not tag2upload_files:
        raise ValueError("tag2upload: upload does not include tag2upload information")
    if len(tag2upload_files) > 1:
        raise ValueError("tag2upload: upload includes multiple tag2upload files")

    tag2upload_file = tag2upload_files[0]
    filename = tag2upload_file.filename
    result = _call_helper(os.path.join(upload.directory, filename))

    tag = _get_base64(result, "tag")
    if not tag:
        raise ValueError(f"{filename}: missing tag")

    signature = _get_base64(result, "signature")
    if not signature:
        raise ValueError(f"{filename}: missing signature")

    tag_info = verify_and_parse_git_tag(tag, signature, keyrings=upload.keyrings)
    return tag2upload_file, tag_info