1""" 

2functions related to handling uploads via tag2upload 

3 

4@contact: Debian FTP Master <ftpmaster@debian.org> 

5@copyright: 2025, Ansgar <ansgar@debian.org> 

6@license: GNU General Public License version 2 or later 

7""" 

8 

9# This program is free software; you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation; either version 2 of the License, or 

12# (at your option) any later version. 

13 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18 

19# You should have received a copy of the GNU General Public License 

20# along with this program; if not, write to the Free Software 

21# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

22 

23from __future__ import annotations 

24 

25import base64 

26import json 

27import os.path 

28import re 

29import subprocess 

30from collections.abc import Iterable, Mapping 

31from dataclasses import dataclass 

32from typing import TYPE_CHECKING, Union 

33 

34from daklib.config import Config 

35from daklib.gpg import SignedFile 

36 

37if TYPE_CHECKING: 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true

38 import daklib.archive 

39 import daklib.upload 

40 

41 

@dataclass
class Tag:
    """
    header fields and message of a Git `tag` object

    Instances are built by `_parse_tag`, which accepts exactly the header
    fields `object`, `type`, `tag` and `tagger`.
    """

    # value of the `object` header field
    object: str
    # value of the `type` header field
    type: str
    # value of the `tag` header field (the tag name)
    tag: str
    # value of the `tagger` header field
    tagger: str
    # text after the first empty line; empty string when there is none
    message: str

49 

50 

def _parse_tag(data: bytes) -> Tag:
    """
    split a raw Git `tag` object into header fields and message

    The header is a sequence of `name value` lines terminated by the
    first empty line; everything after that line is the message.

    Raises:
        ValueError: when the data is not valid UTF-8 (`UnicodeDecodeError`
            is a subclass of `ValueError`), a header line has no value,
            a header field is repeated, or the set of header fields is
            not exactly `object`, `type`, `tag`, `tagger`
    """

    text_lines = data.decode("utf-8").split("\n")
    fields: dict[str, str] = {}

    # Index of the first message line; stays past the end when the data
    # has no empty line, which yields an empty message.
    message_start = len(text_lines)

    for index, text_line in enumerate(text_lines):
        if text_line == "":
            # end of header, optional message follows
            message_start = index + 1
            break

        name, separator, value = text_line.partition(" ")
        if not separator:
            raise ValueError("Invalid header line.")
        if name in fields:
            raise ValueError(f"Repeated header field '{name}'")
        fields[name] = value

    # According to man:git-mktag(1), a tag has exactly these header fields:
    if set(fields) != {"object", "type", "tag", "tagger"}:
        raise ValueError("Invalid set of header fields.")

    return Tag(
        object=fields["object"],
        type=fields["type"],
        tag=fields["tag"],
        tagger=fields["tagger"],
        message="\n".join(text_lines[message_start:]),
    )

88 

89 

# Mapping from metadata keyword to its value; keywords given without a
# value map to `True`.
Metadata = dict[str, Union[str, bool]]

# A whole line of the form `[dgit ...]` whose content does not start
# with a double quote.
_re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z')


def _dgit_metadata(message: str) -> Metadata:
    """
    collect dgit metadata from the `[dgit ...]` lines of a tag message

    Each whitespace-separated item is either `keyword` or
    `keyword=value`.  A bare keyword is recorded as `True`; for
    `keyword=value` the text after the first `=` is recorded.  Lines
    that are not `[dgit ...]` lines are ignored.

    Raises:
        ValueError: when a keyword appears more than once
    """

    # See man:tag2upload(5) for the format.
    items = (
        item
        for line in message.split("\n")
        if (found := _re_dgit_metadata.match(line))
        for item in found.group(1).split()
    )

    collected: Metadata = {}
    for item in items:
        keyword, equals, value = item.partition("=")
        if keyword in collected:
            raise ValueError(f"Key '{keyword}' seen twice.")
        # `equals` is empty exactly when the item has no `=` at all.
        collected[keyword] = value if equals else True

    return collected

121 

122 

@dataclass
class TagInfo:
    """
    result of `verify_and_parse_git_tag`
    """

    # parsed Git tag
    tag: Tag
    # dgit metadata extracted from the tag message
    metadata: Metadata
    # `SignedFile` built from the raw tag and its detached signature
    signed_file: SignedFile

128 

129 

def _require_metadata_str(metadata: Metadata, key: str) -> str:
    """
    return the non-empty string stored for `key` in the dgit metadata

    Raises:
        ValueError: when `key` is absent, empty, or was given as a bare
            keyword without a value (stored as `True`)
    """
    value = metadata.get(key)
    if not isinstance(value, str) or not value:
        raise ValueError(f"tag2upload: tag misses `{key}`")
    return value


def verify_and_parse_git_tag(
    tag: bytes, signature: bytes, *, keyrings: Iterable[str]
) -> TagInfo:
    """
    verify and parse information from Git tag and its detached signature

    The tag is parsed from `SignedFile.contents`, not from the raw `tag`
    argument.  The tag message must carry dgit metadata with
    `please-upload` as a bare keyword and with string values for
    `distro`, `source` and `version`.  The tag name must be
    `<distro>/<version mangled according to DEP-14>`.

    Args:
        tag: raw Git tag object
        signature: detached signature for `tag`
        keyrings: keyrings passed on to `SignedFile`

    Returns:
        the parsed tag, its dgit metadata and the `SignedFile`

    Raises:
        daklib.gpg.GpgException: when signature is invalid
        ValueError: when passed information is syntactically invalid
    """
    signed_file = SignedFile(tag, detached_signature=signature, keyrings=keyrings)
    parsed_tag = _parse_tag(signed_file.contents)
    metadata = _dgit_metadata(parsed_tag.message)

    if metadata.get("please-upload") is not True:
        raise ValueError("tag2upload: tag misses `please-upload`")

    # A keyword without `=value` is stored as `True`.  Checking only for
    # truthiness would let such a value through and hand a bool to the
    # string handling below, so a real string is required here.
    distro = _require_metadata_str(metadata, "distro")
    _require_metadata_str(metadata, "source")
    version = _require_metadata_str(metadata, "version")

    expected_tag = f"{distro}/{mangle_version_dep14(version)}"
    if parsed_tag.tag != expected_tag:
        raise ValueError("tag2upload: tag name does not match expected value")

    return TagInfo(tag=parsed_tag, metadata=metadata, signed_file=signed_file)

158 

159 

@dataclass
class GitTagInfo:
    """
    required entries of a Git-Tag-Info field, as returned by `parse_git_tag_info`
    """

    # value of the `tag` key
    tag: str
    # value of the `fp` key; presumably the fingerprint of the signing key -- TODO confirm
    fp: str

164 

165 

def parse_git_tag_info(value: str) -> GitTagInfo:
    """
    parse the Git-Tag-Info field of a .changes file

    The field is a whitespace-separated list of `key=value` items.  The
    keys `tag` and `fp` are required; other keys are accepted and
    ignored.

    Raises:
        ValueError: when an item is not of the form `key=value`, a key is
            repeated, or `tag` or `fp` is missing
    """

    entries: dict[str, str] = {}

    for token in value.split():
        key, equals, content = token.partition("=")
        if not equals:
            raise ValueError("Item not in `key=value` form.")
        if key in entries:
            raise ValueError(f"Key '{key}' seen twice.")
        entries[key] = content

    if "tag" not in entries or "fp" not in entries:
        raise ValueError("Missing required key.")

    return GitTagInfo(tag=entries["tag"], fp=entries["fp"])

185 

186 

# A dot that is followed by another dot, by the end of the string, or by
# a trailing `lock`.
_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)")


def mangle_version_dep14(version: str) -> str:
    """
    turn a version into the form used in Git tag names by DEP-14

    `:` becomes `%` and `~` becomes `_`.  A `#` is inserted after every
    dot that is followed by another dot, that ends the version, or that
    precedes a trailing `lock`.
    """

    # https://dep-team.pages.debian.net/deps/dep14/
    substituted = version.replace(":", "%").replace("~", "_")
    return _re_mangle_dot.sub(".#", substituted)

198 

199 

class HelperException(Exception):
    """
    raised when the helper program returned an error or wrong information

    `_call_helper` raises it when no helper is configured, when the
    helper's reply is not a dictionary, when the reply has a non-empty
    `error` entry, or when its `result` entry is not a dictionary.
    """

204 

205 

def _call_helper(path: str) -> dict:
    """
    run the configured tag2upload helper on a file and return its result

    The command is taken from `Dinstall::Tag2UploadHelper` and split on
    whitespace.  When `Dinstall::UnprivUser` is set, the command is run
    through `sudo -H -u <user>`.  The helper is called with
    `--audit /dev/stdin` and gets the file at `path` as standard input.
    Its standard output must be a JSON object with a dictionary in
    `result` and no non-empty `error`.

    Args:
        path: file handed to the helper on standard input

    Returns:
        the dictionary stored under `result` in the helper's reply

    Raises:
        HelperException: when the helper is not configured, reports an
            error, or replies with something other than the expected
            dictionaries
        subprocess.CalledProcessError: when the helper exits non-zero
        json.JSONDecodeError: when the helper's output is not valid JSON
    """
    cnf = Config()

    argv = cnf.get("Dinstall::Tag2UploadHelper").split()
    if not argv:
        raise HelperException("tag2upload helper not configured")

    unpriv_user = cnf.get("Dinstall::UnprivUser")
    if unpriv_user:
        argv = ["sudo", "-H", "-u", unpriv_user] + argv
    argv += ["--audit", "/dev/stdin"]

    # The file is opened here and passed as stdin; presumably so that the
    # helper does not need to open `path` itself -- NOTE(review): confirm.
    with open(path, "rb") as stream:
        completed = subprocess.run(
            argv, stdin=stream, capture_output=True, check=True
        )

    reply = json.loads(completed.stdout)
    if not isinstance(reply, dict):
        raise HelperException("tag2upload helper did not return a dictionary")

    failure = reply.get("error")
    if failure:
        raise HelperException(f"tag2upload helper reported an error: {failure}")

    payload = reply.get("result")
    if isinstance(payload, dict):
        return payload
    raise HelperException("tag2upload helper did not return a dict in the `result`")

225 

226 

def _get_base64(d: Mapping, key: str) -> bytes:
    """
    look up `key` in the mapping and return its base64-decoded value

    Raises:
        KeyError: when the requested key is not in the mapping
        ValueError: when the requested entry is not a string
    """
    entry = d[key]
    if isinstance(entry, str):
        return base64.standard_b64decode(entry)
    raise ValueError("value is not a str")

239 

240 

def get_tag2upload_info_for_upload(
    upload: daklib.archive.ArchiveUpload,
) -> tuple[daklib.upload.HashedFile, TagInfo]:
    """
    extract and verify the tag2upload information of an upload

    The upload must contain exactly one tag2upload file.  That file is
    passed to the tag2upload helper, whose result must hold the
    base64-encoded `tag` and `signature`.  Both are then checked with
    `verify_and_parse_git_tag` against the upload's keyrings.

    Returns:
        the tag2upload file and the information parsed from its tag

    Raises:
        ValueError: when the upload has no or several tag2upload files,
            or when the helper returned an empty tag or signature
    """
    candidates = upload.changes.source_tag2upload_files
    if not candidates:
        raise ValueError("tag2upload: upload does not include tag2upload information")
    if len(candidates) > 1:
        raise ValueError("tag2upload: upload includes multiple tag2upload files")

    hashed_file = candidates[0]
    filename = hashed_file.filename
    helper_result = _call_helper(os.path.join(upload.directory, filename))

    tag = _get_base64(helper_result, "tag")
    if not tag:
        raise ValueError(f"{filename}: missing tag")

    signature = _get_base64(helper_result, "signature")
    if not signature:
        raise ValueError(f"{filename}: missing signature")

    tag_info = verify_and_parse_git_tag(tag, signature, keyrings=upload.keyrings)
    return hashed_file, tag_info