1"""
2functions related to handling uploads via tag2upload
4@contact: Debian FTP Master <ftpmaster@debian.org>
5@copyright: 2025, Ansgar <ansgar@debian.org>
6@license: GNU General Public License version 2 or later
7"""
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23from __future__ import annotations
25import base64
26import json
27import os.path
28import re
29import subprocess
30from collections.abc import Iterable, Mapping
31from dataclasses import dataclass
32from typing import TYPE_CHECKING, Union
34from daklib.config import Config
35from daklib.gpg import SignedFile
37if TYPE_CHECKING: 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true
38 import daklib.archive
39 import daklib.upload
@dataclass
class Tag:
    """
    contents of a Git `tag` object, split into its individual parts

    The attributes `object`, `type`, `tag` and `tagger` hold the values of
    the header fields with the same names; `message` holds the text that
    follows the header.
    """

    # value of the `object` header field
    object: str
    # value of the `type` header field
    type: str
    # value of the `tag` header field (the name of the tag)
    tag: str
    # value of the `tagger` header field
    tagger: str
    # text following the header; may be empty
    message: str
51def _parse_tag(data: bytes) -> Tag:
52 """
53 parse a Git `tag`
54 """
56 header: dict[str, str] = {}
57 message = ""
59 lines = iter(data.decode("utf-8").split("\n"))
61 for line in lines: 61 ↛ 76line 61 didn't jump to line 76, because the loop on line 61 didn't complete
62 if not line:
63 # end of header, optional message follows
64 break
66 fields = line.split(" ", 1)
67 if len(fields) != 2: 67 ↛ 68line 67 didn't jump to line 68, because the condition on line 67 was never true
68 raise ValueError("Invalid header line.")
70 if fields[0] in header: 70 ↛ 71line 70 didn't jump to line 71, because the condition on line 70 was never true
71 raise ValueError(f"Repeated header field '{fields[0]}'")
73 header[fields[0]] = fields[1]
75 # According to man:git-mktag(1), a tag has exactly these header fields:
76 if header.keys() != {"object", "type", "tag", "tagger"}: 76 ↛ 77line 76 didn't jump to line 77, because the condition on line 76 was never true
77 raise ValueError("Invalid set of header fields.")
79 message = "\n".join(lines)
81 return Tag(
82 object=header["object"],
83 type=header["type"],
84 tag=header["tag"],
85 tagger=header["tagger"],
86 message=message,
87 )
90Metadata = dict[str, Union[str, bool]]
91_re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z')
94def _dgit_metadata(message: str) -> Metadata:
95 """
96 extract dgit metadata from tag message
98 The metadata can be of the form `keyword` or `keyword=value`. For
99 metadata without a value, we store `True` in the returned dict;
100 otherwise the provided value is stored.
101 """
103 # See man:tag2upload(2) for the format.
104 info: Metadata = {}
106 for line in message.split("\n"):
107 m = _re_dgit_metadata.match(line)
108 if not m:
109 continue
111 for item in m.group(1).split():
112 kv = item.split("=", 1)
113 if kv[0] in info: 113 ↛ 114line 113 didn't jump to line 114, because the condition on line 113 was never true
114 raise ValueError(f"Key '{kv[0]}' seen twice.")
115 if len(kv) == 2:
116 info[kv[0]] = kv[1]
117 else:
118 info[kv[0]] = True
120 return info
@dataclass
class TagInfo:
    """
    a parsed Git tag together with its dgit metadata and signature data
    """

    # the parsed tag
    tag: Tag
    # dgit metadata extracted from the tag message
    metadata: Metadata
    # signed-file object the tag contents were read from
    signed_file: SignedFile
def verify_and_parse_git_tag(
    tag: bytes, signature: bytes, *, keyrings: Iterable[str]
) -> TagInfo:
    """
    verify and parse information from Git tag and its detached signature

    The tag must carry dgit metadata with a bare `please-upload` keyword
    and non-empty values for `distro`, `source` and `version`.  The name
    of the tag must be `<distro>/<version>` with the version mangled
    according to DEP-14.

    Args:
       tag: raw Git tag object
       signature: detached signature for `tag`
       keyrings: keyrings passed on to `SignedFile` for verification

    Returns:
       the parsed tag, its dgit metadata and the `SignedFile` used for
       verification

    Raises:
       daklib.gpg.GpgException: when signature is invalid
       ValueError: when passed information is syntactically invalid
    """
    signed_file = SignedFile(tag, detached_signature=signature, keyrings=keyrings)
    # Only parse the contents reported by `SignedFile`; keep the raw `tag`
    # parameter and the parsed tag in separate variables.
    parsed_tag = _parse_tag(signed_file.contents)
    metadata = _dgit_metadata(parsed_tag.message)

    if metadata.get("please-upload") is not True:
        raise ValueError("tag2upload: tag misses `please-upload`")

    # A keyword given without `=value` is stored as `True` in the metadata.
    # Such an entry carries no usable value, so treat it like a missing one
    # instead of letting a `bool` leak into string handling below (which
    # would raise `AttributeError` rather than the documented `ValueError`).
    required: dict[str, str] = {}
    for key in ("distro", "source", "version"):
        value = metadata.get(key)
        if not isinstance(value, str) or not value:
            raise ValueError(f"tag2upload: tag misses `{key}`")
        required[key] = value

    expected_tag = f"{required['distro']}/{mangle_version_dep14(required['version'])}"
    if parsed_tag.tag != expected_tag:
        raise ValueError("tag2upload: tag name does not match expected value")

    return TagInfo(tag=parsed_tag, metadata=metadata, signed_file=signed_file)
@dataclass
class GitTagInfo:
    """
    required entries of a `Git-Tag-Info` field
    """

    # value of the `tag` key
    tag: str
    # value of the `fp` key
    fp: str
def parse_git_tag_info(value: str) -> GitTagInfo:
    """
    parse Git-Tag-Info field from .changes files

    The field is a whitespace-separated list of `key=value` items; the
    keys `tag` and `fp` must be present.

    Raises:
       ValueError: when an item has no `=`, a key is repeated, or one of
         the required keys is missing
    """

    pairs: dict[str, str] = {}

    for word in value.split():
        key, equals, content = word.partition("=")
        if not equals:
            raise ValueError("Item not in `key=value` form.")
        if key in pairs:
            raise ValueError(f"Key '{key}' seen twice.")
        pairs[key] = content

    if "tag" not in pairs or "fp" not in pairs:
        raise ValueError("Missing required key.")

    return GitTagInfo(tag=pairs["tag"], fp=pairs["fp"])
# A dot that is followed by another dot, by the end of the string, or by
# `lock` at the end of the string.
_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)")


def mangle_version_dep14(version: str) -> str:
    """
    mangle version according to DEP-14

    `:` becomes `%` and `~` becomes `_`; afterwards `#` is inserted after
    every dot matched by `_re_mangle_dot`.
    """

    # https://dep-team.pages.debian.net/deps/dep14/
    replacements = str.maketrans({":": "%", "~": "_"})
    translated = version.translate(replacements)
    return _re_mangle_dot.sub(".#", translated)
class HelperException(Exception):
    """
    error reported by the tag2upload helper program, or a reply from the
    helper that does not have the expected structure
    """
def _call_helper(path: str) -> dict:
    """
    run the configured tag2upload helper on the file at `path`

    The helper command is read from `Dinstall::Tag2UploadHelper`; when
    `Dinstall::UnprivUser` is set, the helper is run via `sudo` as that
    user.  The file is passed to the helper on standard input and the
    helper's standard output is parsed as JSON.

    Returns:
       the dictionary found under the `result` key of the helper's reply

    Raises:
       HelperException: when no helper is configured, the reply is not a
         dictionary, the reply has a non-empty `error` entry, or the
         `result` entry is not a dictionary
       subprocess.CalledProcessError: when the helper exits with a
         non-zero status
    """
    cnf = Config()

    helper = cnf.get("Dinstall::Tag2UploadHelper").split()
    if not helper:
        raise HelperException("tag2upload helper not configured")

    unpriv_user = cnf.get("Dinstall::UnprivUser")
    sudo_prefix = ["sudo", "-H", "-u", unpriv_user] if unpriv_user else []
    argv = [*sudo_prefix, *helper, "--audit", "/dev/stdin"]

    with open(path, "rb") as stdin:
        completed = subprocess.run(argv, stdin=stdin, capture_output=True, check=True)

    reply = json.loads(completed.stdout)
    if not isinstance(reply, dict):
        raise HelperException("tag2upload helper did not return a dictionary")

    error = reply.get("error")
    if error:
        raise HelperException(f"tag2upload helper reported an error: {error}")

    payload = reply.get("result")
    if not isinstance(payload, dict):
        raise HelperException("tag2upload helper did not return a dict in the `result`")
    return payload
227def _get_base64(d: Mapping, key: str) -> bytes:
228 """
229 retrieve `key` from the mapping and decode it as base64
231 Raises:
232 KeyError: when the requested key is not in the mapping
233 ValueError: when the requested entry is not a string
234 """
235 encoded = d[key]
236 if not isinstance(encoded, str): 236 ↛ 237line 236 didn't jump to line 237, because the condition on line 236 was never true
237 raise ValueError("value is not a str")
238 return base64.standard_b64decode(encoded)
def get_tag2upload_info_for_upload(
    upload: daklib.archive.ArchiveUpload,
) -> tuple[daklib.upload.HashedFile, TagInfo]:
    """
    extract tag2upload information for an upload

    The upload must include exactly one tag2upload file.  That file is
    handed to the tag2upload helper, which returns the Git tag and its
    signature in base64 encoding; both are then verified and parsed.

    Returns:
       the tag2upload file of the upload and the information parsed from
       the tag

    Raises:
       ValueError: when the upload does not include exactly one
         tag2upload file, or the helper result has an empty `tag` or
         `signature`
    """
    candidates = upload.changes.source_tag2upload_files
    if not candidates:
        raise ValueError("tag2upload: upload does not include tag2upload information")
    if len(candidates) > 1:
        raise ValueError("tag2upload: upload includes multiple tag2upload files")

    hashed_file = candidates[0]
    name = hashed_file.filename
    helper_result = _call_helper(os.path.join(upload.directory, name))

    # Both entries are required and must decode to non-empty data.
    decoded: dict[str, bytes] = {}
    for field in ("tag", "signature"):
        blob = _get_base64(helper_result, field)
        if not blob:
            raise ValueError(f"{name}: missing {field}")
        decoded[field] = blob

    tag_info = verify_and_parse_git_tag(
        decoded["tag"], decoded["signature"], keyrings=upload.keyrings
    )
    return hashed_file, tag_info