Coverage for daklib/tag2upload.py: 78%

133 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1""" 

2functions related to handling uploads via tag2upload 

3 

4@contact: Debian FTP Master <ftpmaster@debian.org> 

5@copyright: 2025, Ansgar <ansgar@debian.org> 

6@license: GNU General Public License version 2 or later 

7""" 

8 

9# This program is free software; you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation; either version 2 of the License, or 

12# (at your option) any later version. 

13 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18 

19# You should have received a copy of the GNU General Public License 

20# along with this program; if not, write to the Free Software 

21# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

22 

23from __future__ import annotations 

24 

25import base64 

26import json 

27import os.path 

28import re 

29import subprocess 

30from collections.abc import Collection, Mapping 

31from dataclasses import dataclass 

32from typing import TYPE_CHECKING, Union 

33 

34from daklib.config import Config 

35from daklib.gpg import SignedFile 

36 

37if TYPE_CHECKING: 

38 import daklib.archive 

39 import daklib.upload 

40 

41 

42@dataclass 

43class Tag: 

44 object: str 

45 type: str 

46 tag: str 

47 tagger: str 

48 message: str 

49 

50 

51def _parse_tag(data: bytes) -> Tag: 

52 """ 

53 parse a Git `tag` 

54 """ 

55 

56 header: dict[str, str] = {} 

57 message = "" 

58 

59 lines = iter(data.decode("utf-8").split("\n")) 

60 

61 for line in lines: 61 ↛ 76line 61 didn't jump to line 76 because the loop on line 61 didn't complete

62 if not line: 

63 # end of header, optional message follows 

64 break 

65 

66 fields = line.split(" ", 1) 

67 if len(fields) != 2: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true

68 raise ValueError("Invalid header line.") 

69 

70 if fields[0] in header: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true

71 raise ValueError(f"Repeated header field '{fields[0]}'") 

72 

73 header[fields[0]] = fields[1] 

74 

75 # According to man:git-mktag(1), a tag has exactly these header fields: 

76 if header.keys() != {"object", "type", "tag", "tagger"}: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 raise ValueError("Invalid set of header fields.") 

78 

79 message = "\n".join(lines) 

80 

81 return Tag( 

82 object=header["object"], 

83 type=header["type"], 

84 tag=header["tag"], 

85 tagger=header["tagger"], 

86 message=message, 

87 ) 

88 

89 

90Metadata = dict[str, Union[str, bool]] 

91_re_dgit_metadata = re.compile(r'\A\[dgit ([^"].*)\]\Z') 

92 

93 

94def _dgit_metadata(message: str) -> Metadata: 

95 """ 

96 extract dgit metadata from tag message 

97 

98 The metadata can be of the form `keyword` or `keyword=value`. For 

99 metadata without a value, we store `True` in the returned dict; 

100 otherwise the provided value is stored. 

101 """ 

102 

103 # See man:tag2upload(2) for the format. 

104 info: Metadata = {} 

105 

106 for line in message.split("\n"): 

107 m = _re_dgit_metadata.match(line) 

108 if not m: 

109 continue 

110 

111 for item in m.group(1).split(): 

112 kv = item.split("=", 1) 

113 if kv[0] in info: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 raise ValueError(f"Key '{kv[0]}' seen twice.") 

115 if len(kv) == 2: 

116 info[kv[0]] = kv[1] 

117 else: 

118 info[kv[0]] = True 

119 

120 return info 

121 

122 

123@dataclass 

124class TagInfo: 

125 tag: Tag 

126 metadata: Metadata 

127 signed_file: SignedFile 

128 

129 

130def verify_and_parse_git_tag( 

131 tag_contents: bytes, signature: bytes, *, keyrings: Collection[str] 

132) -> TagInfo: 

133 """ 

134 verify and parse information from Git tag and its detached signature 

135 

136 Raises: 

137 daklib.gpg.GpgException: when signature is invalid 

138 ValueError: when passed information is syntactically invalid 

139 """ 

140 signed_file = SignedFile( 

141 tag_contents, detached_signature=signature, keyrings=keyrings 

142 ) 

143 tag = _parse_tag(signed_file.contents) 

144 metadata = _dgit_metadata(tag.message) 

145 

146 if metadata.get("please-upload") is not True: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true

147 raise ValueError("tag2upload: tag misses `please-upload`") 

148 if not (distro := metadata.get("distro")): 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true

149 raise ValueError("tag2upload: tag misses `distro`") 

150 if not metadata.get("source"): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true

151 raise ValueError("tag2upload: tag misses `source`") 

152 if not (version := metadata.get("version")) or not isinstance(version, str): 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true

153 raise ValueError("tag2upload: tag misses `version`") 

154 

155 expected_tag = f"{distro}/{mangle_version_dep14(version)}" 

156 if tag.tag != expected_tag: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true

157 raise ValueError("tag2upload: tag name does not match expected value") 

158 

159 return TagInfo(tag=tag, metadata=metadata, signed_file=signed_file) 

160 

161 

162@dataclass 

163class GitTagInfo: 

164 tag: str 

165 fp: str 

166 

167 

168def parse_git_tag_info(value: str) -> GitTagInfo: 

169 """ 

170 parse Git-Tag-Info field from .changes files 

171 """ 

172 

173 info: dict[str, str] = {} 

174 

175 for item in value.split(): 

176 kv = item.split("=", 1) 

177 if len(kv) != 2: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 raise ValueError("Item not in `key=value` form.") 

179 if kv[0] in info: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 raise ValueError(f"Key '{kv[0]}' seen twice.") 

181 info[kv[0]] = kv[1] 

182 

183 if not {"tag", "fp"} <= info.keys(): 

184 raise ValueError("Missing required key.") 

185 

186 return GitTagInfo(tag=info["tag"], fp=info["fp"]) 

187 

188 

189_re_mangle_dot = re.compile(r"\.(?=\.|(?:lock)?\Z)") 

190 

191 

192def mangle_version_dep14(version: str) -> str: 

193 """ 

194 mangle version according to DEP-14 

195 """ 

196 

197 # https://dep-team.pages.debian.net/deps/dep14/ 

198 version = version.translate({ord(":"): "%", ord("~"): "_"}) 

199 return _re_mangle_dot.sub(".#", version) 

200 

201 

202class HelperException(Exception): 

203 """ 

204 raised when the helper program returned an error or wrong information 

205 """ 

206 

207 

208def _call_helper(path: str) -> dict: 

209 config = Config() 

210 cmd = config.get("Dinstall::Tag2UploadHelper", "").split() 

211 if not cmd: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 raise HelperException("tag2upload helper not configured") 

213 if unpriv_user := config.get("Dinstall::UnprivUser"): 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true

214 cmd = ["sudo", "-H", "-u", unpriv_user, *cmd] 

215 cmd.extend(["--audit", "/dev/stdin"]) 

216 with open(path, "rb") as fh: 

217 process = subprocess.run(cmd, stdin=fh, capture_output=True, check=True) 

218 result = json.loads(process.stdout) 

219 if not isinstance(result, dict): 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true

220 raise HelperException("tag2upload helper did not return a dictionary") 

221 if result_error := result.get("error"): 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true

222 raise HelperException(f"tag2upload helper reported an error: {result_error}") 

223 result_result = result.get("result") 

224 if not isinstance(result_result, dict): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 raise HelperException("tag2upload helper did not return a dict in the `result`") 

226 return result_result 

227 

228 

229def _get_base64(d: Mapping, key: str) -> bytes: 

230 """ 

231 retrieve `key` from the mapping and decode it as base64 

232 

233 Raises: 

234 KeyError: when the requested key is not in the mapping 

235 ValueError: when the requested entry is not a string 

236 """ 

237 encoded = d[key] 

238 if not isinstance(encoded, str): 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true

239 raise ValueError("value is not a str") 

240 return base64.standard_b64decode(encoded) 

241 

242 

243def get_tag2upload_info_for_upload( 

244 upload: daklib.archive.ArchiveUpload, 

245) -> tuple[daklib.upload.HashedFile, TagInfo]: 

246 """ 

247 extract tag2upload information for an upload 

248 """ 

249 tag2upload_files = upload.changes.source_tag2upload_files 

250 if not tag2upload_files: 250 ↛ 251line 250 didn't jump to line 251 because the condition on line 250 was never true

251 raise ValueError("tag2upload: upload does not include tag2upload information") 

252 if len(tag2upload_files) > 1: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 raise ValueError("tag2upload: upload includes multiple tag2upload files") 

254 filename = tag2upload_files[0].filename 

255 path = os.path.join(upload.directory, filename) 

256 result = _call_helper(path) 

257 tag = _get_base64(result, "tag") 

258 if not tag: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 raise ValueError(f"{filename}: missing tag") 

260 signature = _get_base64(result, "signature") 

261 if not signature: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true

262 raise ValueError(f"{filename}: missing signature") 

263 return tag2upload_files[0], verify_and_parse_git_tag( 

264 tag, signature, keyrings=upload.keyrings 

265 )