1"""Utilities for signed files
3@contact: Debian FTP Master <ftpmaster@debian.org>
4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
5@license: GNU General Public License version 2 or later
6"""
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22import datetime
23import fcntl
24import os
25import select
26import subprocess
27import sys
28from collections.abc import Iterable
29from contextlib import ExitStack
30from tempfile import NamedTemporaryFile
31from typing import Optional
33import apt_pkg
# Upper bound for the file descriptors closed in the gpg child process.
# Fall back to a conservative default when the limit cannot be queried:
# AttributeError - platform has no os.sysconf
# ValueError     - the configuration name is unknown
# OSError        - the lookup itself failed
try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    _MAXFD = 256
class GpgException(Exception):
    """Error raised by this module when GnuPG output cannot be accepted."""
class _Pipe:
    """context manager for pipes

    On entry a new pipe is created with os.pipe(); its read end is stored
    in self.r and its write end in self.w.  On exit both ends that are
    still open are closed.

    Note: When the pipe is closed by other means than the close_r and close_w
    methods, you have to set self.r (self.w) to None.
    """

    def __enter__(self):
        (self.r, self.w) = os.pipe()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Make sure the read end is closed even if closing the write end
        # fails.
        try:
            self.close_w()
        finally:
            self.close_r()
        # Never suppress an exception raised inside the `with` body.
        return False

    def close_r(self):
        """close reading side of the pipe"""
        # Compare against None explicitly: 0 is a valid file descriptor.
        fd, self.r = self.r, None
        if fd is not None:
            os.close(fd)

    def close_w(self):
        """close writing part of the pipe"""
        # Compare against None explicitly: 0 is a valid file descriptor.
        fd, self.w = self.w, None
        if fd is not None:
            os.close(fd)
# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
def waitstatus_to_exitcode(status):
    """convert a wait status into an exit code

    :param status: wait status as returned by the os.wait*() family
    :return: the exit status if the process exited normally, or the
             negated signal number if it was terminated by a signal
    :raises ValueError: if the status describes neither of these cases
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")
def _create_named_temporary_file_with_contents(contents: bytes) -> NamedTemporaryFile:
    """create a named temporary file holding `contents`

    The data is flushed so that it can be read through the file's name
    while the returned object is still open.

    :param contents: bytes to write to the file
    :return: the open temporary file object; closing it (or using it as a
             context manager) is the caller's responsibility
    """
    f = NamedTemporaryFile()
    try:
        f.write(contents)
        f.flush()
    except BaseException:
        # Do not leak the temporary file when it could not be filled.
        f.close()
        raise
    return f
class SignedFile:
    """handle files signed with PGP

    Verification is done by running gpg in a child process and parsing
    the machine-readable lines it writes to its status file descriptor.

    The following attributes are available:
      contents            - byte-string with the content (after removing PGP armor)
      valid               - Boolean indicating a valid signature was found
      expired             - an expired signature or key was reported
      invalid             - a bad, revoked, expired or unverifiable signature was reported
      weak_signature      - signature uses a weak algorithm (e.g. SHA-1)
      fingerprint         - fingerprint of the key used for signing
      primary_fingerprint - fingerprint of the primary key associated to the key used for signing
      signature_id        - signature id reported by gpg (byte-string)
      status              - raw output of gpg on the status file descriptor
      stderr              - raw output of gpg on stderr
    """

    def __init__(
        self,
        data: bytes,
        keyrings: Iterable[str],
        require_signature: bool = True,
        *,
        detached_signature: Optional[bytes] = None,
        gpg: str = "/usr/bin/gpg",
    ):
        """
        :param data: byte-string containing the message
        :param keyrings: sequence of keyrings
        :param require_signature: if True (the default), will raise an exception if no valid signature was found
        :param detached_signature: validate detached signature if set; otherwise an inline signature is expected
        :param gpg: location of the gpg binary
        """
        self.gpg = gpg
        self.keyrings = keyrings

        self.valid: bool = False  #: valid signature
        self.expired = False
        self.invalid = False
        self.weak_signature = False
        self.fingerprints: list[str] = []
        self.primary_fingerprints: list[str] = []
        # Signature ids are stored exactly as read from gpg, i.e. as bytes.
        self.signature_ids: list[bytes] = []

        self._verify(data, detached_signature, require_signature)

    @property
    def fingerprint(self) -> str:
        """fingerprint of the (sub)key used for the signature"""
        assert len(self.fingerprints) == 1
        return self.fingerprints[0]

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the primary key used for the signature"""
        assert len(self.primary_fingerprints) == 1
        return self.primary_fingerprints[0]

    @property
    def signature_id(self) -> bytes:
        """signature id of the signature, as a byte-string"""
        assert len(self.signature_ids) == 1
        return self.signature_ids[0]

    def _verify(
        self, data: bytes, detached_signature: Optional[bytes], require_signature: bool
    ):
        """run gpg on `data` and record the outcome on this object

        :param data: message to verify; it is written to gpg's stdin
        :param detached_signature: detached signature, or None if `data`
                                   carries an inline signature
        :param require_signature: raise if no valid signature was found
        :raises GpgException: if gpg produced no status output, exited with
                              an unexpected code, reported a fatal status,
                              or (with `require_signature`) no valid
                              signature was found
        """
        with _Pipe() as stdin, _Pipe() as contents, _Pipe() as status, _Pipe() as stderr, ExitStack() as stack:
            # gpg reads a detached signature from a file; the message
            # itself is always passed on stdin.
            detached_signature_file = (
                stack.enter_context(
                    _create_named_temporary_file_with_contents(detached_signature)
                )
                if detached_signature is not None
                else None
            )

            pid = os.fork()
            if pid == 0:
                # Child: replaces itself with gpg (or exits); never returns.
                self._exec_gpg(
                    stdin.r, contents.w, stderr.w, status.w, detached_signature_file
                )
            else:
                # Parent: close the pipe ends that belong to the child so
                # that EOF is seen once gpg closes its side.
                stdin.close_r()
                contents.close_w()
                stderr.close_w()
                status.close_w()

                read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
                stdin.w = None  # was closed by _do_io

                (pid_, status_code, usage_) = os.wait4(pid, 0)
                if pid_ != pid:
                    raise Exception(
                        f"wait4() waited for pid {pid_}, but we expected {pid}"
                    )
                exit_code = waitstatus_to_exitcode(status_code)

                if detached_signature is None:
                    self.contents = read[contents.r]
                elif read[contents.r]:
                    raise GpgException(
                        "GPG wrote data to stdout when verifying detached signature."
                    )
                else:
                    self.contents = data

                self.status = read[status.r]
                self.stderr = read[stderr.r]

                if self.status == b"":
                    stderr_text = self.stderr.decode("ascii", errors="replace")
                    raise GpgException(
                        "No status output from GPG. (GPG exited with status code %s)\n%s"
                        % (exit_code, stderr_text)
                    )

                # gpg exits with 0 (no error), 1 (at least one invalid sig),
                # or anything else (fatal error).  Even though status 2
                # indicates a fatal error, we still allow it as it is also
                # returned when the public key is not known.
                if exit_code not in (0, 1, 2):
                    stderr_text = self.stderr.decode("ascii", errors="replace")
                    raise GpgException(
                        f"GPG exited with a fatal error (exit code {exit_code})\n{stderr_text}"
                    )

                self._seen_goodsig = 0

                for line in self.status.splitlines():
                    self._parse_status(line)

                # Every recorded VALIDSIG must be matched by a GOODSIG, and
                # no signature may have been reported as invalid.
                if self.invalid or self._seen_goodsig != len(self.fingerprints):
                    self.valid = False

                if require_signature and not self.valid:
                    stderr_text = self.stderr.decode("ascii", errors="replace")
                    raise GpgException(
                        "No valid signature found. (GPG exited with status code %s)\n%s"
                        % (exit_code, stderr_text)
                    )

        assert len(self.fingerprints) == len(self.primary_fingerprints)
        assert len(self.fingerprints) == len(self.signature_ids)

    def _do_io(self, read, write):
        """multiplex reading from and writing to several file descriptors

        :param read: file descriptors to read from until end-of-file
        :param write: mapping of file descriptor to the bytes to write to it;
                      each descriptor is closed once its data was written
        :return: mapping of each descriptor in `read` to the bytes read from it
        """
        # Writes must not block, otherwise we could stall while the other
        # side is waiting for us to read its output.
        for fd in write:
            old = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)

        read_lines = {fd: [] for fd in read}
        write_pos = {fd: 0 for fd in write}

        read_set = list(read)
        write_set = list(write)
        while len(read_set) > 0 or len(write_set) > 0:
            r, w, x_ = select.select(read_set, write_set, ())
            for fd in r:
                data = os.read(fd, 4096)
                if len(data) == 0:
                    # end-of-file
                    read_set.remove(fd)
                else:
                    read_lines[fd].append(data)
            for fd in w:
                data = write[fd][write_pos[fd] :]
                if len(data) == 0:
                    # everything was written; closing signals end-of-file
                    os.close(fd)
                    write_set.remove(fd)
                else:
                    bytes_written = os.write(fd, data)
                    write_pos[fd] += bytes_written

        return {fd: b"".join(chunks) for fd, chunks in read_lines.items()}

    def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
        """parse timestamp in GnuPG's format

        :param timestamp: seconds since the epoch as a (byte-)string
        :param datestring: optional date in the form YYYY-MM-DD as a
                           byte-string
        :return: datetime object for the given timestamp (naive, in UTC)
        :raises Exception: if `timestamp` is in ISO 8601 format
        """
        # The old implementation did only return the date.  As we already
        # used this for replay production, return the legacy value for
        # old signatures.
        if datestring is not None:
            year, month, day = datestring.split(b"-")
            date = datetime.date(int(year), int(month), int(day))
            time = datetime.time(0, 0)
            if date < datetime.date(2014, 8, 4):
                return datetime.datetime.combine(date, time)

        if b"T" in timestamp:
            raise Exception("No support for ISO 8601 timestamps.")
        # Same naive UTC value that the deprecated
        # `datetime.utcfromtimestamp` returned.
        return datetime.datetime.fromtimestamp(
            int(timestamp), tz=datetime.timezone.utc
        ).replace(tzinfo=None)

    def _parse_status(self, line):
        """process one line of gpg's status output

        :param line: a single status line as a byte-string
        :raises GpgException: if the line is malformed, reports a fatal
                              condition or an MD5-based signature, or uses
                              an unknown keyword
        """
        fields = line.split()
        # Every status line consists of the prefix and at least a keyword.
        if len(fields) < 2 or fields[0] != b"[GNUPG:]":
            raise GpgException("Unexpected output on status-fd: %s" % line)

        # VALIDSIG    <fingerprint in hex> <sig_creation_date> <sig-timestamp>
        #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
        #             <hash-algo> <sig-class> <primary-key-fpr>
        if fields[1] == b"VALIDSIG":
            if len(fields) < 12:
                raise GpgException("Unexpected output on status-fd: %s" % line)

            # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
            # which Debian 8 does not yet include.  We want to make sure
            # to not accept uploads covered by a MD5-based signature.
            # RFC 4880, table 9.4:
            #   1 - MD5
            #   2 - SHA-1
            #   3 - RIPE-MD/160
            if fields[9] == b"1":
                raise GpgException("Digest algorithm MD5 is not trusted.")
            if fields[9] in (b"2", b"3"):
                self.weak_signature = True

            self.valid = True
            self.fingerprints.append(fields[2].decode("ascii"))
            self.primary_fingerprints.append(fields[11].decode("ascii"))
            self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])

        elif fields[1] == b"BADARMOR":
            raise GpgException("Bad armor.")

        elif fields[1] == b"NODATA":
            raise GpgException("No data.")

        elif fields[1] == b"DECRYPTION_FAILED":
            raise GpgException("Decryption failed.")

        elif fields[1] == b"ERROR":
            f2 = fields[2].decode("ascii", errors="replace")
            f3 = fields[3].decode("ascii", errors="replace")
            raise GpgException("Other error: %s %s" % (f2, f3))

        elif fields[1] == b"SIG_ID":
            self.signature_ids.append(fields[2])

        elif fields[1] == b"GOODSIG":
            self._seen_goodsig += 1

        # Keywords that need no action.
        elif fields[1] in (
            b"PLAINTEXT",
            b"KEY_CONSIDERED",
            b"NEWSIG",
            b"NOTATION_NAME",
            b"NOTATION_FLAGS",
            b"NOTATION_DATA",
            b"SIGEXPIRED",
            b"KEYEXPIRED",
            b"POLICY_URL",
            b"PROGRESS",
            b"VERIFICATION_COMPLIANCE_MODE",
        ):
            pass

        elif fields[1] in (b"EXPSIG", b"EXPKEYSIG"):
            self.expired = True
            self.invalid = True

        elif fields[1] in (
            b"REVKEYSIG",
            b"BADSIG",
            b"ERRSIG",
            b"KEYREVOKED",
            b"NO_PUBKEY",
        ):
            self.invalid = True

        else:
            # Refuse anything we do not know about.
            field = fields[1].decode("ascii", errors="replace")
            raise GpgException(
                "Keyword '{0}' from GnuPG was not expected.".format(field)
            )

    def _exec_gpg(
        self,
        stdin,
        stdout,
        stderr,
        statusfd,
        detached_signature_file: Optional[NamedTemporaryFile],
    ):
        """replace the current (child) process with gpg

        The given descriptors become fds 0 to 3 of gpg and every other
        descriptor from 4 up to _MAXFD is closed.  This method never
        returns: if gpg cannot be executed, the process exits with
        status 3.

        :param stdin: file descriptor gpg reads the message from
        :param stdout: file descriptor gpg writes the contents to
        :param stderr: file descriptor for gpg's diagnostics
        :param statusfd: file descriptor for gpg's status output
        :param detached_signature_file: file holding the detached signature,
                                        or None for an inline signature
        """
        try:
            if stdin != 0:
                os.dup2(stdin, 0)
            if stdout != 1:
                os.dup2(stdout, 1)
            if stderr != 2:
                os.dup2(stderr, 2)
            if statusfd != 3:
                os.dup2(statusfd, 3)
            # Make sure fds 0-3 survive the exec.
            for fd in range(4):
                old = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            os.closerange(4, _MAXFD)

            args = [
                self.gpg,
                "--status-fd=3",
                "--no-default-keyring",
                "--batch",
                "--no-tty",
                "--trust-model",
                "always",
                "--fixed-list-mode",
            ]
            for k in self.keyrings:
                args.extend(["--keyring", k])

            if detached_signature_file is not None:
                args.extend(["--verify", detached_signature_file.name, "-"])
            else:
                args.extend(["--output", "-", "--verify", "-"])

            os.execvp(self.gpg, args)
        finally:
            # Only reached when something above failed: a successful
            # exec does not return.
            try:
                print("Failed to execute gpg.", file=sys.stderr)
                sys.stderr.flush()
            except BaseException:
                # Ignore errors, we want to reach the `exit` call below.
                pass
            os._exit(3)

    @property
    def contents_sha1(self) -> str:
        """SHA-1 checksum of `contents` as computed by apt_pkg.sha1sum"""
        return apt_pkg.sha1sum(self.contents)  # type: ignore[attr-defined]
def sign(
    infile,
    outfile=None,
    keyids=(),
    inline=False,
    pubring=None,
    secring=None,
    homedir=None,
    passphrase_file=None,
    *,
    digest_algorithm="SHA256",
    gpg="/usr/bin/gpg",
):
    """create an ASCII-armoured signature using gpg

    :param infile: data to sign: a string (encoded as UTF-8), a
                   byte-string, or an object usable as `stdin` of
                   subprocess.run
    :param outfile: object usable as `stdout` of subprocess.run that
                    receives the output; if None the output is returned
    :param keyids: keys to sign with (each passed as --local-user)
    :param inline: create a clearsigned message instead of a detached
                   signature
    :param pubring: passed as --keyring if set
    :param secring: passed as --secret-keyring if set
    :param homedir: passed as --homedir if set
    :param passphrase_file: if set, gpg reads the passphrase from this file
                            using loopback pinentry mode
    :param digest_algorithm: passed as --personal-digest-preferences
    :param gpg: location of the gpg binary
    :return: output of gpg as a byte-string if `outfile` is None,
             otherwise None
    :raises subprocess.CalledProcessError: if gpg exits with a non-zero
                                           status
    """
    args = [
        gpg,
        "--no-options",
        "--no-tty",
        "--batch",
        "--armour",
        "--personal-digest-preferences",
        digest_algorithm,
    ]

    for keyid in keyids:
        args.extend(["--local-user", keyid])
    if pubring is not None:
        args.extend(["--keyring", pubring])
    if secring is not None:
        args.extend(["--secret-keyring", secring])
    if homedir is not None:
        args.extend(["--homedir", homedir])
    if passphrase_file is not None:
        args.extend(
            ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file]
        )

    args.append("--clearsign" if inline else "--detach-sign")

    kwargs = {}
    if isinstance(infile, str):
        infile = infile.encode("utf-8")
    # In-memory data is passed via `input`; anything else is handed to
    # gpg as its stdin.
    if isinstance(infile, bytes):
        kwargs["input"] = infile
    else:
        kwargs["stdin"] = infile
    kwargs["stdout"] = subprocess.PIPE if outfile is None else outfile

    result = subprocess.run(args, check=True, **kwargs)
    if outfile is None:
        return result.stdout
    return None
465# vim: set sw=4 et: