"""Utilities for signed files

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""

7 

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

21 

import datetime
import fcntl
import os
import select
import subprocess
import sys
from collections.abc import Iterable

import apt_pkg

29 

# Upper bound (exclusive) of the file descriptors closed in the gpg child
# process before exec.
try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, OSError, ValueError):
    # os.sysconf may be unavailable (AttributeError), may not know the name
    # (ValueError) or the system may not support the value (OSError).
    _MAXFD = 256
if _MAXFD <= 0:
    # sysconf() reports -1 for an indeterminate limit; a non-positive bound
    # would turn `os.closerange(4, _MAXFD)` into a no-op.
    _MAXFD = 256

34 

35 

class GpgException(Exception):
    """Error raised when GnuPG fails or reports output that is not accepted."""

38 

39 

class _Pipe:
    """Context manager for pipes.

    Entering the context creates a pipe and stores its file descriptors in
    ``self.r`` (read end) and ``self.w`` (write end). Leaving the context
    closes whichever ends are still open.

    Note: When the pipe is closed by other means than the close_r and close_w
    methods, you have to set self.r (self.w) to None.
    """

    # Both ends count as closed until __enter__ has created the pipe, so the
    # close methods are safe to call at any time.
    r = None
    w = None

    def __enter__(self):
        (self.r, self.w) = os.pipe()
        return self

    def __exit__(self, type, value, traceback):
        # Close the read end even if closing the write end fails.
        try:
            self.close_w()
        finally:
            self.close_r()
        return False

    def close_r(self):
        """Close the reading side of the pipe (no-op if already closed)."""
        # Compare against None: file descriptor 0 is valid but falsy.
        if self.r is not None:
            fd, self.r = self.r, None
            os.close(fd)

    def close_w(self):
        """Close the writing side of the pipe (no-op if already closed)."""
        # Compare against None: file descriptor 0 is valid but falsy.
        if self.w is not None:
            fd, self.w = self.w, None
            os.close(fd)

67 

68 

# Thin wrapper around `os.waitstatus_to_exitcode` (Python 3.9), kept so that
# existing callers and the historical error message continue to work.
def waitstatus_to_exitcode(status):
    """Convert a wait status (e.g. from ``os.wait4``) into an exit code.

    :param status: wait status as returned by the ``os.wait*`` functions
    :return: the exit status if the process exited normally, or the negated
             signal number if it was terminated by a signal
    :raises ValueError: if the status describes neither of these cases
    """
    try:
        return os.waitstatus_to_exitcode(status)
    except ValueError:
        raise ValueError(f"Unexpected status code '{status}'") from None

77 

78 

class SignedFile:
    """Handle files signed with PGP.

    The following attributes are available:
      contents            - byte-string with the content (after removing PGP armor)
      valid               - Boolean indicating a valid signature was found
      expired             - GnuPG reported EXPSIG or EXPKEYSIG
      invalid             - GnuPG reported EXPSIG, EXPKEYSIG, REVKEYSIG, BADSIG,
                            ERRSIG, KEYREVOKED or NO_PUBKEY
      weak_signature      - signature uses a weak algorithm (e.g. SHA-1)
      fingerprint         - fingerprint of the key used for signing
      primary_fingerprint - fingerprint of the primary key associated to the key used for signing
      signature_id        - value of the SIG_ID status line (byte-string)
      signature_timestamp - datetime parsed from the last VALIDSIG status line
      status, stderr      - raw byte-strings read from gpg's status-fd and stderr
    """

    def __init__(self, data: bytes, keyrings: Iterable[str], require_signature: bool = True, gpg: str = "/usr/bin/gpg"):
        """
        :param data: byte-string containing the message
        :param keyrings: sequence of keyrings
        :param require_signature: if True (the default), will raise an exception if no valid signature was found
        :param gpg: location of the gpg binary
        :raises GpgException: if gpg fails, reports an unexpected status, or
                              (with `require_signature`) no valid signature was found
        """
        self.gpg = gpg
        self.keyrings = keyrings

        self.valid: bool = False  #: valid signature
        self.expired = False
        self.invalid = False
        self.weak_signature = False
        self.fingerprints: list[str] = []
        self.primary_fingerprints: list[str] = []
        # SIG_ID values are stored undecoded, exactly as read from status-fd.
        self.signature_ids: list[bytes] = []

        self._verify(data, require_signature)

    @property
    def fingerprint(self) -> str:
        """fingerprint of the (sub)key used for the signature"""
        assert len(self.fingerprints) == 1
        return self.fingerprints[0]

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the primary key used for the signature"""
        assert len(self.primary_fingerprints) == 1
        return self.primary_fingerprints[0]

    @property
    def signature_id(self) -> bytes:
        """signature id reported by GnuPG (byte-string from the SIG_ID line)"""
        assert len(self.signature_ids) == 1
        return self.signature_ids[0]

    def _stderr_text(self) -> str:
        """Return gpg's stderr output decoded for use in error messages."""
        return self.stderr.decode('ascii', errors='replace')

    def _verify(self, data, require_signature):
        """Run gpg on `data` and populate the result attributes.

        :param data: byte-string fed to gpg's stdin
        :param require_signature: raise if no valid signature was found
        :raises GpgException: on missing status output, a fatal gpg exit code,
                              an unexpected status line, or a missing valid
                              signature when `require_signature` is set
        """
        with _Pipe() as stdin, \
                _Pipe() as contents, \
                _Pipe() as status, \
                _Pipe() as stderr:
            pid = os.fork()
            if pid == 0:
                # Child: either replaced by gpg or terminated via os._exit();
                # this call never returns.
                self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)

            # Parent: close the ends that belong to the child so that EOF is
            # seen on the read ends once gpg is done.
            stdin.close_r()
            contents.close_w()
            stderr.close_w()
            status.close_w()

            read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
            stdin.w = None  # was closed by _do_io

            (pid_, status_code, usage_) = os.wait4(pid, 0)
            if pid_ != pid:
                raise Exception(f"wait4() waited for pid {pid_}, but we expected {pid}")
            exit_code = waitstatus_to_exitcode(status_code)

            self.contents = read[contents.r]
            self.status = read[status.r]
            self.stderr = read[stderr.r]

        if self.status == b"":
            raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self._stderr_text()))

        # gpg exits with 0 (no error), 1 (at least one invalid sig),
        # or anything else (fatal error).  Even though status 2
        # indicates a fatal error, we still allow it as it is also
        # returned when the public key is not known.
        if exit_code not in (0, 1, 2):
            raise GpgException(f"GPG exited with a fatal error (exit code {exit_code})\n{self._stderr_text()}")

        for line in self.status.splitlines():
            self._parse_status(line)

        # Any status marking a signature as invalid overrides a VALIDSIG.
        if self.invalid:
            self.valid = False

        if require_signature and not self.valid:
            raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self._stderr_text()))

        assert len(self.fingerprints) == len(self.primary_fingerprints)
        assert len(self.fingerprints) == len(self.signature_ids)

    def _do_io(self, read, write):
        """Multiplex reading and writing over several file descriptors.

        :param read: iterable of file descriptors to read until EOF
        :param write: dict mapping file descriptors to the bytes to write;
                      each descriptor is closed once its data was written (or
                      the reading side went away)
        :return: dict mapping each descriptor in `read` to the bytes read
        """
        for fd in write:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        chunks = {fd: [] for fd in read}
        # memoryview slices avoid copying the remaining payload on every write.
        pending = {fd: memoryview(buf) for fd, buf in write.items()}

        read_set = list(read)
        write_set = list(write)
        while read_set or write_set:
            readable, writable, _ = select.select(read_set, write_set, ())
            for fd in readable:
                data = os.read(fd, 4096)
                if data:
                    chunks[fd].append(data)
                else:
                    # EOF
                    read_set.remove(fd)
            for fd in writable:
                remaining = pending[fd]
                finished = remaining.nbytes == 0
                if not finished:
                    try:
                        written = os.write(fd, remaining)
                    except BrokenPipeError:
                        # The reader (gpg) is gone; stop feeding it and let
                        # the caller report the failure from the exit code
                        # and stderr instead.
                        finished = True
                    else:
                        pending[fd] = remaining[written:]
                if finished:
                    os.close(fd)
                    write_set.remove(fd)

        return {fd: b"".join(parts) for fd, parts in chunks.items()}

    def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
        """parse timestamp in GnuPG's format

        :param timestamp: byte-string with seconds since the epoch
        :param datestring: optional byte-string date in ``YYYY-MM-DD`` form
        :return: datetime object for the given timestamp (naive, in UTC)
        """
        # The old implementation returned only the date.  As those values
        # were already in use, keep returning the legacy value for old
        # signatures.
        if datestring is not None:
            year, month, day = datestring.split(b'-')
            date = datetime.date(int(year), int(month), int(day))
            if date < datetime.date(2014, 8, 4):
                return datetime.datetime.combine(date, datetime.time(0, 0))

        if b'T' in timestamp:
            raise Exception('No support for ISO 8601 timestamps.')
        # Same naive UTC value utcfromtimestamp() produced, without relying on
        # that function (deprecated since Python 3.12).
        aware = datetime.datetime.fromtimestamp(int(timestamp), datetime.timezone.utc)
        return aware.replace(tzinfo=None)

    def _parse_status(self, line):
        """Process one line of gpg's status-fd output and update the state.

        :param line: byte-string with a single status line
        :raises GpgException: for malformed lines, unexpected keywords, errors
                              reported by gpg and MD5-based signatures
        """
        fields = line.split()
        # Blank or truncated lines are rejected here rather than failing with
        # an IndexError below.
        if len(fields) < 2 or fields[0] != b"[GNUPG:]":
            raise GpgException("Unexpected output on status-fd: %s" % line)
        keyword = fields[1]

        # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
        #          <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
        #          <hash-algo> <sig-class> <primary-key-fpr>
        if keyword == b"VALIDSIG":
            if len(fields) < 12:
                raise GpgException("Unexpected output on status-fd: %s" % line)
            # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
            # which Debian 8 does not yet include.  We want to make sure
            # to not accept uploads covered by a MD5-based signature.
            # RFC 4880, table 9.4:
            #   1 - MD5
            #   2 - SHA-1
            #   3 - RIPE-MD/160
            hash_algo = fields[9]
            if hash_algo == b"1":
                raise GpgException("Digest algorithm MD5 is not trusted.")
            if hash_algo in (b"2", b"3"):
                self.weak_signature = True

            self.valid = True
            self.fingerprints.append(fields[2].decode('ascii'))
            self.primary_fingerprints.append(fields[11].decode('ascii'))
            self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])

        elif keyword == b"BADARMOR":
            raise GpgException("Bad armor.")

        elif keyword == b"NODATA":
            raise GpgException("No data.")

        elif keyword == b"DECRYPTION_FAILED":
            raise GpgException("Decryption failed.")

        elif keyword == b"ERROR":
            # Pad so that a shortened ERROR line still yields a GpgException.
            f2, f3 = (fields[2:4] + [b"", b""])[:2]
            raise GpgException("Other error: %s %s" % (
                f2.decode('ascii', errors='replace'),
                f3.decode('ascii', errors='replace'),
            ))

        elif keyword == b"SIG_ID":
            if len(fields) < 3:
                raise GpgException("Unexpected output on status-fd: %s" % line)
            self.signature_ids.append(fields[2])

        elif keyword in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED',
                         b'NEWSIG', b'NOTATION_NAME', b'NOTATION_FLAGS',
                         b'NOTATION_DATA', b'SIGEXPIRED', b'KEYEXPIRED',
                         b'POLICY_URL', b'PROGRESS', b'VERIFICATION_COMPLIANCE_MODE'):
            # Informational only; nothing to record.
            pass

        elif keyword in (b'EXPSIG', b'EXPKEYSIG'):
            self.expired = True
            self.invalid = True

        elif keyword in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'):
            self.invalid = True

        else:
            field = keyword.decode('ascii', errors='replace')
            raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))

    def _exec_gpg(self, stdin, stdout, stderr, statusfd):
        """Replace the current (forked child) process with gpg.

        The given descriptors are installed as fds 0-3, all descriptors from
        4 up to `_MAXFD` are closed, and gpg is executed with the status
        output on fd 3. This method never returns: if gpg cannot be executed
        the process terminates with exit code 3.

        :param stdin: descriptor to become gpg's stdin
        :param stdout: descriptor to become gpg's stdout
        :param stderr: descriptor to become gpg's stderr
        :param statusfd: descriptor to become fd 3 (``--status-fd``)
        """
        try:
            if stdin != 0:
                os.dup2(stdin, 0)
            if stdout != 1:
                os.dup2(stdout, 1)
            if stderr != 2:
                os.dup2(stderr, 2)
            if statusfd != 3:
                os.dup2(statusfd, 3)
            # Make sure fds 0-3 survive the exec.
            for fd in range(4):
                old = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            os.closerange(4, _MAXFD)

            args = [self.gpg,
                    "--status-fd=3",
                    "--no-default-keyring",
                    "--batch",
                    "--no-tty",
                    "--trust-model", "always",
                    "--fixed-list-mode"]
            for k in self.keyrings:
                args.extend(["--keyring", k])
            args.extend(["--decrypt", "-"])

            os.execvp(self.gpg, args)
        finally:
            # Only reached when the setup or the exec failed.  Whatever
            # happens while reporting, the child must not continue running
            # the parent's code, so the exit sits in its own `finally`.
            try:
                print("Failed to execute gpg.", file=sys.stderr)
                sys.stderr.flush()
            finally:
                os._exit(3)

    @property
    def contents_sha1(self) -> str:
        """SHA-1 checksum of `contents` as computed by `apt_pkg.sha1sum`"""
        return apt_pkg.sha1sum(self.contents)  # type: ignore[attr-defined]

325 

326 

def sign(infile, outfile=None, keyids=(), inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None, *, digest_algorithm="SHA256"):
    """Create an ASCII-armoured signature by running ``/usr/bin/gpg``.

    :param infile: data to sign: a `str` (encoded as UTF-8), a `bytes`
                   object, or anything else, which is handed to
                   `subprocess.run` as ``stdin``
    :param outfile: passed to `subprocess.run` as ``stdout``; if None the
                    output is captured and returned
    :param keyids: iterable of key ids, each passed via ``--local-user``
    :param inline: use ``--clearsign`` if true, ``--detach-sign`` otherwise
    :param pubring: optional value for ``--keyring``
    :param secring: optional value for ``--secret-keyring``
    :param homedir: optional value for ``--homedir``
    :param passphrase_file: optional value for ``--passphrase-file`` (used
                            together with ``--pinentry-mode loopback``)
    :param digest_algorithm: value for ``--personal-digest-preferences``
    :return: the captured output if `outfile` is None, otherwise None
    :raises subprocess.CalledProcessError: if gpg exits with a non-zero status
    """
    args = [
        '/usr/bin/gpg',
        '--no-options', '--no-tty', '--batch', '--armour',
        '--personal-digest-preferences', digest_algorithm,
    ]

    for keyid in keyids:
        args.extend(['--local-user', keyid])
    if pubring is not None:
        args.extend(['--keyring', pubring])
    if secring is not None:
        args.extend(['--secret-keyring', secring])
    if homedir is not None:
        args.extend(['--homedir', homedir])
    if passphrase_file is not None:
        args.extend(['--pinentry-mode', 'loopback',
                     '--passphrase-file', passphrase_file])

    args.append('--clearsign' if inline else '--detach-sign')

    kwargs = {}
    if isinstance(infile, str):
        infile = infile.encode('utf-8')
    if isinstance(infile, bytes):
        # In-memory data is fed through a pipe by subprocess.run.
        kwargs['input'] = infile
    else:
        kwargs['stdin'] = infile
    kwargs['stdout'] = subprocess.PIPE if outfile is None else outfile

    result = subprocess.run(args, check=True, **kwargs)
    return result.stdout if outfile is None else None

363 

# vim: set sw=4 et: