Coverage for daklib/gpg.py: 76%

232 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1"""Utilities for signed files 

2 

3@contact: Debian FTP Master <ftpmaster@debian.org> 

4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org> 

5@license: GNU General Public License version 2 or later 

6""" 

7 

8# This program is free software; you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation; either version 2 of the License, or 

11# (at your option) any later version. 

12 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17 

18# You should have received a copy of the GNU General Public License 

19# along with this program; if not, write to the Free Software 

20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

21 

22import datetime 

23import fcntl 

24import os 

25import select 

26import subprocess 

27import sys 

28from collections.abc import Collection, Iterable, Mapping 

29from contextlib import ExitStack 

30from tempfile import NamedTemporaryFile 

31from typing import Any, NoReturn, Optional 

32 

33import apt_pkg 

34 

# Upper bound (exclusive) of the file descriptor range that is closed in the
# child process before gpg is executed (see `os.closerange(4, _MAXFD)`).
try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # sysconf is unavailable or does not know/support SC_OPEN_MAX.
    _MAXFD = 256
if _MAXFD < 0:
    # sysconf reports -1 when the value is not defined; a negative bound
    # would make closerange() a no-op, so fall back to the default.
    _MAXFD = 256

39 

40 

class GpgException(Exception):
    """Error raised by this module when running gpg or interpreting its output fails."""

43 

44 

45class _Pipe: 

46 """context manager for pipes 

47 

48 Note: When the pipe is closed by other means than the close_r and close_w 

49 methods, you have to set self.r (self.w) to None. 

50 """ 

51 

52 r: int | None 

53 w: int | None 

54 

55 def __enter__(self): 

56 (self.r, self.w) = os.pipe() 

57 return self 

58 

59 def __exit__(self, type, value, traceback): 

60 self.close_w() 

61 self.close_r() 

62 return False 

63 

64 def close_r(self): 

65 """close reading side of the pipe""" 

66 if self.r: 

67 os.close(self.r) 

68 self.r = None 

69 

70 def close_w(self): 

71 """close writing part of the pipe""" 

72 if self.w: 

73 os.close(self.w) 

74 self.w = None 

75 

76 

77# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9) 

def waitstatus_to_exitcode(status):
    """convert a wait status into an exit code

    :param status: wait status as passed to the `os.WIF*` helpers
    :return: the exit status if the process exited, or the negated signal
             number if the process was terminated by a signal
    :raises ValueError: if the status describes neither of the two cases
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")

85 

86 

def _create_named_temporary_file_with_contents(contents: bytes):
    """create a named temporary file holding `contents`

    :param contents: bytes to store in the file
    :return: the still open `NamedTemporaryFile` object; the data has been
             flushed so it can be read via the file's name
    """
    tmp = NamedTemporaryFile()
    tmp.write(contents)
    # Push the buffered data out so readers opening the file by name see it.
    tmp.flush()
    return tmp

92 

93 

94class SignedFile: 

95 """handle files signed with PGP 

96 

97 The following attributes are available: 

98 contents - byte-string with the content (after removing PGP armor) 

99 valid - Boolean indicating a valid signature was found 

100 weak_signature - signature uses a weak algorithm (e.g. SHA-1) 

101 fingerprint - fingerprint of the key used for signing 

102 primary_fingerprint - fingerprint of the primary key associated to the key used for signing 

103 """ 

104 

105 def __init__( 

106 self, 

107 data: bytes, 

108 keyrings: Collection[str], 

109 require_signature: bool = True, 

110 *, 

111 detached_signature: Optional[bytes] = None, 

112 gpg: str = "/usr/bin/gpg", 

113 ): 

114 """ 

115 :param data: byte-string containing the message 

116 :param keyrings: sequence of keyrings 

117 :param require_signature: if True (the default), will raise an exception if no valid signature was found 

118 :param detached_signature: validate detached signature if set; otherwise an inline signature is expected 

119 :param gpg: location of the gpg binary 

120 """ 

121 self.gpg = gpg 

122 self.keyrings = keyrings 

123 

124 self.valid: bool = False #: valid signature 

125 self.expired = False 

126 self.invalid = False 

127 self.weak_signature = False 

128 self.fingerprints: list[str] = [] 

129 self.primary_fingerprints: list[str] = [] 

130 self.signature_ids: list[str] = [] 

131 

132 self._verify(data, detached_signature, require_signature) 

133 

134 @property 

135 def fingerprint(self) -> str: 

136 """fingerprint of the (sub)key used for the signature""" 

137 assert len(self.fingerprints) == 1 

138 return self.fingerprints[0] 

139 

140 @property 

141 def primary_fingerprint(self) -> str: 

142 """fingerprint of the primary key used for the signature""" 

143 assert len(self.primary_fingerprints) == 1 

144 return self.primary_fingerprints[0] 

145 

146 @property 

147 def signature_id(self) -> str: 

148 assert len(self.signature_ids) == 1 

149 return self.signature_ids[0] 

150 

151 def _verify( 

152 self, data: bytes, detached_signature: Optional[bytes], require_signature: bool 

153 ) -> None: 

154 with ( 

155 _Pipe() as stdin, 

156 _Pipe() as contents, 

157 _Pipe() as status, 

158 _Pipe() as stderr, 

159 ExitStack() as stack, 

160 ): 

161 detached_signature_file = ( 

162 stack.enter_context( 

163 _create_named_temporary_file_with_contents(detached_signature) 

164 ) 

165 if detached_signature is not None 

166 else None 

167 ) 

168 

169 pid = os.fork() 

170 if pid == 0: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true

171 self._exec_gpg( 

172 stdin.r, 

173 contents.w, 

174 stderr.w, 

175 status.w, 

176 ( 

177 detached_signature_file.name 

178 if detached_signature_file is not None 

179 else None 

180 ), 

181 ) 

182 else: 

183 stdin.close_r() 

184 contents.close_w() 

185 stderr.close_w() 

186 status.close_w() 

187 

188 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data}) 

189 stdin.w = None # was closed by _do_io 

190 

191 (pid_, status_code, usage_) = os.wait4(pid, 0) 

192 if pid_ != pid: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true

193 raise Exception( 

194 f"wait4() waited for pid {pid_}, but we expected {pid}" 

195 ) 

196 exit_code = waitstatus_to_exitcode(status_code) 

197 

198 if detached_signature is None: 

199 self.contents = read[contents.r] 

200 elif read[contents.r]: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 raise GpgException( 

202 "GPG wrote data to stdout when verifying detached signature." 

203 ) 

204 else: 

205 self.contents = data 

206 

207 self.status = read[status.r] 

208 self.stderr = read[stderr.r] 

209 

210 if self.status == b"": 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 stderr = self.stderr.decode("ascii", errors="replace") 

212 raise GpgException( 

213 "No status output from GPG. (GPG exited with status code %s)\n%s" 

214 % (exit_code, stderr) 

215 ) 

216 

217 # gpg exits with 0 (no error), 1 (at least one invalid sig), 

218 # or anything else (fatal error). Even though status 2 

219 # indicates a fatal error, we still allow it as it is also 

220 # returned when the public key is not known. 

221 if exit_code not in (0, 1, 2): 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true

222 stderr = self.stderr.decode("ascii", errors="replace") 

223 raise GpgException( 

224 f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}" 

225 ) 

226 

227 self._seen_goodsig = 0 

228 

229 for line in self.status.splitlines(): 

230 self._parse_status(line) 

231 

232 if self.invalid or self._seen_goodsig != len(self.fingerprints): 

233 self.valid = False 

234 

235 if require_signature and not self.valid: 

236 stderr = self.stderr.decode("ascii", errors="replace") 

237 raise GpgException( 

238 "No valid signature found. (GPG exited with status code %s)\n%s" 

239 % (exit_code, stderr) 

240 ) 

241 

242 assert len(self.fingerprints) == len(self.primary_fingerprints) 

243 assert len(self.fingerprints) == len(self.signature_ids) 

244 

245 def _do_io( 

246 self, read: Collection[int], write: Mapping[int, bytes] 

247 ) -> dict[int, bytes]: 

248 for fd in write: 

249 old = fcntl.fcntl(fd, fcntl.F_GETFL) 

250 fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK) 

251 

252 read_lines: dict[int, list[bytes]] = dict((fd, []) for fd in read) 

253 write_pos = dict((fd, 0) for fd in write) 

254 

255 read_set = list(read) 

256 write_set = list(write) 

257 while len(read_set) > 0 or len(write_set) > 0: 

258 r, w, x_ = select.select(read_set, write_set, ()) 

259 for fd in r: 

260 data = os.read(fd, 4096) 

261 if len(data) == 0: 

262 read_set.remove(fd) 

263 else: 

264 read_lines[fd].append(data) 

265 for fd in w: 

266 data = write[fd][write_pos[fd] :] 

267 if len(data) == 0: 

268 os.close(fd) 

269 write_set.remove(fd) 

270 else: 

271 bytes_written = os.write(fd, data) 

272 write_pos[fd] += bytes_written 

273 

274 return dict((fd, b"".join(read_lines[fd])) for fd in read_lines) 

275 

276 def _parse_timestamp( 

277 self, timestamp: bytes, datestring: bytes | None = None 

278 ) -> datetime.datetime: 

279 """parse timestamp in GnuPG's format 

280 

281 :return: datetime object for the given timestamp 

282 """ 

283 # The old implementation did only return the date. As we already 

284 # used this for replay production, return the legacy value for 

285 # old signatures. 

286 if datestring is not None: 286 ↛ 293line 286 didn't jump to line 293 because the condition on line 286 was always true

287 year, month, day = datestring.split(b"-") 

288 date = datetime.date(int(year), int(month), int(day)) 

289 time = datetime.time(0, 0) 

290 if date < datetime.date(2014, 8, 4): 

291 return datetime.datetime.combine(date, time, tzinfo=datetime.UTC) 

292 

293 if b"T" in timestamp: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true

294 raise Exception("No support for ISO 8601 timestamps.") 

295 return datetime.datetime.fromtimestamp(int(timestamp), datetime.UTC) 

296 

297 def _parse_status(self, line: bytes) -> None: 

298 fields = line.split() 

299 if fields[0] != b"[GNUPG:]": 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true

300 raise GpgException("Unexpected output on status-fd: %r" % line) 

301 

302 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp> 

303 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo> 

304 # <hash-algo> <sig-class> <primary-key-fpr> 

305 if fields[1] == b"VALIDSIG": 

306 # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20, 

307 # which Debian 8 does not yet include. We want to make sure 

308 # to not accept uploads covered by a MD5-based signature. 

309 # RFC 4880, table 9.4: 

310 # 1 - MD5 

311 # 2 - SHA-1 

312 # 3 - RIPE-MD/160 

313 if fields[9] == b"1": 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true

314 raise GpgException("Digest algorithm MD5 is not trusted.") 

315 if fields[9] in (b"2", b"3"): 

316 self.weak_signature = True 

317 

318 self.valid = True 

319 self.fingerprints.append(fields[2].decode("ascii")) 

320 self.primary_fingerprints.append(fields[11].decode("ascii")) 

321 self.signature_timestamp = self._parse_timestamp(fields[4], fields[3]) 

322 

323 elif fields[1] == b"BADARMOR": 

324 raise GpgException("Bad armor.") 

325 

326 elif fields[1] == b"NODATA": 

327 raise GpgException("No data.") 

328 

329 elif fields[1] == b"DECRYPTION_FAILED": 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 raise GpgException("Decryption failed.") 

331 

332 elif fields[1] == b"ERROR": 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 f2 = fields[2].decode("ascii", errors="replace") 

334 f3 = fields[3].decode("ascii", errors="replace") 

335 raise GpgException("Other error: %s %s" % (f2, f3)) 

336 

337 elif fields[1] == b"SIG_ID": 

338 self.signature_ids.append(fields[2].decode("ascii")) 

339 

340 elif fields[1] == b"GOODSIG": 

341 self._seen_goodsig += 1 

342 

343 elif fields[1] in ( 

344 b"PLAINTEXT", 

345 b"KEY_CONSIDERED", 

346 b"NEWSIG", 

347 b"NOTATION_NAME", 

348 b"NOTATION_FLAGS", 

349 b"NOTATION_DATA", 

350 b"SIGEXPIRED", 

351 b"KEYEXPIRED", 

352 b"POLICY_URL", 

353 b"PROGRESS", 

354 b"VERIFICATION_COMPLIANCE_MODE", 

355 ): 

356 pass 

357 

358 elif fields[1] in (b"EXPSIG", b"EXPKEYSIG"): 

359 self.expired = True 

360 self.invalid = True 

361 

362 elif fields[1] in ( 

363 b"REVKEYSIG", 

364 b"BADSIG", 

365 b"ERRSIG", 

366 b"FAILURE", 

367 b"KEYREVOKED", 

368 b"NO_PUBKEY", 

369 ): 

370 self.invalid = True 

371 

372 else: 

373 field = fields[1].decode("ascii", errors="replace") 

374 raise GpgException( 

375 "Keyword '{0}' from GnuPG was not expected.".format(field) 

376 ) 

377 

378 def _exec_gpg( 

379 self, 

380 stdin: int, 

381 stdout: int, 

382 stderr: int, 

383 statusfd: int, 

384 detached_signature_path: str | None, 

385 ) -> NoReturn: 

386 try: 

387 if stdin != 0: 

388 os.dup2(stdin, 0) 

389 if stdout != 1: 

390 os.dup2(stdout, 1) 

391 if stderr != 2: 

392 os.dup2(stderr, 2) 

393 if statusfd != 3: 

394 os.dup2(statusfd, 3) 

395 for fd in range(4): 

396 old = fcntl.fcntl(fd, fcntl.F_GETFD) 

397 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) 

398 os.closerange(4, _MAXFD) 

399 

400 args = [ 

401 self.gpg, 

402 "--status-fd=3", 

403 "--no-default-keyring", 

404 "--batch", 

405 "--no-tty", 

406 "--trust-model", 

407 "always", 

408 "--fixed-list-mode", 

409 ] 

410 for k in self.keyrings: 

411 args.extend(["--keyring", k]) 

412 

413 if detached_signature_path is not None: 

414 args.extend(["--verify", detached_signature_path, "-"]) 

415 else: 

416 args.extend(["--output", "-", "--verify", "-"]) 

417 

418 os.execvp(self.gpg, args) 

419 finally: 

420 try: 

421 print("Failed to execute gpg.", file=sys.stderr) 

422 sys.stderr.flush() 

423 except: 

424 # Ignore errors, we want to reach the `exit` call below. 

425 pass 

426 os._exit(3) 

427 

428 @property 

429 def contents_sha1(self) -> str: 

430 return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined] 

431 

432 

433def sign( 

434 infile, 

435 outfile=None, 

436 keyids: Iterable[str] = [], 

437 inline=False, 

438 pubring: str | None = None, 

439 homedir: str | None = None, 

440 passphrase_file: str | None = None, 

441 *, 

442 digest_algorithm="SHA256", 

443) -> bytes | None: 

444 args = [ 

445 "/usr/bin/gpg", 

446 "--no-options", 

447 "--no-tty", 

448 "--batch", 

449 "--armour", 

450 "--personal-digest-preferences", 

451 digest_algorithm, 

452 ] 

453 

454 for keyid in keyids: 

455 args.extend(["--local-user", keyid]) 

456 if pubring is not None: 456 ↛ 457line 456 didn't jump to line 457 because the condition on line 456 was never true

457 args.extend(["--keyring", pubring]) 

458 if homedir is not None: 458 ↛ 460line 458 didn't jump to line 460 because the condition on line 458 was always true

459 args.extend(["--homedir", homedir]) 

460 if passphrase_file is not None: 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true

461 args.extend( 

462 ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file] 

463 ) 

464 

465 args.append("--clearsign" if inline else "--detach-sign") 

466 

467 kwargs: dict[str, Any] = {} 

468 if isinstance(infile, str): 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true

469 infile = infile.encode("utf-8") 

470 if isinstance(infile, bytes): 

471 kwargs["input"] = infile 

472 else: 

473 kwargs["stdin"] = infile 

474 if outfile is None: 

475 kwargs["stdout"] = subprocess.PIPE 

476 else: 

477 kwargs["stdout"] = outfile 

478 

479 result = subprocess.run(args, check=True, **kwargs) 

480 if outfile is None: 

481 return result.stdout 

482 return None 

483 

484 

485# vim: set sw=4 et: