1"""Utilities for signed files
3@contact: Debian FTP Master <ftpmaster@debian.org>
4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
5@license: GNU General Public License version 2 or later
6"""
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
18# You should have received a copy of the GNU General Public License
19# along with this program; if not, write to the Free Software
20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22import apt_pkg
23import datetime
24import fcntl
25import os
26import select
27import subprocess
28from collections.abc import Iterable
30try:
31 _MAXFD = os.sysconf("SC_OPEN_MAX")
32except:
33 _MAXFD = 256
36class GpgException(Exception):
37 pass
40class _Pipe:
41 """context manager for pipes
43 Note: When the pipe is closed by other means than the close_r and close_w
44 methods, you have to set self.r (self.w) to None.
45 """
47 def __enter__(self):
48 (self.r, self.w) = os.pipe()
49 return self
51 def __exit__(self, type, value, traceback):
52 self.close_w()
53 self.close_r()
54 return False
56 def close_r(self):
57 """close reading side of the pipe"""
58 if self.r:
59 os.close(self.r)
60 self.r = None
62 def close_w(self):
63 """close writing part of the pipe"""
64 if self.w:
65 os.close(self.w)
66 self.w = None
69# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
70def waitstatus_to_exitcode(status):
71 if os.WIFEXITED(status): 71 ↛ 73line 71 didn't jump to line 73, because the condition on line 71 was never false
72 return os.WEXITSTATUS(status)
73 elif os.WIFSIGNALED(status):
74 return -os.WTERMSIG(status)
75 else:
76 raise ValueError(f"Unexpected status code '{status}'")
79class SignedFile:
80 """handle files signed with PGP
82 The following attributes are available:
83 contents - byte-string with the content (after removing PGP armor)
84 valid - Boolean indicating a valid signature was found
85 weak_signature - signature uses a weak algorithm (e.g. SHA-1)
86 fingerprint - fingerprint of the key used for signing
87 primary_fingerprint - fingerprint of the primary key associated to the key used for signing
88 """
90 def __init__(self, data: bytes, keyrings: Iterable[str], require_signature: bool = True, gpg: str = "/usr/bin/gpg"):
91 """
92 :param data: byte-string containing the message
93 :param keyrings: sequence of keyrings
94 :param require_signature: if True (the default), will raise an exception if no valid signature was found
95 :param gpg: location of the gpg binary
96 """
97 self.gpg = gpg
98 self.keyrings = keyrings
100 self.valid: bool = False #: valid signature
101 self.expired = False
102 self.invalid = False
103 self.weak_signature = False
104 self.fingerprints: list[str] = []
105 self.primary_fingerprints: list[str] = []
106 self.signature_ids: list[str] = []
108 self._verify(data, require_signature)
110 @property
111 def fingerprint(self) -> str:
112 """fingerprint of the (sub)key used for the signature"""
113 assert len(self.fingerprints) == 1
114 return self.fingerprints[0]
116 @property
117 def primary_fingerprint(self) -> str:
118 """fingerprint of the primary key used for the signature"""
119 assert len(self.primary_fingerprints) == 1
120 return self.primary_fingerprints[0]
122 @property
123 def signature_id(self):
124 assert len(self.signature_ids) == 1
125 return self.signature_ids[0]
127 def _verify(self, data, require_signature):
128 with _Pipe() as stdin, \
129 _Pipe() as contents, \
130 _Pipe() as status, \
131 _Pipe() as stderr:
132 pid = os.fork()
133 if pid == 0: 133 ↛ 134line 133 didn't jump to line 134, because the condition on line 133 was never true
134 self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
135 else:
136 stdin.close_r()
137 contents.close_w()
138 stderr.close_w()
139 status.close_w()
141 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
142 stdin.w = None # was closed by _do_io
144 (pid_, status_code, usage_) = os.wait4(pid, 0)
145 if pid_ != pid: 145 ↛ 146line 145 didn't jump to line 146, because the condition on line 145 was never true
146 raise Exception(f"wait4() waited for pid {pid_}, but we expected {pid}")
147 exit_code = waitstatus_to_exitcode(status_code)
149 self.contents = read[contents.r]
150 self.status = read[status.r]
151 self.stderr = read[stderr.r]
153 if self.status == b"": 153 ↛ 154line 153 didn't jump to line 154, because the condition on line 153 was never true
154 stderr = self.stderr.decode('ascii', errors='replace')
155 raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
157 # gpg exits with 0 (no error), 1 (at least one invalid sig),
158 # or anything else (fatal error). Even though status 2
159 # indicates a fatal error, we still allow it as it is also
160 # returned when the public key is not known.
161 if exit_code not in (0, 1, 2): 161 ↛ 162line 161 didn't jump to line 162, because the condition on line 161 was never true
162 stderr = self.stderr.decode('ascii', errors='replace')
163 raise GpgException(f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}")
165 for line in self.status.splitlines():
166 self._parse_status(line)
168 if self.invalid:
169 self.valid = False
171 if require_signature and not self.valid:
172 stderr = self.stderr.decode('ascii', errors='replace')
173 raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
175 assert len(self.fingerprints) == len(self.primary_fingerprints)
176 assert len(self.fingerprints) == len(self.signature_ids)
178 def _do_io(self, read, write):
179 for fd in write:
180 old = fcntl.fcntl(fd, fcntl.F_GETFL)
181 fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
183 read_lines = dict((fd, []) for fd in read)
184 write_pos = dict((fd, 0) for fd in write)
186 read_set = list(read)
187 write_set = list(write)
188 while len(read_set) > 0 or len(write_set) > 0:
189 r, w, x_ = select.select(read_set, write_set, ())
190 for fd in r:
191 data = os.read(fd, 4096)
192 if len(data) == 0:
193 read_set.remove(fd)
194 else:
195 read_lines[fd].append(data)
196 for fd in w:
197 data = write[fd][write_pos[fd]:]
198 if len(data) == 0:
199 os.close(fd)
200 write_set.remove(fd)
201 else:
202 bytes_written = os.write(fd, data)
203 write_pos[fd] += bytes_written
205 return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
207 def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
208 """parse timestamp in GnuPG's format
210 :return: datetime object for the given timestamp
211 """
212 # The old implementation did only return the date. As we already
213 # used this for replay production, return the legacy value for
214 # old signatures.
215 if datestring is not None: 215 ↛ 222line 215 didn't jump to line 222, because the condition on line 215 was never false
216 year, month, day = datestring.split(b'-')
217 date = datetime.date(int(year), int(month), int(day))
218 time = datetime.time(0, 0)
219 if date < datetime.date(2014, 8, 4):
220 return datetime.datetime.combine(date, time)
222 if b'T' in timestamp: 222 ↛ 223line 222 didn't jump to line 223, because the condition on line 222 was never true
223 raise Exception('No support for ISO 8601 timestamps.')
224 return datetime.datetime.utcfromtimestamp(int(timestamp))
226 def _parse_status(self, line):
227 fields = line.split()
228 if fields[0] != b"[GNUPG:]": 228 ↛ 229line 228 didn't jump to line 229, because the condition on line 228 was never true
229 raise GpgException("Unexpected output on status-fd: %s" % line)
231 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
232 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
233 # <hash-algo> <sig-class> <primary-key-fpr>
234 if fields[1] == b"VALIDSIG":
235 # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
236 # which Debian 8 does not yet include. We want to make sure
237 # to not accept uploads covered by a MD5-based signature.
238 # RFC 4880, table 9.4:
239 # 1 - MD5
240 # 2 - SHA-1
241 # 3 - RIPE-MD/160
242 if fields[9] == b"1": 242 ↛ 243line 242 didn't jump to line 243, because the condition on line 242 was never true
243 raise GpgException("Digest algorithm MD5 is not trusted.")
244 if fields[9] in (b"2", b"3"):
245 self.weak_signature = True
247 self.valid = True
248 self.fingerprints.append(fields[2].decode('ascii'))
249 self.primary_fingerprints.append(fields[11].decode('ascii'))
250 self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
252 elif fields[1] == b"BADARMOR":
253 raise GpgException("Bad armor.")
255 elif fields[1] == b"NODATA":
256 raise GpgException("No data.")
258 elif fields[1] == b"DECRYPTION_FAILED": 258 ↛ 259line 258 didn't jump to line 259, because the condition on line 258 was never true
259 raise GpgException("Decryption failed.")
261 elif fields[1] == b"ERROR": 261 ↛ 262line 261 didn't jump to line 262, because the condition on line 261 was never true
262 f2 = fields[2].decode('ascii', errors='replace')
263 f3 = fields[3].decode('ascii', errors='replace')
264 raise GpgException("Other error: %s %s" % (f2, f3))
266 elif fields[1] == b"SIG_ID":
267 self.signature_ids.append(fields[2])
269 elif fields[1] in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED',
270 b'NEWSIG', b'NOTATION_NAME', b'NOTATION_FLAGS',
271 b'NOTATION_DATA', b'SIGEXPIRED', b'KEYEXPIRED',
272 b'POLICY_URL', b'PROGRESS', b'VERIFICATION_COMPLIANCE_MODE'):
273 pass
275 elif fields[1] in (b'EXPSIG', b'EXPKEYSIG'):
276 self.expired = True
277 self.invalid = True
279 elif fields[1] in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'):
280 self.invalid = True
282 else:
283 field = fields[1].decode('ascii', errors='replace')
284 raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))
286 def _exec_gpg(self, stdin, stdout, stderr, statusfd):
287 try:
288 if stdin != 0:
289 os.dup2(stdin, 0)
290 if stdout != 1:
291 os.dup2(stdout, 1)
292 if stderr != 2:
293 os.dup2(stderr, 2)
294 if statusfd != 3:
295 os.dup2(statusfd, 3)
296 for fd in range(4):
297 old = fcntl.fcntl(fd, fcntl.F_GETFD)
298 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
299 os.closerange(4, _MAXFD)
301 args = [self.gpg,
302 "--status-fd=3",
303 "--no-default-keyring",
304 "--batch",
305 "--no-tty",
306 "--trust-model", "always",
307 "--fixed-list-mode"]
308 for k in self.keyrings:
309 args.extend(["--keyring", k])
310 args.extend(["--decrypt", "-"])
312 os.execvp(self.gpg, args)
313 finally:
314 try:
315 print("Failed to execute gpg.", file=sys.stderr)
316 sys.stderr.flush()
317 except:
318 # Ignore errors, we want to reach the `exit` call below.
319 pass
320 os._exit(3)
322 @property
323 def contents_sha1(self) -> str:
324 return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined]
327def sign(infile, outfile=None, keyids=[], inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None, *, digest_algorithm="SHA256"):
328 args = [
329 '/usr/bin/gpg',
330 '--no-options', '--no-tty', '--batch', '--armour',
331 '--personal-digest-preferences', digest_algorithm,
332 ]
334 for keyid in keyids:
335 args.extend(['--local-user', keyid])
336 if pubring is not None: 336 ↛ 337line 336 didn't jump to line 337, because the condition on line 336 was never true
337 args.extend(['--keyring', pubring])
338 if secring is not None: 338 ↛ 339line 338 didn't jump to line 339, because the condition on line 338 was never true
339 args.extend(['--secret-keyring', secring])
340 if homedir is not None: 340 ↛ 342line 340 didn't jump to line 342, because the condition on line 340 was never false
341 args.extend(['--homedir', homedir])
342 if passphrase_file is not None: 342 ↛ 343line 342 didn't jump to line 343, because the condition on line 342 was never true
343 args.extend(['--pinentry-mode', 'loopback',
344 '--passphrase-file', passphrase_file])
346 args.append('--clearsign' if inline else '--detach-sign')
348 kwargs = {}
349 if isinstance(infile, str): 349 ↛ 350line 349 didn't jump to line 350, because the condition on line 349 was never true
350 infile = infile.encode('utf-8')
351 if isinstance(infile, bytes):
352 kwargs['input'] = infile
353 else:
354 kwargs['stdin'] = infile
355 if outfile is None:
356 kwargs['stdout'] = subprocess.PIPE
357 else:
358 kwargs['stdout'] = outfile
360 result = subprocess.run(args, check=True, **kwargs)
361 if outfile is None:
362 return result.stdout
364# vim: set sw=4 et: