Coverage for daklib/gpg.py: 76%

232 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-02-10 22:10 +0000

1"""Utilities for signed files 

2 

3@contact: Debian FTP Master <ftpmaster@debian.org> 

4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org> 

5@license: GNU General Public License version 2 or later 

6""" 

7 

8# This program is free software; you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation; either version 2 of the License, or 

11# (at your option) any later version. 

12 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17 

18# You should have received a copy of the GNU General Public License 

19# along with this program; if not, write to the Free Software 

20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

21 

22import datetime 

23import fcntl 

24import os 

25import select 

26import subprocess 

27import sys 

28from collections.abc import Collection, Iterable, Mapping 

29from contextlib import ExitStack 

30from tempfile import NamedTemporaryFile 

31from typing import Any, NoReturn, Optional 

32 

33import apt_pkg 

34 

# Upper bound for file descriptor numbers; the forked gpg child closes every
# descriptor from 4 up to this value before exec'ing.
try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, OSError, ValueError):
    # os.sysconf is missing on this platform, or SC_OPEN_MAX is unknown or
    # unsupported: fall back to a conservative fixed limit.
    _MAXFD = 256

39 

40 

class GpgException(Exception):
    """Raised when running GnuPG fails or its output cannot be accepted."""

43 

44 

45class _Pipe: 

46 """context manager for pipes 

47 

48 Note: When the pipe is closed by other means than the close_r and close_w 

49 methods, you have to set self.r (self.w) to None. 

50 """ 

51 

52 r: int | None 

53 w: int | None 

54 

55 def __enter__(self): 

56 (self.r, self.w) = os.pipe() 

57 return self 

58 

59 def __exit__(self, type, value, traceback): 

60 self.close_w() 

61 self.close_r() 

62 return False 

63 

64 def close_r(self): 

65 """close reading side of the pipe""" 

66 if self.r: 

67 os.close(self.r) 

68 self.r = None 

69 

70 def close_w(self): 

71 """close writing part of the pipe""" 

72 if self.w: 

73 os.close(self.w) 

74 self.w = None 

75 

76 

77# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9) 

def waitstatus_to_exitcode(status):
    """Translate a wait status into an exit code.

    :param status: status value as returned by the ``os.wait*`` functions
    :return: the exit status if the process exited normally, or the negated
             signal number if it was terminated by a signal
    :raises ValueError: if the status describes neither of these cases
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")

85 

86 

def _create_named_temporary_file_with_contents(contents: bytes):
    """Return an open named temporary file that already holds `contents`.

    The data is flushed, so it can be read back through the file's ``name``
    while the returned object is still open.

    :param contents: bytes to store in the temporary file
    :return: the open ``NamedTemporaryFile`` object
    """
    tmp = NamedTemporaryFile()
    tmp.write(contents)
    tmp.flush()
    return tmp

92 

93 

94class SignedFile: 

95 """handle files signed with PGP 

96 

97 The following attributes are available: 

98 contents - byte-string with the content (after removing PGP armor) 

99 valid - Boolean indicating a valid signature was found 

100 weak_signature - signature uses a weak algorithm (e.g. SHA-1) 

101 fingerprint - fingerprint of the key used for signing 

102 primary_fingerprint - fingerprint of the primary key associated to the key used for signing 

103 """ 

104 

105 def __init__( 

106 self, 

107 data: bytes, 

108 keyrings: Collection[str], 

109 require_signature: bool = True, 

110 *, 

111 detached_signature: Optional[bytes] = None, 

112 gpg: str = "/usr/bin/gpg", 

113 ): 

114 """ 

115 :param data: byte-string containing the message 

116 :param keyrings: sequence of keyrings 

117 :param require_signature: if True (the default), will raise an exception if no valid signature was found 

118 :param detached_signature: validate detached signature if set; otherwise an inline signature is expected 

119 :param gpg: location of the gpg binary 

120 """ 

121 self.gpg = gpg 

122 self.keyrings = keyrings 

123 

124 self.valid: bool = False #: valid signature 

125 self.expired = False 

126 self.invalid = False 

127 self.weak_signature = False 

128 self.fingerprints: list[str] = [] 

129 self.primary_fingerprints: list[str] = [] 

130 self.signature_ids: list[str] = [] 

131 

132 self._verify(data, detached_signature, require_signature) 

133 

134 @property 

135 def fingerprint(self) -> str: 

136 """fingerprint of the (sub)key used for the signature""" 

137 assert len(self.fingerprints) == 1 

138 return self.fingerprints[0] 

139 

140 @property 

141 def primary_fingerprint(self) -> str: 

142 """fingerprint of the primary key used for the signature""" 

143 assert len(self.primary_fingerprints) == 1 

144 return self.primary_fingerprints[0] 

145 

146 @property 

147 def signature_id(self) -> str: 

148 assert len(self.signature_ids) == 1 

149 return self.signature_ids[0] 

150 

151 def _verify( 

152 self, data: bytes, detached_signature: Optional[bytes], require_signature: bool 

153 ) -> None: 

154 with ( 

155 _Pipe() as stdin, 

156 _Pipe() as contents, 

157 _Pipe() as status, 

158 _Pipe() as stderr, 

159 ExitStack() as stack, 

160 ): 

161 detached_signature_file = ( 

162 stack.enter_context( 

163 _create_named_temporary_file_with_contents(detached_signature) 

164 ) 

165 if detached_signature is not None 

166 else None 

167 ) 

168 

169 pid = os.fork() 

170 if pid == 0: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true

171 self._exec_gpg( 

172 stdin.r, 

173 contents.w, 

174 stderr.w, 

175 status.w, 

176 ( 

177 detached_signature_file.name 

178 if detached_signature_file is not None 

179 else None 

180 ), 

181 ) 

182 else: 

183 stdin.close_r() 

184 contents.close_w() 

185 stderr.close_w() 

186 status.close_w() 

187 

188 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data}) 

189 stdin.w = None # was closed by _do_io 

190 

191 (pid_, status_code, usage_) = os.wait4(pid, 0) 

192 if pid_ != pid: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true

193 raise Exception( 

194 f"wait4() waited for pid {pid_}, but we expected {pid}" 

195 ) 

196 exit_code = waitstatus_to_exitcode(status_code) 

197 

198 if detached_signature is None: 

199 self.contents = read[contents.r] 

200 elif read[contents.r]: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 raise GpgException( 

202 "GPG wrote data to stdout when verifying detached signature." 

203 ) 

204 else: 

205 self.contents = data 

206 

207 self.status = read[status.r] 

208 self.stderr = read[stderr.r] 

209 

210 if self.status == b"": 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 stderr = self.stderr.decode("ascii", errors="replace") 

212 raise GpgException( 

213 "No status output from GPG. (GPG exited with status code %s)\n%s" 

214 % (exit_code, stderr) 

215 ) 

216 

217 # gpg exits with 0 (no error), 1 (at least one invalid sig), 

218 # or anything else (fatal error). Even though status 2 

219 # indicates a fatal error, we still allow it as it is also 

220 # returned when the public key is not known. 

221 if exit_code not in (0, 1, 2): 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true

222 stderr = self.stderr.decode("ascii", errors="replace") 

223 raise GpgException( 

224 f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}" 

225 ) 

226 

227 self._seen_goodsig = 0 

228 

229 for line in self.status.splitlines(): 

230 self._parse_status(line) 

231 

232 if self.invalid or self._seen_goodsig != len(self.fingerprints): 

233 self.valid = False 

234 

235 if require_signature and not self.valid: 

236 stderr = self.stderr.decode("ascii", errors="replace") 

237 raise GpgException( 

238 "No valid signature found. (GPG exited with status code %s)\n%s" 

239 % (exit_code, stderr) 

240 ) 

241 

242 assert len(self.fingerprints) == len(self.primary_fingerprints) 

243 assert len(self.fingerprints) == len(self.signature_ids) 

244 

245 def _do_io( 

246 self, read: Collection[int], write: Mapping[int, bytes] 

247 ) -> dict[int, bytes]: 

248 for fd in write: 

249 old = fcntl.fcntl(fd, fcntl.F_GETFL) 

250 fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK) 

251 

252 read_lines: dict[int, list[bytes]] = dict((fd, []) for fd in read) 

253 write_pos = dict((fd, 0) for fd in write) 

254 

255 read_set = list(read) 

256 write_set = list(write) 

257 while len(read_set) > 0 or len(write_set) > 0: 

258 r, w, x_ = select.select(read_set, write_set, ()) 

259 for fd in r: 

260 data = os.read(fd, 4096) 

261 if len(data) == 0: 

262 read_set.remove(fd) 

263 else: 

264 read_lines[fd].append(data) 

265 for fd in w: 

266 data = write[fd][write_pos[fd] :] 

267 if len(data) == 0: 

268 os.close(fd) 

269 write_set.remove(fd) 

270 else: 

271 bytes_written = os.write(fd, data) 

272 write_pos[fd] += bytes_written 

273 

274 return dict((fd, b"".join(read_lines[fd])) for fd in read_lines) 

275 

276 def _parse_timestamp( 

277 self, timestamp: bytes, datestring: bytes | None = None 

278 ) -> datetime.datetime: 

279 """parse timestamp in GnuPG's format 

280 

281 :return: datetime object for the given timestamp 

282 """ 

283 # The old implementation did only return the date. As we already 

284 # used this for replay production, return the legacy value for 

285 # old signatures. 

286 if datestring is not None: 286 ↛ 293line 286 didn't jump to line 293 because the condition on line 286 was always true

287 year, month, day = datestring.split(b"-") 

288 date = datetime.date(int(year), int(month), int(day)) 

289 time = datetime.time(0, 0) 

290 if date < datetime.date(2014, 8, 4): 

291 return datetime.datetime.combine(date, time, tzinfo=datetime.UTC) 

292 

293 if b"T" in timestamp: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true

294 raise Exception("No support for ISO 8601 timestamps.") 

295 return datetime.datetime.fromtimestamp(int(timestamp), datetime.UTC) 

296 

297 def _parse_status(self, line: bytes) -> None: 

298 fields = line.split() 

299 if fields[0] != b"[GNUPG:]": 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true

300 raise GpgException("Unexpected output on status-fd: %r" % line) 

301 

302 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp> 

303 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo> 

304 # <hash-algo> <sig-class> <primary-key-fpr> 

305 if fields[1] == b"VALIDSIG": 

306 # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20, 

307 # which Debian 8 does not yet include. We want to make sure 

308 # to not accept uploads covered by a MD5-based signature. 

309 # RFC 4880, table 9.4: 

310 # 1 - MD5 

311 # 2 - SHA-1 

312 # 3 - RIPE-MD/160 

313 if fields[9] == b"1": 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true

314 raise GpgException("Digest algorithm MD5 is not trusted.") 

315 if fields[9] in (b"2", b"3"): 

316 self.weak_signature = True 

317 

318 self.valid = True 

319 self.fingerprints.append(fields[2].decode("ascii")) 

320 self.primary_fingerprints.append(fields[11].decode("ascii")) 

321 self.signature_timestamp = self._parse_timestamp(fields[4], fields[3]) 

322 

323 elif fields[1] == b"BADARMOR": 

324 raise GpgException("Bad armor.") 

325 

326 elif fields[1] == b"NODATA": 

327 raise GpgException("No data.") 

328 

329 elif fields[1] == b"DECRYPTION_FAILED": 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 raise GpgException("Decryption failed.") 

331 

332 elif fields[1] == b"ERROR": 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 f2 = fields[2].decode("ascii", errors="replace") 

334 f3 = fields[3].decode("ascii", errors="replace") 

335 raise GpgException("Other error: %s %s" % (f2, f3)) 

336 

337 elif fields[1] == b"SIG_ID": 

338 self.signature_ids.append(fields[2].decode("ascii")) 

339 

340 elif fields[1] == b"GOODSIG": 

341 self._seen_goodsig += 1 

342 

343 elif fields[1] in ( 

344 b"PLAINTEXT", 

345 b"KEY_CONSIDERED", 

346 b"NEWSIG", 

347 b"NOTATION_NAME", 

348 b"NOTATION_FLAGS", 

349 b"NOTATION_DATA", 

350 b"SIG_SUBPACKET", 

351 b"SIGEXPIRED", 

352 b"KEYEXPIRED", 

353 b"POLICY_URL", 

354 b"PROGRESS", 

355 b"VERIFICATION_COMPLIANCE_MODE", 

356 ): 

357 pass 

358 

359 elif fields[1] in (b"EXPSIG", b"EXPKEYSIG"): 

360 self.expired = True 

361 self.invalid = True 

362 

363 elif fields[1] in ( 

364 b"REVKEYSIG", 

365 b"BADSIG", 

366 b"ERRSIG", 

367 b"FAILURE", 

368 b"KEYREVOKED", 

369 b"NO_PUBKEY", 

370 ): 

371 self.invalid = True 

372 

373 else: 

374 field = fields[1].decode("ascii", errors="replace") 

375 raise GpgException( 

376 "Keyword '{0}' from GnuPG was not expected.".format(field) 

377 ) 

378 

379 def _exec_gpg( 

380 self, 

381 stdin: int, 

382 stdout: int, 

383 stderr: int, 

384 statusfd: int, 

385 detached_signature_path: str | None, 

386 ) -> NoReturn: 

387 try: 

388 if stdin != 0: 

389 os.dup2(stdin, 0) 

390 if stdout != 1: 

391 os.dup2(stdout, 1) 

392 if stderr != 2: 

393 os.dup2(stderr, 2) 

394 if statusfd != 3: 

395 os.dup2(statusfd, 3) 

396 for fd in range(4): 

397 old = fcntl.fcntl(fd, fcntl.F_GETFD) 

398 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) 

399 os.closerange(4, _MAXFD) 

400 

401 args = [ 

402 self.gpg, 

403 "--status-fd=3", 

404 "--no-default-keyring", 

405 "--batch", 

406 "--no-tty", 

407 "--trust-model", 

408 "always", 

409 "--fixed-list-mode", 

410 ] 

411 for k in self.keyrings: 

412 args.extend(["--keyring", k]) 

413 

414 if detached_signature_path is not None: 

415 args.extend(["--verify", detached_signature_path, "-"]) 

416 else: 

417 args.extend(["--output", "-", "--verify", "-"]) 

418 

419 os.execvp(self.gpg, args) 

420 finally: 

421 try: 

422 print("Failed to execute gpg.", file=sys.stderr) 

423 sys.stderr.flush() 

424 except: 

425 # Ignore errors, we want to reach the `exit` call below. 

426 pass 

427 os._exit(3) 

428 

429 @property 

430 def contents_sha1(self) -> str: 

431 return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined] 

432 

433 

434def sign( 

435 infile, 

436 outfile=None, 

437 keyids: Iterable[str] = [], 

438 inline=False, 

439 pubring: str | None = None, 

440 homedir: str | None = None, 

441 passphrase_file: str | None = None, 

442 *, 

443 digest_algorithm="SHA256", 

444) -> bytes | None: 

445 args = [ 

446 "/usr/bin/gpg", 

447 "--no-options", 

448 "--no-tty", 

449 "--batch", 

450 "--armour", 

451 "--personal-digest-preferences", 

452 digest_algorithm, 

453 ] 

454 

455 for keyid in keyids: 

456 args.extend(["--local-user", keyid]) 

457 if pubring is not None: 457 ↛ 458line 457 didn't jump to line 458 because the condition on line 457 was never true

458 args.extend(["--keyring", pubring]) 

459 if homedir is not None: 459 ↛ 461line 459 didn't jump to line 461 because the condition on line 459 was always true

460 args.extend(["--homedir", homedir]) 

461 if passphrase_file is not None: 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true

462 args.extend( 

463 ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file] 

464 ) 

465 

466 args.append("--clearsign" if inline else "--detach-sign") 

467 

468 kwargs: dict[str, Any] = {} 

469 if isinstance(infile, str): 469 ↛ 470line 469 didn't jump to line 470 because the condition on line 469 was never true

470 infile = infile.encode("utf-8") 

471 if isinstance(infile, bytes): 

472 kwargs["input"] = infile 

473 else: 

474 kwargs["stdin"] = infile 

475 if outfile is None: 

476 kwargs["stdout"] = subprocess.PIPE 

477 else: 

478 kwargs["stdout"] = outfile 

479 

480 result = subprocess.run(args, check=True, **kwargs) 

481 if outfile is None: 

482 return result.stdout 

483 return None 

484 

485 

486# vim: set sw=4 et: