"""Utilities for signed files
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import datetime
import fcntl
import os
import select
import subprocess
import sys
from collections.abc import Iterable
import apt_pkg
try:
_MAXFD = os.sysconf("SC_OPEN_MAX")
except:
_MAXFD = 256
[docs]class GpgException(Exception):
pass
[docs]class _Pipe:
"""context manager for pipes
Note: When the pipe is closed by other means than the close_r and close_w
methods, you have to set self.r (self.w) to None.
"""
def __enter__(self):
(self.r, self.w) = os.pipe()
return self
def __exit__(self, type, value, traceback):
self.close_w()
self.close_r()
return False
[docs] def close_r(self):
"""close reading side of the pipe"""
if self.r:
os.close(self.r)
self.r = None
[docs] def close_w(self):
"""close writing part of the pipe"""
if self.w:
os.close(self.w)
self.w = None
# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
[docs]def waitstatus_to_exitcode(status):
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
else:
raise ValueError(f"Unexpected status code '{status}'")
[docs]class SignedFile:
"""handle files signed with PGP
The following attributes are available:
contents - byte-string with the content (after removing PGP armor)
valid - Boolean indicating a valid signature was found
weak_signature - signature uses a weak algorithm (e.g. SHA-1)
fingerprint - fingerprint of the key used for signing
primary_fingerprint - fingerprint of the primary key associated to the key used for signing
"""
def __init__(
self,
data: bytes,
keyrings: Iterable[str],
require_signature: bool = True,
gpg: str = "/usr/bin/gpg",
):
"""
:param data: byte-string containing the message
:param keyrings: sequence of keyrings
:param require_signature: if True (the default), will raise an exception if no valid signature was found
:param gpg: location of the gpg binary
"""
self.gpg = gpg
self.keyrings = keyrings
self.valid: bool = False #: valid signature
self.expired = False
self.invalid = False
self.weak_signature = False
self.fingerprints: list[str] = []
self.primary_fingerprints: list[str] = []
self.signature_ids: list[str] = []
self._verify(data, require_signature)
@property
def fingerprint(self) -> str:
"""fingerprint of the (sub)key used for the signature"""
assert len(self.fingerprints) == 1
return self.fingerprints[0]
@property
def primary_fingerprint(self) -> str:
"""fingerprint of the primary key used for the signature"""
assert len(self.primary_fingerprints) == 1
return self.primary_fingerprints[0]
@property
def signature_id(self):
assert len(self.signature_ids) == 1
return self.signature_ids[0]
[docs] def _verify(self, data, require_signature):
with _Pipe() as stdin, _Pipe() as contents, _Pipe() as status, _Pipe() as stderr:
pid = os.fork()
if pid == 0:
self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
else:
stdin.close_r()
contents.close_w()
stderr.close_w()
status.close_w()
read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
stdin.w = None # was closed by _do_io
(pid_, status_code, usage_) = os.wait4(pid, 0)
if pid_ != pid:
raise Exception(
f"wait4() waited for pid {pid_}, but we expected {pid}"
)
exit_code = waitstatus_to_exitcode(status_code)
self.contents = read[contents.r]
self.status = read[status.r]
self.stderr = read[stderr.r]
if self.status == b"":
stderr = self.stderr.decode("ascii", errors="replace")
raise GpgException(
"No status output from GPG. (GPG exited with status code %s)\n%s"
% (exit_code, stderr)
)
# gpg exits with 0 (no error), 1 (at least one invalid sig),
# or anything else (fatal error). Even though status 2
# indicates a fatal error, we still allow it as it is also
# returned when the public key is not known.
if exit_code not in (0, 1, 2):
stderr = self.stderr.decode("ascii", errors="replace")
raise GpgException(
f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}"
)
for line in self.status.splitlines():
self._parse_status(line)
if self.invalid:
self.valid = False
if require_signature and not self.valid:
stderr = self.stderr.decode("ascii", errors="replace")
raise GpgException(
"No valid signature found. (GPG exited with status code %s)\n%s"
% (exit_code, stderr)
)
assert len(self.fingerprints) == len(self.primary_fingerprints)
assert len(self.fingerprints) == len(self.signature_ids)
[docs] def _do_io(self, read, write):
for fd in write:
old = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)
read_lines = dict((fd, []) for fd in read)
write_pos = dict((fd, 0) for fd in write)
read_set = list(read)
write_set = list(write)
while len(read_set) > 0 or len(write_set) > 0:
r, w, x_ = select.select(read_set, write_set, ())
for fd in r:
data = os.read(fd, 4096)
if len(data) == 0:
read_set.remove(fd)
else:
read_lines[fd].append(data)
for fd in w:
data = write[fd][write_pos[fd] :]
if len(data) == 0:
os.close(fd)
write_set.remove(fd)
else:
bytes_written = os.write(fd, data)
write_pos[fd] += bytes_written
return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
[docs] def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
"""parse timestamp in GnuPG's format
:return: datetime object for the given timestamp
"""
# The old implementation did only return the date. As we already
# used this for replay production, return the legacy value for
# old signatures.
if datestring is not None:
year, month, day = datestring.split(b"-")
date = datetime.date(int(year), int(month), int(day))
time = datetime.time(0, 0)
if date < datetime.date(2014, 8, 4):
return datetime.datetime.combine(date, time)
if b"T" in timestamp:
raise Exception("No support for ISO 8601 timestamps.")
return datetime.datetime.utcfromtimestamp(int(timestamp))
[docs] def _parse_status(self, line):
fields = line.split()
if fields[0] != b"[GNUPG:]":
raise GpgException("Unexpected output on status-fd: %s" % line)
# VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
# <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
# <hash-algo> <sig-class> <primary-key-fpr>
if fields[1] == b"VALIDSIG":
# GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
# which Debian 8 does not yet include. We want to make sure
# to not accept uploads covered by a MD5-based signature.
# RFC 4880, table 9.4:
# 1 - MD5
# 2 - SHA-1
# 3 - RIPE-MD/160
if fields[9] == b"1":
raise GpgException("Digest algorithm MD5 is not trusted.")
if fields[9] in (b"2", b"3"):
self.weak_signature = True
self.valid = True
self.fingerprints.append(fields[2].decode("ascii"))
self.primary_fingerprints.append(fields[11].decode("ascii"))
self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
elif fields[1] == b"BADARMOR":
raise GpgException("Bad armor.")
elif fields[1] == b"NODATA":
raise GpgException("No data.")
elif fields[1] == b"DECRYPTION_FAILED":
raise GpgException("Decryption failed.")
elif fields[1] == b"ERROR":
f2 = fields[2].decode("ascii", errors="replace")
f3 = fields[3].decode("ascii", errors="replace")
raise GpgException("Other error: %s %s" % (f2, f3))
elif fields[1] == b"SIG_ID":
self.signature_ids.append(fields[2])
elif fields[1] in (
b"PLAINTEXT",
b"GOODSIG",
b"KEY_CONSIDERED",
b"NEWSIG",
b"NOTATION_NAME",
b"NOTATION_FLAGS",
b"NOTATION_DATA",
b"SIGEXPIRED",
b"KEYEXPIRED",
b"POLICY_URL",
b"PROGRESS",
b"VERIFICATION_COMPLIANCE_MODE",
):
pass
elif fields[1] in (b"EXPSIG", b"EXPKEYSIG"):
self.expired = True
self.invalid = True
elif fields[1] in (
b"REVKEYSIG",
b"BADSIG",
b"ERRSIG",
b"KEYREVOKED",
b"NO_PUBKEY",
):
self.invalid = True
else:
field = fields[1].decode("ascii", errors="replace")
raise GpgException(
"Keyword '{0}' from GnuPG was not expected.".format(field)
)
[docs] def _exec_gpg(self, stdin, stdout, stderr, statusfd):
try:
if stdin != 0:
os.dup2(stdin, 0)
if stdout != 1:
os.dup2(stdout, 1)
if stderr != 2:
os.dup2(stderr, 2)
if statusfd != 3:
os.dup2(statusfd, 3)
for fd in range(4):
old = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
os.closerange(4, _MAXFD)
args = [
self.gpg,
"--status-fd=3",
"--no-default-keyring",
"--batch",
"--no-tty",
"--trust-model",
"always",
"--fixed-list-mode",
]
for k in self.keyrings:
args.extend(["--keyring", k])
args.extend(["--decrypt", "-"])
os.execvp(self.gpg, args)
finally:
try:
print("Failed to execute gpg.", file=sys.stderr)
sys.stderr.flush()
except:
# Ignore errors, we want to reach the `exit` call below.
pass
os._exit(3)
@property
def contents_sha1(self) -> str:
return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined]
[docs]def sign(
infile,
outfile=None,
keyids=[],
inline=False,
pubring=None,
secring=None,
homedir=None,
passphrase_file=None,
*,
digest_algorithm="SHA256",
):
args = [
"/usr/bin/gpg",
"--no-options",
"--no-tty",
"--batch",
"--armour",
"--personal-digest-preferences",
digest_algorithm,
]
for keyid in keyids:
args.extend(["--local-user", keyid])
if pubring is not None:
args.extend(["--keyring", pubring])
if secring is not None:
args.extend(["--secret-keyring", secring])
if homedir is not None:
args.extend(["--homedir", homedir])
if passphrase_file is not None:
args.extend(
["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file]
)
args.append("--clearsign" if inline else "--detach-sign")
kwargs = {}
if isinstance(infile, str):
infile = infile.encode("utf-8")
if isinstance(infile, bytes):
kwargs["input"] = infile
else:
kwargs["stdin"] = infile
if outfile is None:
kwargs["stdout"] = subprocess.PIPE
else:
kwargs["stdout"] = outfile
result = subprocess.run(args, check=True, **kwargs)
if outfile is None:
return result.stdout
# vim: set sw=4 et: