"""Utilities for signed files

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22import datetime
23import fcntl
24import os
25import select
26import subprocess
27import sys
28from collections.abc import Iterable
30import apt_pkg
32try:
33 _MAXFD = os.sysconf("SC_OPEN_MAX")
34except:
35 _MAXFD = 256
class GpgException(Exception):
    """Error raised for failures while running GnuPG or reading its output."""
42class _Pipe:
43 """context manager for pipes
45 Note: When the pipe is closed by other means than the close_r and close_w
46 methods, you have to set self.r (self.w) to None.
47 """
49 def __enter__(self):
50 (self.r, self.w) = os.pipe()
51 return self
53 def __exit__(self, type, value, traceback):
54 self.close_w()
55 self.close_r()
56 return False
58 def close_r(self):
59 """close reading side of the pipe"""
60 if self.r:
61 os.close(self.r)
62 self.r = None
64 def close_w(self):
65 """close writing part of the pipe"""
66 if self.w:
67 os.close(self.w)
68 self.w = None
# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
def waitstatus_to_exitcode(status):
    """Convert a wait status into an exit code.

    :param status: wait status as understood by the ``os.WIF*`` helpers
    :return: the exit status if the process exited, or the negated signal
             number if it was terminated by a signal
    :raises ValueError: if the status describes neither of these cases
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")
class SignedFile:
    """handle files signed with PGP

    The following attributes are available:
      contents            - byte-string with the content (after removing PGP armor)
      valid               - Boolean indicating a valid signature was found
      weak_signature      - signature uses a weak algorithm (e.g. SHA-1)
      fingerprint         - fingerprint of the key used for signing
      primary_fingerprint - fingerprint of the primary key associated to the key used for signing
    """

    # Minimum number of whitespace-separated fields a status line must have
    # (counting the "[GNUPG:]" prefix and the keyword itself) for the
    # keywords whose arguments are read by _parse_status.
    _MIN_STATUS_FIELDS = {b"VALIDSIG": 12, b"SIG_ID": 3}

    def __init__(
        self,
        data: bytes,
        keyrings: Iterable[str],
        require_signature: bool = True,
        gpg: str = "/usr/bin/gpg",
    ):
        """
        :param data: byte-string containing the message
        :param keyrings: sequence of keyrings
        :param require_signature: if True (the default), will raise an exception if no valid signature was found
        :param gpg: location of the gpg binary
        :raises GpgException: if gpg fails, produces unexpected status output,
                              or (with require_signature) no valid signature was found
        """
        self.gpg = gpg
        self.keyrings = keyrings

        self.valid: bool = False  #: valid signature
        self.expired = False
        self.invalid = False
        self.weak_signature = False
        self.fingerprints: list[str] = []
        self.primary_fingerprints: list[str] = []
        # SIG_ID values are stored exactly as read from the status output,
        # i.e. as byte-strings.
        self.signature_ids: list[bytes] = []

        self._verify(data, require_signature)

    @property
    def fingerprint(self) -> str:
        """fingerprint of the (sub)key used for the signature"""
        assert len(self.fingerprints) == 1
        return self.fingerprints[0]

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the primary key used for the signature"""
        assert len(self.primary_fingerprints) == 1
        return self.primary_fingerprints[0]

    @property
    def signature_id(self) -> bytes:
        """signature id (byte-string) of the single signature"""
        assert len(self.signature_ids) == 1
        return self.signature_ids[0]

    def _verify(self, data, require_signature):
        """run gpg on `data` and record the result on this object

        Sets ``contents``, ``status`` and ``stderr`` from the output of gpg
        and then feeds every status line to :meth:`_parse_status`.

        :raises GpgException: if gpg wrote no status output, exited with a
                              code other than 0, 1 or 2, or (with
                              `require_signature`) no valid signature was found
        """
        with _Pipe() as stdin, _Pipe() as contents, _Pipe() as status, _Pipe() as stderr:
            pid = os.fork()
            if pid == 0:
                # Child: never returns (exec or os._exit).
                self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
            else:
                # Parent: close the ends that belong to the child so that
                # EOF is seen once the child exits.
                stdin.close_r()
                contents.close_w()
                stderr.close_w()
                status.close_w()

                read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
                stdin.w = None  # was closed by _do_io

                (pid_, status_code, usage_) = os.wait4(pid, 0)
                if pid_ != pid:
                    raise Exception(
                        f"wait4() waited for pid {pid_}, but we expected {pid}"
                    )
                exit_code = waitstatus_to_exitcode(status_code)

                self.contents = read[contents.r]
                self.status = read[status.r]
                self.stderr = read[stderr.r]

        if self.status == b"":
            message = self.stderr.decode("ascii", errors="replace")
            raise GpgException(
                "No status output from GPG. (GPG exited with status code %s)\n%s"
                % (exit_code, message)
            )

        # gpg exits with 0 (no error), 1 (at least one invalid sig),
        # or anything else (fatal error).  Even though status 2
        # indicates a fatal error, we still allow it as it is also
        # returned when the public key is not known.
        if exit_code not in (0, 1, 2):
            message = self.stderr.decode("ascii", errors="replace")
            raise GpgException(
                f"GPG exited with a fatal error (exit code {exit_code})\n{message}"
            )

        for line in self.status.splitlines():
            self._parse_status(line)

        # Any invalid signature overrides a valid one.
        if self.invalid:
            self.valid = False

        if require_signature and not self.valid:
            message = self.stderr.decode("ascii", errors="replace")
            raise GpgException(
                "No valid signature found. (GPG exited with status code %s)\n%s"
                % (exit_code, message)
            )

        assert len(self.fingerprints) == len(self.primary_fingerprints)
        assert len(self.fingerprints) == len(self.signature_ids)

    def _do_io(self, read, write):
        """multiplex reading from and writing to several file descriptors

        :param read: file descriptors to read from until EOF
        :param write: mapping from file descriptor to the bytes to write to
                      it; each descriptor is closed once its data is written
        :return: mapping from each descriptor in `read` to the bytes read
        """
        for fd in write:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        chunks = {fd: [] for fd in read}
        # memoryview lets us drop the already-written prefix without copying
        # the remaining data on every write.
        pending = {fd: memoryview(buf) for fd, buf in write.items()}

        read_set = list(read)
        write_set = list(write)
        while read_set or write_set:
            readable, writable, _ = select.select(read_set, write_set, ())
            for fd in readable:
                data = os.read(fd, 4096)
                if data:
                    chunks[fd].append(data)
                else:
                    # EOF
                    read_set.remove(fd)
            for fd in writable:
                remaining = pending[fd]
                if len(remaining) == 0:
                    os.close(fd)
                    write_set.remove(fd)
                else:
                    written = os.write(fd, remaining)
                    pending[fd] = remaining[written:]

        return {fd: b"".join(parts) for fd, parts in chunks.items()}

    def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
        """parse timestamp in GnuPG's format

        :param timestamp: seconds since the epoch as a byte-string
        :param datestring: optional ``YYYY-MM-DD`` byte-string
        :return: datetime object for the given timestamp (naive, in UTC)
        """
        # The old implementation did only return the date. As we already
        # used this for replay production, return the legacy value for
        # old signatures.
        if datestring is not None:
            year, month, day = datestring.split(b"-")
            date = datetime.date(int(year), int(month), int(day))
            if date < datetime.date(2014, 8, 4):
                return datetime.datetime.combine(date, datetime.time(0, 0))

        if b"T" in timestamp:
            raise Exception("No support for ISO 8601 timestamps.")
        # Same naive-UTC result as the deprecated datetime.utcfromtimestamp().
        aware = datetime.datetime.fromtimestamp(
            int(timestamp), tz=datetime.timezone.utc
        )
        return aware.replace(tzinfo=None)

    def _parse_status(self, line):
        """update the object state from one line of gpg's status output

        :param line: one status line as a byte-string
        :raises GpgException: for malformed lines, unexpected keywords,
                              errors reported by gpg and MD5-based signatures
        """
        fields = line.split()
        if not fields or fields[0] != b"[GNUPG:]":
            raise GpgException("Unexpected output on status-fd: %s" % line)
        if len(fields) < 2:
            # Prefix without a keyword.
            raise GpgException("Unexpected output on status-fd: %s" % line)

        keyword = fields[1]
        if len(fields) < self._MIN_STATUS_FIELDS.get(keyword, 2):
            # Report truncated lines as a gpg problem rather than letting an
            # IndexError escape.
            raise GpgException("Unexpected output on status-fd: %s" % line)

        # VALIDSIG    <fingerprint in hex> <sig_creation_date> <sig-timestamp>
        #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
        #             <hash-algo> <sig-class> <primary-key-fpr>
        if keyword == b"VALIDSIG":
            # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
            # which Debian 8 does not yet include.  We want to make sure
            # to not accept uploads covered by a MD5-based signature.
            # RFC 4880, table 9.4:
            #   1 - MD5
            #   2 - SHA-1
            #   3 - RIPE-MD/160
            if fields[9] == b"1":
                raise GpgException("Digest algorithm MD5 is not trusted.")
            if fields[9] in (b"2", b"3"):
                self.weak_signature = True

            self.valid = True
            self.fingerprints.append(fields[2].decode("ascii"))
            self.primary_fingerprints.append(fields[11].decode("ascii"))
            self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])

        elif keyword == b"BADARMOR":
            raise GpgException("Bad armor.")

        elif keyword == b"NODATA":
            raise GpgException("No data.")

        elif keyword == b"DECRYPTION_FAILED":
            raise GpgException("Decryption failed.")

        elif keyword == b"ERROR":
            # Use whichever of the two detail fields are present.
            details = " ".join(
                f.decode("ascii", errors="replace") for f in fields[2:4]
            )
            raise GpgException("Other error: %s" % details)

        elif keyword == b"SIG_ID":
            self.signature_ids.append(fields[2])

        elif keyword in (
            b"PLAINTEXT",
            b"GOODSIG",
            b"KEY_CONSIDERED",
            b"NEWSIG",
            b"NOTATION_NAME",
            b"NOTATION_FLAGS",
            b"NOTATION_DATA",
            b"SIGEXPIRED",
            b"KEYEXPIRED",
            b"POLICY_URL",
            b"PROGRESS",
            b"VERIFICATION_COMPLIANCE_MODE",
        ):
            # Informational only; nothing to record.
            pass

        elif keyword in (b"EXPSIG", b"EXPKEYSIG"):
            self.expired = True
            self.invalid = True

        elif keyword in (
            b"REVKEYSIG",
            b"BADSIG",
            b"ERRSIG",
            b"KEYREVOKED",
            b"NO_PUBKEY",
        ):
            self.invalid = True

        else:
            field = keyword.decode("ascii", errors="replace")
            raise GpgException(
                "Keyword '{0}' from GnuPG was not expected.".format(field)
            )

    def _exec_gpg(self, stdin, stdout, stderr, statusfd):
        """replace the current (child) process with gpg; never returns

        The given descriptors become fds 0-3 of gpg (3 being the status fd);
        all other descriptors from 4 up to _MAXFD are closed.  If gpg cannot
        be executed, the process exits with status 3.
        """
        try:
            if stdin != 0:
                os.dup2(stdin, 0)
            if stdout != 1:
                os.dup2(stdout, 1)
            if stderr != 2:
                os.dup2(stderr, 2)
            if statusfd != 3:
                os.dup2(statusfd, 3)
            # Make sure fds 0-3 survive the exec.
            for fd in range(4):
                old = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            os.closerange(4, _MAXFD)

            args = [
                self.gpg,
                "--status-fd=3",
                "--no-default-keyring",
                "--batch",
                "--no-tty",
                "--trust-model",
                "always",
                "--fixed-list-mode",
            ]
            for k in self.keyrings:
                args.extend(["--keyring", k])
            args.extend(["--decrypt", "-"])

            os.execvp(self.gpg, args)
        finally:
            # Only reached when the exec (or the setup before it) failed.
            try:
                print("Failed to execute gpg.", file=sys.stderr)
                sys.stderr.flush()
            except BaseException:
                # Deliberately catch everything: we want to reach the
                # `exit` call below.
                pass
            os._exit(3)

    @property
    def contents_sha1(self) -> str:
        """SHA-1 sum of `contents` as computed by apt_pkg.sha1sum"""
        return apt_pkg.sha1sum(self.contents)  # type: ignore[attr-defined]
def sign(
    infile,
    outfile=None,
    keyids=(),
    inline=False,
    pubring=None,
    secring=None,
    homedir=None,
    passphrase_file=None,
    *,
    digest_algorithm="SHA256",
    gpg="/usr/bin/gpg",
):
    """sign data with gpg

    :param infile: data to sign: a `str` (encoded as UTF-8) or `bytes` is
                   passed to gpg as input; anything else is used as gpg's stdin
    :param outfile: where gpg's stdout goes; if None, the output is captured
                    and returned
    :param keyids: key ids, each passed to gpg via ``--local-user``
    :param inline: if True create a clear-signed message (``--clearsign``),
                   otherwise a detached signature (``--detach-sign``)
    :param pubring: optional value for ``--keyring``
    :param secring: optional value for ``--secret-keyring``
    :param homedir: optional value for ``--homedir``
    :param passphrase_file: optional value for ``--passphrase-file`` (enables
                            ``--pinentry-mode loopback``)
    :param digest_algorithm: value for ``--personal-digest-preferences``
    :param gpg: location of the gpg binary
    :return: gpg's output as bytes if `outfile` is None, otherwise None
    :raises subprocess.CalledProcessError: if gpg exits with a non-zero status
    """
    args = [
        gpg,
        "--no-options",
        "--no-tty",
        "--batch",
        "--armour",
        "--personal-digest-preferences",
        digest_algorithm,
    ]

    for keyid in keyids:
        args.extend(["--local-user", keyid])
    if pubring is not None:
        args.extend(["--keyring", pubring])
    if secring is not None:
        args.extend(["--secret-keyring", secring])
    if homedir is not None:
        args.extend(["--homedir", homedir])
    if passphrase_file is not None:
        args.extend(
            ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file]
        )

    args.append("--clearsign" if inline else "--detach-sign")

    kwargs = {}
    if isinstance(infile, str):
        infile = infile.encode("utf-8")
    if isinstance(infile, bytes):
        kwargs["input"] = infile
    else:
        kwargs["stdin"] = infile
    kwargs["stdout"] = subprocess.PIPE if outfile is None else outfile

    result = subprocess.run(args, check=True, **kwargs)
    if outfile is None:
        return result.stdout
    return None
# vim: set sw=4 et: