Coverage for daklib/tag2upload.py: 78%
133 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1"""
2functions related to handling uploads via tag2upload
4@contact: Debian FTP Master <ftpmaster@debian.org>
5@copyright: 2025, Ansgar <ansgar@debian.org>
6@license: GNU General Public License version 2 or later
7"""
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
19# You should have received a copy of the GNU General Public License
20# along with this program; if not, write to the Free Software
21# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23from __future__ import annotations
25import base64
26import json
27import os.path
28import re
29import subprocess
30from collections.abc import Collection, Mapping
31from dataclasses import dataclass
32from typing import TYPE_CHECKING, Union
34from daklib.config import Config
35from daklib.gpg import SignedFile
37if TYPE_CHECKING:
38 import daklib.archive
39 import daklib.upload
42@dataclass
43class Tag:
44 object: str
45 type: str
46 tag: str
47 tagger: str
48 message: str
51def _parse_tag(data: bytes) -> Tag:
52 """
53 parse a Git `tag`
54 """
56 header: dict[str, str] = {}
57 message = ""
59 lines = iter(data.decode("utf-8").split("\n"))
61 for line in lines: 61 ↛ 76line 61 didn't jump to line 76 because the loop on line 61 didn't complete
62 if not line:
63 # end of header, optional message follows
64 break
66 fields = line.split(" ", 1)
67 if len(fields) != 2: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 raise ValueError("Invalid header line.")
70 if fields[0] in header: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 raise ValueError(f"Repeated header field '{fields[0]}'")
73 header[fields[0]] = fields[1]
75 # According to man:git-mktag(1), a tag has exactly these header fields:
76 if header.keys() != {"object", "type", "tag", "tagger"}: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 raise ValueError("Invalid set of header fields.")
79 message = "\n".join(lines)
81 return Tag(
82 object=header["object"],
83 type=header["type"],
84 tag=header["tag"],
85 tagger=header["tagger"],
86 message=message,
87 )
90Metadata = dict[str, Union[str, bool]]
91_re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z')
94def _dgit_metadata(message: str) -> Metadata:
95 """
96 extract dgit metadata from tag message
98 The metadata can be of the form `keyword` or `keyword=value`. For
99 metadata without a value, we store `True` in the returned dict;
100 otherwise the provided value is stored.
101 """
103 # See man:tag2upload(2) for the format.
104 info: Metadata = {}
106 for line in message.split("\n"):
107 m = _re_dgit_metadata.match(line)
108 if not m:
109 continue
111 for item in m.group(1).split():
112 kv = item.split("=", 1)
113 if kv[0] in info: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true
114 raise ValueError(f"Key '{kv[0]}' seen twice.")
115 if len(kv) == 2:
116 info[kv[0]] = kv[1]
117 else:
118 info[kv[0]] = True
120 return info
123@dataclass
124class TagInfo:
125 tag: Tag
126 metadata: Metadata
127 signed_file: SignedFile
130def verify_and_parse_git_tag(
131 tag_contents: bytes, signature: bytes, *, keyrings: Collection[str]
132) -> TagInfo:
133 """
134 verify and parse information from Git tag and its detached signature
136 Raises:
137 daklib.gpg.GpgException: when signature is invalid
138 ValueError: when passed information is syntactically invalid
139 """
140 signed_file = SignedFile(
141 tag_contents, detached_signature=signature, keyrings=keyrings
142 )
143 tag = _parse_tag(signed_file.contents)
144 metadata = _dgit_metadata(tag.message)
146 if metadata.get("please-upload") is not True: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true
147 raise ValueError("tag2upload: tag misses `please-upload`")
148 if not (distro := metadata.get("distro")): 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true
149 raise ValueError("tag2upload: tag misses `distro`")
150 if not metadata.get("source"): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 raise ValueError("tag2upload: tag misses `source`")
152 if not (version := metadata.get("version")) or not isinstance(version, str): 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true
153 raise ValueError("tag2upload: tag misses `version`")
155 expected_tag = f"{distro}/{mangle_version_dep14(version)}"
156 if tag.tag != expected_tag: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true
157 raise ValueError("tag2upload: tag name does not match expected value")
159 return TagInfo(tag=tag, metadata=metadata, signed_file=signed_file)
162@dataclass
163class GitTagInfo:
164 tag: str
165 fp: str
168def parse_git_tag_info(value: str) -> GitTagInfo:
169 """
170 parse Git-Tag-Info field from .changes files
171 """
173 info: dict[str, str] = {}
175 for item in value.split():
176 kv = item.split("=", 1)
177 if len(kv) != 2: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 raise ValueError("Item not in `key=value` form.")
179 if kv[0] in info: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 raise ValueError(f"Key '{kv[0]}' seen twice.")
181 info[kv[0]] = kv[1]
183 if not {"tag", "fp"} <= info.keys():
184 raise ValueError("Missing required key.")
186 return GitTagInfo(tag=info["tag"], fp=info["fp"])
189_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)")
192def mangle_version_dep14(version: str) -> str:
193 """
194 mangle version according to DEP-14
195 """
197 # https://dep-team.pages.debian.net/deps/dep14/
198 version = version.translate({ord(":"): "%", ord("~"): "_"})
199 return _re_mangle_dot.sub(".#", version)
202class HelperException(Exception):
203 """
204 raised when the helper program returned an error or wrong information
205 """
208def _call_helper(path: str) -> dict:
209 config = Config()
210 cmd = config.get("Dinstall::Tag2UploadHelper", "").split()
211 if not cmd: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 raise HelperException("tag2upload helper not configured")
213 if unpriv_user := config.get("Dinstall::UnprivUser"): 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true
214 cmd = ["sudo", "-H", "-u", unpriv_user, *cmd]
215 cmd.extend(["--audit", "/dev/stdin"])
216 with open(path, "rb") as fh:
217 process = subprocess.run(cmd, stdin=fh, capture_output=True, check=True)
218 result = json.loads(process.stdout)
219 if not isinstance(result, dict): 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true
220 raise HelperException("tag2upload helper did not return a dictionary")
221 if result_error := result.get("error"): 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true
222 raise HelperException(f"tag2upload helper reported an error: {result_error}")
223 result_result = result.get("result")
224 if not isinstance(result_result, dict): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 raise HelperException("tag2upload helper did not return a dict in the `result`")
226 return result_result
229def _get_base64(d: Mapping, key: str) -> bytes:
230 """
231 retrieve `key` from the mapping and decode it as base64
233 Raises:
234 KeyError: when the requested key is not in the mapping
235 ValueError: when the requested entry is not a string
236 """
237 encoded = d[key]
238 if not isinstance(encoded, str): 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true
239 raise ValueError("value is not a str")
240 return base64.standard_b64decode(encoded)
243def get_tag2upload_info_for_upload(
244 upload: daklib.archive.ArchiveUpload,
245) -> tuple[daklib.upload.HashedFile, TagInfo]:
246 """
247 extract tag2upload information for an upload
248 """
249 tag2upload_files = upload.changes.source_tag2upload_files
250 if not tag2upload_files: 250 ↛ 251line 250 didn't jump to line 251 because the condition on line 250 was never true
251 raise ValueError("tag2upload: upload does not include tag2upload information")
252 if len(tag2upload_files) > 1: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true
253 raise ValueError("tag2upload: upload includes multiple tag2upload files")
254 filename = tag2upload_files[0].filename
255 path = os.path.join(upload.directory, filename)
256 result = _call_helper(path)
257 tag = _get_base64(result, "tag")
258 if not tag: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true
259 raise ValueError(f"{filename}: missing tag")
260 signature = _get_base64(result, "signature")
261 if not signature: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true
262 raise ValueError(f"{filename}: missing signature")
263 return tag2upload_files[0], verify_and_parse_git_tag(
264 tag, signature, keyrings=upload.keyrings
265 )