Coverage for daklib/gpg.py: 76%
232 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-02-10 22:10 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-02-10 22:10 +0000
1"""Utilities for signed files
3@contact: Debian FTP Master <ftpmaster@debian.org>
4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
5@license: GNU General Public License version 2 or later
6"""
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22import datetime
23import fcntl
24import os
25import select
26import subprocess
27import sys
28from collections.abc import Collection, Iterable, Mapping
29from contextlib import ExitStack
30from tempfile import NamedTemporaryFile
31from typing import Any, NoReturn, Optional
33import apt_pkg
# Exclusive upper bound of the file descriptor range that is closed in the
# forked child before gpg is executed (see the os.closerange call below).
try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # sysconf is unavailable, the name is unknown, or the query failed.
    _MAXFD = 256
if _MAXFD <= 0:
    # sysconf reports -1 when the limit is indeterminate; a non-positive
    # bound would make os.closerange(4, _MAXFD) close nothing at all.
    _MAXFD = 256
class GpgException(Exception):
    """Error raised when running GnuPG or evaluating its output fails."""
45class _Pipe:
46 """context manager for pipes
48 Note: When the pipe is closed by other means than the close_r and close_w
49 methods, you have to set self.r (self.w) to None.
50 """
52 r: int | None
53 w: int | None
55 def __enter__(self):
56 (self.r, self.w) = os.pipe()
57 return self
59 def __exit__(self, type, value, traceback):
60 self.close_w()
61 self.close_r()
62 return False
64 def close_r(self):
65 """close reading side of the pipe"""
66 if self.r:
67 os.close(self.r)
68 self.r = None
70 def close_w(self):
71 """close writing part of the pipe"""
72 if self.w:
73 os.close(self.w)
74 self.w = None
77# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
def waitstatus_to_exitcode(status):
    """Convert a wait status into an exit code.

    :param status: wait status, e.g. the second element returned by os.wait4()
    :return: the exit status if the process exited, or the negated signal
             number if it was terminated by a signal
    :raises ValueError: if the status describes neither of the two
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")
def _create_named_temporary_file_with_contents(contents: bytes):
    """Create a named temporary file that already holds ``contents``.

    The data is flushed, so it can be read back through the file's ``name``
    while the returned object is open.

    :param contents: bytes to store in the file
    :return: the open NamedTemporaryFile object; the caller has to close it
    """
    f = NamedTemporaryFile()
    try:
        f.write(contents)
        f.flush()
    except BaseException:
        # Do not leak the temporary file when it could not be filled.
        f.close()
        raise
    return f
94class SignedFile:
95 """handle files signed with PGP
97 The following attributes are available:
98 contents - byte-string with the content (after removing PGP armor)
99 valid - Boolean indicating a valid signature was found
100 weak_signature - signature uses a weak algorithm (e.g. SHA-1)
101 fingerprint - fingerprint of the key used for signing
102 primary_fingerprint - fingerprint of the primary key associated to the key used for signing
103 """
105 def __init__(
106 self,
107 data: bytes,
108 keyrings: Collection[str],
109 require_signature: bool = True,
110 *,
111 detached_signature: Optional[bytes] = None,
112 gpg: str = "/usr/bin/gpg",
113 ):
114 """
115 :param data: byte-string containing the message
116 :param keyrings: sequence of keyrings
117 :param require_signature: if True (the default), will raise an exception if no valid signature was found
118 :param detached_signature: validate detached signature if set; otherwise an inline signature is expected
119 :param gpg: location of the gpg binary
120 """
121 self.gpg = gpg
122 self.keyrings = keyrings
124 self.valid: bool = False #: valid signature
125 self.expired = False
126 self.invalid = False
127 self.weak_signature = False
128 self.fingerprints: list[str] = []
129 self.primary_fingerprints: list[str] = []
130 self.signature_ids: list[str] = []
132 self._verify(data, detached_signature, require_signature)
134 @property
135 def fingerprint(self) -> str:
136 """fingerprint of the (sub)key used for the signature"""
137 assert len(self.fingerprints) == 1
138 return self.fingerprints[0]
140 @property
141 def primary_fingerprint(self) -> str:
142 """fingerprint of the primary key used for the signature"""
143 assert len(self.primary_fingerprints) == 1
144 return self.primary_fingerprints[0]
146 @property
147 def signature_id(self) -> str:
148 assert len(self.signature_ids) == 1
149 return self.signature_ids[0]
151 def _verify(
152 self, data: bytes, detached_signature: Optional[bytes], require_signature: bool
153 ) -> None:
154 with (
155 _Pipe() as stdin,
156 _Pipe() as contents,
157 _Pipe() as status,
158 _Pipe() as stderr,
159 ExitStack() as stack,
160 ):
161 detached_signature_file = (
162 stack.enter_context(
163 _create_named_temporary_file_with_contents(detached_signature)
164 )
165 if detached_signature is not None
166 else None
167 )
169 pid = os.fork()
170 if pid == 0: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true
171 self._exec_gpg(
172 stdin.r,
173 contents.w,
174 stderr.w,
175 status.w,
176 (
177 detached_signature_file.name
178 if detached_signature_file is not None
179 else None
180 ),
181 )
182 else:
183 stdin.close_r()
184 contents.close_w()
185 stderr.close_w()
186 status.close_w()
188 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
189 stdin.w = None # was closed by _do_io
191 (pid_, status_code, usage_) = os.wait4(pid, 0)
192 if pid_ != pid: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 raise Exception(
194 f"wait4() waited for pid {pid_}, but we expected {pid}"
195 )
196 exit_code = waitstatus_to_exitcode(status_code)
198 if detached_signature is None:
199 self.contents = read[contents.r]
200 elif read[contents.r]: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 raise GpgException(
202 "GPG wrote data to stdout when verifying detached signature."
203 )
204 else:
205 self.contents = data
207 self.status = read[status.r]
208 self.stderr = read[stderr.r]
210 if self.status == b"": 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true
211 stderr = self.stderr.decode("ascii", errors="replace")
212 raise GpgException(
213 "No status output from GPG. (GPG exited with status code %s)\n%s"
214 % (exit_code, stderr)
215 )
217 # gpg exits with 0 (no error), 1 (at least one invalid sig),
218 # or anything else (fatal error). Even though status 2
219 # indicates a fatal error, we still allow it as it is also
220 # returned when the public key is not known.
221 if exit_code not in (0, 1, 2): 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true
222 stderr = self.stderr.decode("ascii", errors="replace")
223 raise GpgException(
224 f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}"
225 )
227 self._seen_goodsig = 0
229 for line in self.status.splitlines():
230 self._parse_status(line)
232 if self.invalid or self._seen_goodsig != len(self.fingerprints):
233 self.valid = False
235 if require_signature and not self.valid:
236 stderr = self.stderr.decode("ascii", errors="replace")
237 raise GpgException(
238 "No valid signature found. (GPG exited with status code %s)\n%s"
239 % (exit_code, stderr)
240 )
242 assert len(self.fingerprints) == len(self.primary_fingerprints)
243 assert len(self.fingerprints) == len(self.signature_ids)
245 def _do_io(
246 self, read: Collection[int], write: Mapping[int, bytes]
247 ) -> dict[int, bytes]:
248 for fd in write:
249 old = fcntl.fcntl(fd, fcntl.F_GETFL)
250 fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
252 read_lines: dict[int, list[bytes]] = dict((fd, []) for fd in read)
253 write_pos = dict((fd, 0) for fd in write)
255 read_set = list(read)
256 write_set = list(write)
257 while len(read_set) > 0 or len(write_set) > 0:
258 r, w, x_ = select.select(read_set, write_set, ())
259 for fd in r:
260 data = os.read(fd, 4096)
261 if len(data) == 0:
262 read_set.remove(fd)
263 else:
264 read_lines[fd].append(data)
265 for fd in w:
266 data = write[fd][write_pos[fd] :]
267 if len(data) == 0:
268 os.close(fd)
269 write_set.remove(fd)
270 else:
271 bytes_written = os.write(fd, data)
272 write_pos[fd] += bytes_written
274 return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
276 def _parse_timestamp(
277 self, timestamp: bytes, datestring: bytes | None = None
278 ) -> datetime.datetime:
279 """parse timestamp in GnuPG's format
281 :return: datetime object for the given timestamp
282 """
283 # The old implementation did only return the date. As we already
284 # used this for replay production, return the legacy value for
285 # old signatures.
286 if datestring is not None: 286 ↛ 293line 286 didn't jump to line 293 because the condition on line 286 was always true
287 year, month, day = datestring.split(b"-")
288 date = datetime.date(int(year), int(month), int(day))
289 time = datetime.time(0, 0)
290 if date < datetime.date(2014, 8, 4):
291 return datetime.datetime.combine(date, time, tzinfo=datetime.UTC)
293 if b"T" in timestamp: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 raise Exception("No support for ISO 8601 timestamps.")
295 return datetime.datetime.fromtimestamp(int(timestamp), datetime.UTC)
297 def _parse_status(self, line: bytes) -> None:
298 fields = line.split()
299 if fields[0] != b"[GNUPG:]": 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true
300 raise GpgException("Unexpected output on status-fd: %r" % line)
302 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
303 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
304 # <hash-algo> <sig-class> <primary-key-fpr>
305 if fields[1] == b"VALIDSIG":
306 # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
307 # which Debian 8 does not yet include. We want to make sure
308 # to not accept uploads covered by a MD5-based signature.
309 # RFC 4880, table 9.4:
310 # 1 - MD5
311 # 2 - SHA-1
312 # 3 - RIPE-MD/160
313 if fields[9] == b"1": 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 raise GpgException("Digest algorithm MD5 is not trusted.")
315 if fields[9] in (b"2", b"3"):
316 self.weak_signature = True
318 self.valid = True
319 self.fingerprints.append(fields[2].decode("ascii"))
320 self.primary_fingerprints.append(fields[11].decode("ascii"))
321 self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
323 elif fields[1] == b"BADARMOR":
324 raise GpgException("Bad armor.")
326 elif fields[1] == b"NODATA":
327 raise GpgException("No data.")
329 elif fields[1] == b"DECRYPTION_FAILED": 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true
330 raise GpgException("Decryption failed.")
332 elif fields[1] == b"ERROR": 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true
333 f2 = fields[2].decode("ascii", errors="replace")
334 f3 = fields[3].decode("ascii", errors="replace")
335 raise GpgException("Other error: %s %s" % (f2, f3))
337 elif fields[1] == b"SIG_ID":
338 self.signature_ids.append(fields[2].decode("ascii"))
340 elif fields[1] == b"GOODSIG":
341 self._seen_goodsig += 1
343 elif fields[1] in (
344 b"PLAINTEXT",
345 b"KEY_CONSIDERED",
346 b"NEWSIG",
347 b"NOTATION_NAME",
348 b"NOTATION_FLAGS",
349 b"NOTATION_DATA",
350 b"SIG_SUBPACKET",
351 b"SIGEXPIRED",
352 b"KEYEXPIRED",
353 b"POLICY_URL",
354 b"PROGRESS",
355 b"VERIFICATION_COMPLIANCE_MODE",
356 ):
357 pass
359 elif fields[1] in (b"EXPSIG", b"EXPKEYSIG"):
360 self.expired = True
361 self.invalid = True
363 elif fields[1] in (
364 b"REVKEYSIG",
365 b"BADSIG",
366 b"ERRSIG",
367 b"FAILURE",
368 b"KEYREVOKED",
369 b"NO_PUBKEY",
370 ):
371 self.invalid = True
373 else:
374 field = fields[1].decode("ascii", errors="replace")
375 raise GpgException(
376 "Keyword '{0}' from GnuPG was not expected.".format(field)
377 )
379 def _exec_gpg(
380 self,
381 stdin: int,
382 stdout: int,
383 stderr: int,
384 statusfd: int,
385 detached_signature_path: str | None,
386 ) -> NoReturn:
387 try:
388 if stdin != 0:
389 os.dup2(stdin, 0)
390 if stdout != 1:
391 os.dup2(stdout, 1)
392 if stderr != 2:
393 os.dup2(stderr, 2)
394 if statusfd != 3:
395 os.dup2(statusfd, 3)
396 for fd in range(4):
397 old = fcntl.fcntl(fd, fcntl.F_GETFD)
398 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
399 os.closerange(4, _MAXFD)
401 args = [
402 self.gpg,
403 "--status-fd=3",
404 "--no-default-keyring",
405 "--batch",
406 "--no-tty",
407 "--trust-model",
408 "always",
409 "--fixed-list-mode",
410 ]
411 for k in self.keyrings:
412 args.extend(["--keyring", k])
414 if detached_signature_path is not None:
415 args.extend(["--verify", detached_signature_path, "-"])
416 else:
417 args.extend(["--output", "-", "--verify", "-"])
419 os.execvp(self.gpg, args)
420 finally:
421 try:
422 print("Failed to execute gpg.", file=sys.stderr)
423 sys.stderr.flush()
424 except:
425 # Ignore errors, we want to reach the `exit` call below.
426 pass
427 os._exit(3)
429 @property
430 def contents_sha1(self) -> str:
431 return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined]
434def sign(
435 infile,
436 outfile=None,
437 keyids: Iterable[str] = [],
438 inline=False,
439 pubring: str | None = None,
440 homedir: str | None = None,
441 passphrase_file: str | None = None,
442 *,
443 digest_algorithm="SHA256",
444) -> bytes | None:
445 args = [
446 "/usr/bin/gpg",
447 "--no-options",
448 "--no-tty",
449 "--batch",
450 "--armour",
451 "--personal-digest-preferences",
452 digest_algorithm,
453 ]
455 for keyid in keyids:
456 args.extend(["--local-user", keyid])
457 if pubring is not None: 457 ↛ 458line 457 didn't jump to line 458 because the condition on line 457 was never true
458 args.extend(["--keyring", pubring])
459 if homedir is not None: 459 ↛ 461line 459 didn't jump to line 461 because the condition on line 459 was always true
460 args.extend(["--homedir", homedir])
461 if passphrase_file is not None: 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true
462 args.extend(
463 ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file]
464 )
466 args.append("--clearsign" if inline else "--detach-sign")
468 kwargs: dict[str, Any] = {}
469 if isinstance(infile, str): 469 ↛ 470line 469 didn't jump to line 470 because the condition on line 469 was never true
470 infile = infile.encode("utf-8")
471 if isinstance(infile, bytes):
472 kwargs["input"] = infile
473 else:
474 kwargs["stdin"] = infile
475 if outfile is None:
476 kwargs["stdout"] = subprocess.PIPE
477 else:
478 kwargs["stdout"] = outfile
480 result = subprocess.run(args, check=True, **kwargs)
481 if outfile is None:
482 return result.stdout
483 return None
486# vim: set sw=4 et: