Source code for daklib.gpg

"""Utilities for signed files

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011-2018  Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import apt_pkg
import datetime
import fcntl
import os
import select
import subprocess
from collections.abc import Iterable

try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except:
    _MAXFD = 256


[docs]class GpgException(Exception): pass
[docs]class _Pipe: """context manager for pipes Note: When the pipe is closed by other means than the close_r and close_w methods, you have to set self.r (self.w) to None. """ def __enter__(self): (self.r, self.w) = os.pipe() return self def __exit__(self, type, value, traceback): self.close_w() self.close_r() return False
[docs] def close_r(self): """close reading side of the pipe""" if self.r: os.close(self.r) self.r = None
[docs] def close_w(self): """close writing part of the pipe""" if self.w: os.close(self.w) self.w = None
# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
[docs]def waitstatus_to_exitcode(status): if os.WIFEXITED(status): return os.WEXITSTATUS(status) elif os.WIFSIGNALED(status): return -os.WTERMSIG(status) else: raise ValueError(f"Unexpected status code '{status}'")
[docs]class SignedFile: """handle files signed with PGP The following attributes are available: contents - byte-string with the content (after removing PGP armor) valid - Boolean indicating a valid signature was found weak_signature - signature uses a weak algorithm (e.g. SHA-1) fingerprint - fingerprint of the key used for signing primary_fingerprint - fingerprint of the primary key associated to the key used for signing """ def __init__(self, data: bytes, keyrings: Iterable[str], require_signature: bool = True, gpg: str = "/usr/bin/gpg"): """ :param data: byte-string containing the message :param keyrings: sequence of keyrings :param require_signature: if True (the default), will raise an exception if no valid signature was found :param gpg: location of the gpg binary """ self.gpg = gpg self.keyrings = keyrings self.valid: bool = False #: valid signature self.expired = False self.invalid = False self.weak_signature = False self.fingerprints: list[str] = [] self.primary_fingerprints: list[str] = [] self.signature_ids: list[str] = [] self._verify(data, require_signature) @property def fingerprint(self) -> str: """fingerprint of the (sub)key used for the signature""" assert len(self.fingerprints) == 1 return self.fingerprints[0] @property def primary_fingerprint(self) -> str: """fingerprint of the primary key used for the signature""" assert len(self.primary_fingerprints) == 1 return self.primary_fingerprints[0] @property def signature_id(self): assert len(self.signature_ids) == 1 return self.signature_ids[0]
[docs] def _verify(self, data, require_signature): with _Pipe() as stdin, \ _Pipe() as contents, \ _Pipe() as status, \ _Pipe() as stderr: pid = os.fork() if pid == 0: self._exec_gpg(stdin.r, contents.w, stderr.w, status.w) else: stdin.close_r() contents.close_w() stderr.close_w() status.close_w() read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data}) stdin.w = None # was closed by _do_io (pid_, status_code, usage_) = os.wait4(pid, 0) if pid_ != pid: raise Exception(f"wait4() waited for pid {pid_}, but we expected {pid}") exit_code = waitstatus_to_exitcode(status_code) self.contents = read[contents.r] self.status = read[status.r] self.stderr = read[stderr.r] if self.status == b"": stderr = self.stderr.decode('ascii', errors='replace') raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, stderr)) # gpg exits with 0 (no error), 1 (at least one invalid sig), # or anything else (fatal error). Even though status 2 # indicates a fatal error, we still allow it as it is also # returned when the public key is not known. if exit_code not in (0, 1, 2): stderr = self.stderr.decode('ascii', errors='replace') raise GpgException(f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}") for line in self.status.splitlines(): self._parse_status(line) if self.invalid: self.valid = False if require_signature and not self.valid: stderr = self.stderr.decode('ascii', errors='replace') raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, stderr)) assert len(self.fingerprints) == len(self.primary_fingerprints) assert len(self.fingerprints) == len(self.signature_ids)
[docs] def _do_io(self, read, write): for fd in write: old = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK) read_lines = dict((fd, []) for fd in read) write_pos = dict((fd, 0) for fd in write) read_set = list(read) write_set = list(write) while len(read_set) > 0 or len(write_set) > 0: r, w, x_ = select.select(read_set, write_set, ()) for fd in r: data = os.read(fd, 4096) if len(data) == 0: read_set.remove(fd) else: read_lines[fd].append(data) for fd in w: data = write[fd][write_pos[fd]:] if len(data) == 0: os.close(fd) write_set.remove(fd) else: bytes_written = os.write(fd, data) write_pos[fd] += bytes_written return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
[docs] def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime: """parse timestamp in GnuPG's format :return: datetime object for the given timestamp """ # The old implementation did only return the date. As we already # used this for replay production, return the legacy value for # old signatures. if datestring is not None: year, month, day = datestring.split(b'-') date = datetime.date(int(year), int(month), int(day)) time = datetime.time(0, 0) if date < datetime.date(2014, 8, 4): return datetime.datetime.combine(date, time) if b'T' in timestamp: raise Exception('No support for ISO 8601 timestamps.') return datetime.datetime.utcfromtimestamp(int(timestamp))
[docs] def _parse_status(self, line): fields = line.split() if fields[0] != b"[GNUPG:]": raise GpgException("Unexpected output on status-fd: %s" % line) # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp> # <expire-timestamp> <sig-version> <reserved> <pubkey-algo> # <hash-algo> <sig-class> <primary-key-fpr> if fields[1] == b"VALIDSIG": # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20, # which Debian 8 does not yet include. We want to make sure # to not accept uploads covered by a MD5-based signature. # RFC 4880, table 9.4: # 1 - MD5 # 2 - SHA-1 # 3 - RIPE-MD/160 if fields[9] == b"1": raise GpgException("Digest algorithm MD5 is not trusted.") if fields[9] in (b"2", b"3"): self.weak_signature = True self.valid = True self.fingerprints.append(fields[2].decode('ascii')) self.primary_fingerprints.append(fields[11].decode('ascii')) self.signature_timestamp = self._parse_timestamp(fields[4], fields[3]) elif fields[1] == b"BADARMOR": raise GpgException("Bad armor.") elif fields[1] == b"NODATA": raise GpgException("No data.") elif fields[1] == b"DECRYPTION_FAILED": raise GpgException("Decryption failed.") elif fields[1] == b"ERROR": f2 = fields[2].decode('ascii', errors='replace') f3 = fields[3].decode('ascii', errors='replace') raise GpgException("Other error: %s %s" % (f2, f3)) elif fields[1] == b"SIG_ID": self.signature_ids.append(fields[2]) elif fields[1] in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED', b'NEWSIG', b'NOTATION_NAME', b'NOTATION_FLAGS', b'NOTATION_DATA', b'SIGEXPIRED', b'KEYEXPIRED', b'POLICY_URL', b'PROGRESS', b'VERIFICATION_COMPLIANCE_MODE'): pass elif fields[1] in (b'EXPSIG', b'EXPKEYSIG'): self.expired = True self.invalid = True elif fields[1] in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'): self.invalid = True else: field = fields[1].decode('ascii', errors='replace') raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))
[docs] def _exec_gpg(self, stdin, stdout, stderr, statusfd): try: if stdin != 0: os.dup2(stdin, 0) if stdout != 1: os.dup2(stdout, 1) if stderr != 2: os.dup2(stderr, 2) if statusfd != 3: os.dup2(statusfd, 3) for fd in range(4): old = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) os.closerange(4, _MAXFD) args = [self.gpg, "--status-fd=3", "--no-default-keyring", "--batch", "--no-tty", "--trust-model", "always", "--fixed-list-mode"] for k in self.keyrings: args.extend(["--keyring", k]) args.extend(["--decrypt", "-"]) os.execvp(self.gpg, args) finally: try: print("Failed to execute gpg.", file=sys.stderr) sys.stderr.flush() except: # Ignore errors, we want to reach the `exit` call below. pass os._exit(3)
@property def contents_sha1(self) -> str: return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined]
[docs]def sign(infile, outfile=None, keyids=[], inline=False, pubring=None, secring=None, homedir=None, passphrase_file=None, *, digest_algorithm="SHA256"): args = [ '/usr/bin/gpg', '--no-options', '--no-tty', '--batch', '--armour', '--personal-digest-preferences', digest_algorithm, ] for keyid in keyids: args.extend(['--local-user', keyid]) if pubring is not None: args.extend(['--keyring', pubring]) if secring is not None: args.extend(['--secret-keyring', secring]) if homedir is not None: args.extend(['--homedir', homedir]) if passphrase_file is not None: args.extend(['--pinentry-mode', 'loopback', '--passphrase-file', passphrase_file]) args.append('--clearsign' if inline else '--detach-sign') kwargs = {} if isinstance(infile, str): infile = infile.encode('utf-8') if isinstance(infile, bytes): kwargs['input'] = infile else: kwargs['stdin'] = infile if outfile is None: kwargs['stdout'] = subprocess.PIPE else: kwargs['stdout'] = outfile result = subprocess.run(args, check=True, **kwargs) if outfile is None: return result.stdout
# vim: set sw=4 et: