Source code for daklib.gpg

"""Utilities for signed files

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011-2018  Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import datetime
import fcntl
import os
import select
import subprocess
import sys
from collections.abc import Iterable
from contextlib import ExitStack
from tempfile import NamedTemporaryFile
from typing import Optional

import apt_pkg

try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except:
    _MAXFD = 256


class GpgException(Exception):
    """Error raised by this module when handling GnuPG output fails."""
[docs]class _Pipe: """context manager for pipes Note: When the pipe is closed by other means than the close_r and close_w methods, you have to set self.r (self.w) to None. """ def __enter__(self): (self.r, self.w) = os.pipe() return self def __exit__(self, type, value, traceback): self.close_w() self.close_r() return False
[docs] def close_r(self): """close reading side of the pipe""" if self.r: os.close(self.r) self.r = None
[docs] def close_w(self): """close writing part of the pipe""" if self.w: os.close(self.w) self.w = None
# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
def waitstatus_to_exitcode(status):
    """Convert a wait status into an exit code.

    :param status: wait status as returned by e.g. :func:`os.wait4`
    :return: the exit status if the process exited, or the negated signal
             number if the process was terminated by a signal
    :raises ValueError: if the status describes neither of these cases
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")
[docs]def _create_named_temporary_file_with_contents(contents: bytes) -> NamedTemporaryFile: f = NamedTemporaryFile() f.write(contents) f.flush() return f
class SignedFile:
    """handle files signed with PGP

    The following attributes are available:
      contents            - byte-string with the content (after removing PGP armor)
      valid               - Boolean indicating a valid signature was found
      expired             - an expired signature or key was reported by GnuPG
      invalid             - GnuPG reported a bad, revoked, expired or unverifiable signature
      weak_signature      - signature uses a weak algorithm (e.g. SHA-1)
      fingerprint         - fingerprint of the key used for signing
      primary_fingerprint - fingerprint of the primary key associated to the key used for signing
      status              - raw bytes GnuPG wrote to the status fd
      stderr              - raw bytes GnuPG wrote to stderr
    """

    def __init__(
        self,
        data: bytes,
        keyrings: Iterable[str],
        require_signature: bool = True,
        *,
        detached_signature: Optional[bytes] = None,
        gpg: str = "/usr/bin/gpg",
    ):
        """
        :param data: byte-string containing the message
        :param keyrings: sequence of keyrings
        :param require_signature: if True (the default), will raise an exception if no valid signature was found
        :param detached_signature: validate detached signature if set; otherwise an inline signature is expected
        :param gpg: location of the gpg binary
        :raises GpgException: if GnuPG fails, produces unexpected output, or
                              (with `require_signature`) no valid signature was found
        """
        self.gpg = gpg
        self.keyrings = keyrings

        self.valid: bool = False  #: valid signature
        self.expired = False
        self.invalid = False
        self.weak_signature = False
        self.fingerprints: list[str] = []
        self.primary_fingerprints: list[str] = []
        # SIG_ID values are stored exactly as read from the status fd (bytes).
        self.signature_ids: list[bytes] = []

        self._verify(data, detached_signature, require_signature)

    @property
    def fingerprint(self) -> str:
        """fingerprint of the (sub)key used for the signature"""
        assert len(self.fingerprints) == 1
        return self.fingerprints[0]

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the primary key used for the signature"""
        assert len(self.primary_fingerprints) == 1
        return self.primary_fingerprints[0]

    @property
    def signature_id(self) -> bytes:
        """signature id (SIG_ID) reported by GnuPG for the signature"""
        assert len(self.signature_ids) == 1
        return self.signature_ids[0]

    def _verify(
        self, data: bytes, detached_signature: Optional[bytes], require_signature: bool
    ):
        """run GnuPG on `data` and evaluate what it reported

        Sets `contents`, `status`, `stderr` and the signature related
        attributes.

        :param data: byte-string containing the message
        :param detached_signature: detached signature, or None for inline signatures
        :param require_signature: raise if no valid signature was found
        :raises GpgException: on missing status output, fatal GnuPG errors,
                              unexpected status lines or a missing valid signature
        """
        exit_code = self._run_gpg(data, detached_signature)

        if self.status == b"":
            raise GpgException(
                "No status output from GPG. (GPG exited with status code %s)\n%s"
                % (exit_code, self._stderr_text())
            )

        # gpg exits with 0 (no error), 1 (at least one invalid sig),
        # or anything else (fatal error).  Even though status 2
        # indicates a fatal error, we still allow it as it is also
        # returned when the public key is not known.
        if exit_code not in (0, 1, 2):
            stderr_text = self._stderr_text()
            raise GpgException(
                f"GPG exited with a fatal error (exit code {exit_code})\n{stderr_text}"
            )

        self._seen_goodsig = 0
        for line in self.status.splitlines():
            self._parse_status(line)

        # Every VALIDSIG must be matched by a GOODSIG, and nothing may have
        # flagged the signature as invalid.
        if self.invalid or self._seen_goodsig != len(self.fingerprints):
            self.valid = False

        if require_signature and not self.valid:
            raise GpgException(
                "No valid signature found. (GPG exited with status code %s)\n%s"
                % (exit_code, self._stderr_text())
            )

        assert len(self.fingerprints) == len(self.primary_fingerprints)
        assert len(self.fingerprints) == len(self.signature_ids)

    def _stderr_text(self) -> str:
        """GnuPG's stderr output decoded for use in error messages"""
        return self.stderr.decode("ascii", errors="replace")

    def _run_gpg(self, data: bytes, detached_signature: Optional[bytes]) -> int:
        """fork and run GnuPG, feeding `data` on stdin and collecting its output

        Stores the collected output in `contents`, `status` and `stderr`.

        :return: exit code of the GnuPG process (negative signal number if
                 it was killed by a signal)
        """
        with _Pipe() as stdin, _Pipe() as contents, _Pipe() as status, _Pipe() as stderr, ExitStack() as stack:
            detached_signature_file = (
                stack.enter_context(
                    _create_named_temporary_file_with_contents(detached_signature)
                )
                if detached_signature is not None
                else None
            )

            pid = os.fork()
            if pid == 0:
                # Child: does not return (either exec's gpg or calls os._exit).
                self._exec_gpg(
                    stdin.r, contents.w, stderr.w, status.w, detached_signature_file
                )
            else:
                # Parent: close the ends used by the child so that EOF is
                # seen on the read ends once gpg exits.
                stdin.close_r()
                contents.close_w()
                stderr.close_w()
                status.close_w()

                read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
                stdin.w = None  # was closed by _do_io

                (pid_, status_code, usage_) = os.wait4(pid, 0)
                if pid_ != pid:
                    raise Exception(
                        f"wait4() waited for pid {pid_}, but we expected {pid}"
                    )
                exit_code = waitstatus_to_exitcode(status_code)

                if detached_signature is None:
                    self.contents = read[contents.r]
                elif read[contents.r]:
                    raise GpgException(
                        "GPG wrote data to stdout when verifying detached signature."
                    )
                else:
                    self.contents = data
                self.status = read[status.r]
                self.stderr = read[stderr.r]

        return exit_code

    def _do_io(self, read, write):
        """multiplex reading from and writing to several file descriptors

        The write descriptors are switched to non-blocking mode and are
        closed here once all of their data has been written.  The read
        descriptors are read until EOF but are not closed.

        :param read: iterable of file descriptors to read from
        :param write: mapping of file descriptor to the bytes to write to it
        :return: dict mapping each read descriptor to the bytes read from it
        """
        for fd in write:
            old = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)

        read_chunks = {fd: [] for fd in read}
        # memoryview lets the remaining data be sliced without copying it
        # again on every pass through the loop.
        pending = {fd: memoryview(buf) for fd, buf in write.items()}
        write_pos = {fd: 0 for fd in write}

        read_set = list(read)
        write_set = list(write)
        while read_set or write_set:
            r, w, x_ = select.select(read_set, write_set, ())
            for fd in r:
                data = os.read(fd, 4096)
                if len(data) == 0:
                    # EOF
                    read_set.remove(fd)
                else:
                    read_chunks[fd].append(data)
            for fd in w:
                remaining = pending[fd][write_pos[fd]:]
                if len(remaining) == 0:
                    os.close(fd)
                    write_set.remove(fd)
                else:
                    write_pos[fd] += os.write(fd, remaining)

        return {fd: b"".join(chunks) for fd, chunks in read_chunks.items()}

    def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
        """parse timestamp in GnuPG's format

        :param timestamp: seconds since the epoch as bytes
        :param datestring: optional ``YYYY-MM-DD`` date as bytes
        :return: datetime object for the given timestamp (naive, in UTC)
        :raises Exception: if `timestamp` is in ISO 8601 format
        """
        # The old implementation did only return the date. As that value
        # was already in use, return the legacy value (midnight of that
        # date) for old signatures.
        if datestring is not None:
            year, month, day = datestring.split(b"-")
            date = datetime.date(int(year), int(month), int(day))
            if date < datetime.date(2014, 8, 4):
                return datetime.datetime.combine(date, datetime.time(0, 0))

        if b"T" in timestamp:
            raise Exception("No support for ISO 8601 timestamps.")
        # Same naive-UTC result as the deprecated datetime.utcfromtimestamp().
        return datetime.datetime.fromtimestamp(
            int(timestamp), datetime.timezone.utc
        ).replace(tzinfo=None)

    def _parse_status(self, line):
        """evaluate one line read from GnuPG's status fd

        :param line: a single status line as bytes
        :raises GpgException: for malformed lines, unexpected keywords,
                              errors reported by GnuPG and MD5 signatures
        """
        fields = line.split()
        # Also rejects empty lines and lines lacking a keyword.
        if len(fields) < 2 or fields[0] != b"[GNUPG:]":
            raise GpgException("Unexpected output on status-fd: %s" % line)

        keyword = fields[1]

        # VALIDSIG    <fingerprint in hex> <sig_creation_date> <sig-timestamp>
        #             <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
        #             <hash-algo> <sig-class> <primary-key-fpr>
        if keyword == b"VALIDSIG":
            if len(fields) < 12:
                raise GpgException("Unexpected output on status-fd: %s" % line)
            # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
            # which Debian 8 does not yet include.  We want to make sure
            # to not accept uploads covered by a MD5-based signature.
            # RFC 4880, table 9.4:
            #   1 - MD5
            #   2 - SHA-1
            #   3 - RIPE-MD/160
            if fields[9] == b"1":
                raise GpgException("Digest algorithm MD5 is not trusted.")
            if fields[9] in (b"2", b"3"):
                self.weak_signature = True

            self.valid = True
            self.fingerprints.append(fields[2].decode("ascii"))
            self.primary_fingerprints.append(fields[11].decode("ascii"))
            self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])

        elif keyword == b"BADARMOR":
            raise GpgException("Bad armor.")

        elif keyword == b"NODATA":
            raise GpgException("No data.")

        elif keyword == b"DECRYPTION_FAILED":
            raise GpgException("Decryption failed.")

        elif keyword == b"ERROR":
            f2 = fields[2].decode("ascii", errors="replace")
            f3 = fields[3].decode("ascii", errors="replace")
            raise GpgException("Other error: %s %s" % (f2, f3))

        elif keyword == b"SIG_ID":
            self.signature_ids.append(fields[2])

        elif keyword == b"GOODSIG":
            self._seen_goodsig += 1

        elif keyword in (
            b"PLAINTEXT",
            b"KEY_CONSIDERED",
            b"NEWSIG",
            b"NOTATION_NAME",
            b"NOTATION_FLAGS",
            b"NOTATION_DATA",
            b"SIGEXPIRED",
            b"KEYEXPIRED",
            b"POLICY_URL",
            b"PROGRESS",
            b"VERIFICATION_COMPLIANCE_MODE",
        ):
            # Informational only; does not influence the result.
            pass

        elif keyword in (b"EXPSIG", b"EXPKEYSIG"):
            self.expired = True
            self.invalid = True

        elif keyword in (
            b"REVKEYSIG",
            b"BADSIG",
            b"ERRSIG",
            b"KEYREVOKED",
            b"NO_PUBKEY",
        ):
            self.invalid = True

        else:
            field = keyword.decode("ascii", errors="replace")
            raise GpgException(
                "Keyword '{0}' from GnuPG was not expected.".format(field)
            )

    def _exec_gpg(
        self,
        stdin,
        stdout,
        stderr,
        statusfd,
        detached_signature_file: Optional[NamedTemporaryFile],
    ):
        """replace the current (child) process with GnuPG

        The given descriptors become fds 0-3 of the new process; all other
        descriptors from 4 up to `_MAXFD` are closed.  This method never
        returns: if gpg cannot be executed the process exits with status 3.

        :param stdin: fd gpg reads the message from
        :param stdout: fd gpg writes the message contents to
        :param stderr: fd for gpg's diagnostics
        :param statusfd: fd gpg writes machine-readable status lines to
        :param detached_signature_file: file holding the detached signature,
                                        or None for inline signatures
        """
        try:
            if stdin != 0:
                os.dup2(stdin, 0)
            if stdout != 1:
                os.dup2(stdout, 1)
            if stderr != 2:
                os.dup2(stderr, 2)
            if statusfd != 3:
                os.dup2(statusfd, 3)
            # fds 0-3 must survive the exec.
            for fd in range(4):
                old = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            os.closerange(4, _MAXFD)

            args = [
                self.gpg,
                "--status-fd=3",
                "--no-default-keyring",
                "--batch",
                "--no-tty",
                "--trust-model",
                "always",
                "--fixed-list-mode",
            ]
            for k in self.keyrings:
                args.extend(["--keyring", k])
            if detached_signature_file is not None:
                args.extend(["--verify", detached_signature_file.name, "-"])
            else:
                args.extend(["--output", "-", "--verify", "-"])

            os.execvp(self.gpg, args)
        finally:
            # Only reached when something above failed: a successful exec
            # replaces this process.
            try:
                print("Failed to execute gpg.", file=sys.stderr)
                sys.stderr.flush()
            except BaseException:
                # Ignore errors, we want to reach the `exit` call below.
                pass
            os._exit(3)

    @property
    def contents_sha1(self) -> str:
        """SHA-1 checksum of `contents` as computed by apt_pkg"""
        return apt_pkg.sha1sum(self.contents)  # type: ignore[attr-defined]
def sign(
    infile,
    outfile=None,
    keyids=(),
    inline=False,
    pubring=None,
    secring=None,
    homedir=None,
    passphrase_file=None,
    *,
    digest_algorithm="SHA256",
    gpg="/usr/bin/gpg",
):
    """Sign data with GnuPG.

    :param infile: data to sign; a `str` (encoded as UTF-8) or `bytes`
                   value is sent to gpg's stdin, anything else is passed to
                   :func:`subprocess.run` as `stdin`
    :param outfile: passed to :func:`subprocess.run` as `stdout`; if None,
                    the output is captured and returned
    :param keyids: iterable of key ids, each passed as ``--local-user``
    :param inline: create a clearsigned message (``--clearsign``) instead
                   of a detached signature (``--detach-sign``)
    :param pubring: passed as ``--keyring`` if not None
    :param secring: passed as ``--secret-keyring`` if not None
    :param homedir: passed as ``--homedir`` if not None
    :param passphrase_file: if not None, passed as ``--passphrase-file``
                            together with ``--pinentry-mode loopback``
    :param digest_algorithm: passed as ``--personal-digest-preferences``
    :param gpg: location of the gpg binary
    :return: gpg's output as bytes if `outfile` is None, otherwise None
    :raises subprocess.CalledProcessError: if gpg exits with a non-zero status
    """
    args = [
        gpg,
        "--no-options",
        "--no-tty",
        "--batch",
        "--armour",
        "--personal-digest-preferences",
        digest_algorithm,
    ]

    for keyid in keyids:
        args.extend(["--local-user", keyid])
    if pubring is not None:
        args.extend(["--keyring", pubring])
    if secring is not None:
        args.extend(["--secret-keyring", secring])
    if homedir is not None:
        args.extend(["--homedir", homedir])
    if passphrase_file is not None:
        args.extend(
            ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file]
        )

    args.append("--clearsign" if inline else "--detach-sign")

    kwargs = {}
    if isinstance(infile, str):
        infile = infile.encode("utf-8")
    if isinstance(infile, bytes):
        kwargs["input"] = infile
    else:
        kwargs["stdin"] = infile
    kwargs["stdout"] = subprocess.PIPE if outfile is None else outfile

    result = subprocess.run(args, check=True, **kwargs)

    if outfile is None:
        return result.stdout
    return None
# vim: set sw=4 et: