Source code for daklib.tag2upload

"""
functions related to handling uploads via tag2upload

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2025, Ansgar <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from __future__ import annotations

import base64
import json
import os.path
import re
import subprocess
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Union

from daklib.config import Config
from daklib.gpg import SignedFile

if TYPE_CHECKING:
    import daklib.archive
    import daklib.upload


[docs]@dataclass class Tag: object: str type: str tag: str tagger: str message: str
[docs]def _parse_tag(data: bytes) -> Tag: """ parse a Git `tag` """ header: dict[str, str] = {} message = "" lines = iter(data.decode("utf-8").split("\n")) for line in lines: if not line: # end of header, optional message follows break fields = line.split(" ", 1) if len(fields) != 2: raise ValueError("Invalid header line.") if fields[0] in header: raise ValueError(f"Repeated header field '{fields[0]}'") header[fields[0]] = fields[1] # According to man:git-mktag(1), a tag has exactly these header fields: if header.keys() != {"object", "type", "tag", "tagger"}: raise ValueError("Invalid set of header fields.") message = "\n".join(lines) return Tag( object=header["object"], type=header["type"], tag=header["tag"], tagger=header["tagger"], message=message, )
Metadata = dict[str, Union[str, bool]] _re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z')
[docs]def _dgit_metadata(message: str) -> Metadata: """ extract dgit metadata from tag message The metadata can be of the form `keyword` or `keyword=value`. For metadata without a value, we store `True` in the returned dict; otherwise the provided value is stored. """ # See man:tag2upload(2) for the format. info: Metadata = {} for line in message.split("\n"): m = _re_dgit_metadata.match(line) if not m: continue for item in m.group(1).split(): kv = item.split("=", 1) if kv[0] in info: raise ValueError(f"Key '{kv[0]}' seen twice.") if len(kv) == 2: info[kv[0]] = kv[1] else: info[kv[0]] = True return info
[docs]@dataclass class TagInfo: tag: Tag metadata: Metadata signed_file: SignedFile
[docs]def verify_and_parse_git_tag( tag: bytes, signature: bytes, *, keyrings: Iterable[str] ) -> TagInfo: """ verify and parse information from Git tag and its detached signature Raises: daklib.gpg.GpgException: when signature is invalid ValueError: when passed information is syntactically invalid """ signed_file = SignedFile(tag, detached_signature=signature, keyrings=keyrings) tag = _parse_tag(signed_file.contents) metadata = _dgit_metadata(tag.message) if metadata.get("please-upload") is not True: raise ValueError("tag2upload: tag misses `please-upload`") if not (distro := metadata.get("distro")): raise ValueError("tag2upload: tag misses `distro`") if not metadata.get("source"): raise ValueError("tag2upload: tag misses `source`") if not (version := metadata.get("version")): raise ValueError("tag2upload: tag misses `version`") expected_tag = f"{distro}/{mangle_version_dep14(version)}" if tag.tag != expected_tag: raise ValueError("tag2upload: tag name does not match expected value") return TagInfo(tag=tag, metadata=metadata, signed_file=signed_file)
[docs]@dataclass class GitTagInfo: tag: str fp: str
[docs]def parse_git_tag_info(value: str) -> GitTagInfo: """ parse Git-Tag-Info field from .changes files """ info: dict[str, str] = {} for item in value.split(): kv = item.split("=", 1) if len(kv) != 2: raise ValueError("Item not in `key=value` form.") if kv[0] in info: raise ValueError(f"Key '{kv[0]}' seen twice.") info[kv[0]] = kv[1] if not {"tag", "fp"} <= info.keys(): raise ValueError("Missing required key.") return GitTagInfo(tag=info["tag"], fp=info["fp"])
_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)")
[docs]def mangle_version_dep14(version: str) -> str: """ mangle version according to DEP-14 """ # https://dep-team.pages.debian.net/deps/dep14/ version = version.translate({ord(":"): "%", ord("~"): "_"}) return _re_mangle_dot.sub(".#", version)
[docs]class HelperException(Exception): """ raised when the helper program returned an error or wrong information """
[docs]def _call_helper(path: str) -> dict: config = Config() cmd = config.get("Dinstall::Tag2UploadHelper").split() if not cmd: raise HelperException("tag2upload helper not configured") if unpriv_user := config.get("Dinstall::UnprivUser"): cmd = ["sudo", "-H", "-u", unpriv_user, *cmd] cmd.extend(["--audit", "/dev/stdin"]) with open(path, "rb") as fh: process = subprocess.run(cmd, stdin=fh, capture_output=True, check=True) result = json.loads(process.stdout) if not isinstance(result, dict): raise HelperException("tag2upload helper did not return a dictionary") if result_error := result.get("error"): raise HelperException(f"tag2upload helper reported an error: {result_error}") result_result = result.get("result") if not isinstance(result_result, dict): raise HelperException("tag2upload helper did not return a dict in the `result`") return result_result
[docs]def _get_base64(d: Mapping, key: str) -> bytes: """ retrieve `key` from the mapping and decode it as base64 Raises: KeyError: when the requested key is not in the mapping ValueError: when the requested entry is not a string """ encoded = d[key] if not isinstance(encoded, str): raise ValueError("value is not a str") return base64.standard_b64decode(encoded)
[docs]def get_tag2upload_info_for_upload( upload: daklib.archive.ArchiveUpload, ) -> tuple[daklib.upload.HashedFile, TagInfo]: """ extract tag2upload information for an upload """ tag2upload_files = upload.changes.source_tag2upload_files if not tag2upload_files: raise ValueError("tag2upload: upload does not include tag2upload information") if len(tag2upload_files) > 1: raise ValueError("tag2upload: upload includes multiple tag2upload files") filename = tag2upload_files[0].filename path = os.path.join(upload.directory, filename) result = _call_helper(path) tag = _get_base64(result, "tag") if not tag: raise ValueError(f"{filename}: missing tag") signature = _get_base64(result, "signature") if not signature: raise ValueError(f"{filename}: missing signature") return tag2upload_files[0], verify_and_parse_git_tag( tag, signature, keyrings=upload.keyrings )