1"""Utilities for signed files 

2 

3@contact: Debian FTP Master <ftpmaster@debian.org> 

4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org> 

5@license: GNU General Public License version 2 or later 

6""" 

7 

8# This program is free software; you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation; either version 2 of the License, or 

11# (at your option) any later version. 

12 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17 

18# You should have received a copy of the GNU General Public License 

19# along with this program; if not, write to the Free Software 

20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

21 

22import datetime 

23import fcntl 

24import os 

25import select 

26import subprocess 

27import sys 

28from collections.abc import Iterable 

29from contextlib import ExitStack 

30from tempfile import NamedTemporaryFile 

31from typing import Optional 

32 

33import apt_pkg 

34 

35try: 

36 _MAXFD = os.sysconf("SC_OPEN_MAX") 

37except: 

38 _MAXFD = 256 

39 

40 

41class GpgException(Exception): 

42 pass 

43 

44 

45class _Pipe: 

46 """context manager for pipes 

47 

48 Note: When the pipe is closed by other means than the close_r and close_w 

49 methods, you have to set self.r (self.w) to None. 

50 """ 

51 

52 def __enter__(self): 

53 (self.r, self.w) = os.pipe() 

54 return self 

55 

56 def __exit__(self, type, value, traceback): 

57 self.close_w() 

58 self.close_r() 

59 return False 

60 

61 def close_r(self): 

62 """close reading side of the pipe""" 

63 if self.r: 

64 os.close(self.r) 

65 self.r = None 

66 

67 def close_w(self): 

68 """close writing part of the pipe""" 

69 if self.w: 

70 os.close(self.w) 

71 self.w = None 

72 

73 

74# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9) 

75def waitstatus_to_exitcode(status): 

76 if os.WIFEXITED(status): 76 ↛ 78line 76 didn't jump to line 78, because the condition on line 76 was never false

77 return os.WEXITSTATUS(status) 

78 elif os.WIFSIGNALED(status): 

79 return -os.WTERMSIG(status) 

80 else: 

81 raise ValueError(f"Unexpected status code '{status}'") 

82 

83 

84def _create_named_temporary_file_with_contents(contents: bytes) -> NamedTemporaryFile: 

85 f = NamedTemporaryFile() 

86 f.write(contents) 

87 f.flush() 

88 return f 

89 

90 

91class SignedFile: 

92 """handle files signed with PGP 

93 

94 The following attributes are available: 

95 contents - byte-string with the content (after removing PGP armor) 

96 valid - Boolean indicating a valid signature was found 

97 weak_signature - signature uses a weak algorithm (e.g. SHA-1) 

98 fingerprint - fingerprint of the key used for signing 

99 primary_fingerprint - fingerprint of the primary key associated to the key used for signing 

100 """ 

101 

102 def __init__( 

103 self, 

104 data: bytes, 

105 keyrings: Iterable[str], 

106 require_signature: bool = True, 

107 *, 

108 detached_signature: Optional[bytes] = None, 

109 gpg: str = "/usr/bin/gpg", 

110 ): 

111 """ 

112 :param data: byte-string containing the message 

113 :param keyrings: sequence of keyrings 

114 :param require_signature: if True (the default), will raise an exception if no valid signature was found 

115 :param detached_signature: validate detached signature if set; otherwise an inline signature is expected 

116 :param gpg: location of the gpg binary 

117 """ 

118 self.gpg = gpg 

119 self.keyrings = keyrings 

120 

121 self.valid: bool = False #: valid signature 

122 self.expired = False 

123 self.invalid = False 

124 self.weak_signature = False 

125 self.fingerprints: list[str] = [] 

126 self.primary_fingerprints: list[str] = [] 

127 self.signature_ids: list[str] = [] 

128 

129 self._verify(data, detached_signature, require_signature) 

130 

131 @property 

132 def fingerprint(self) -> str: 

133 """fingerprint of the (sub)key used for the signature""" 

134 assert len(self.fingerprints) == 1 

135 return self.fingerprints[0] 

136 

137 @property 

138 def primary_fingerprint(self) -> str: 

139 """fingerprint of the primary key used for the signature""" 

140 assert len(self.primary_fingerprints) == 1 

141 return self.primary_fingerprints[0] 

142 

143 @property 

144 def signature_id(self): 

145 assert len(self.signature_ids) == 1 

146 return self.signature_ids[0] 

147 

148 def _verify( 

149 self, data: bytes, detached_signature: Optional[bytes], require_signature: bool 

150 ): 

151 with _Pipe() as stdin, _Pipe() as contents, _Pipe() as status, _Pipe() as stderr, ExitStack() as stack: 

152 detached_signature_file = ( 

153 stack.enter_context( 

154 _create_named_temporary_file_with_contents(detached_signature) 

155 ) 

156 if detached_signature is not None 

157 else None 

158 ) 

159 

160 pid = os.fork() 

161 if pid == 0: 161 ↛ 162line 161 didn't jump to line 162, because the condition on line 161 was never true

162 self._exec_gpg( 

163 stdin.r, contents.w, stderr.w, status.w, detached_signature_file 

164 ) 

165 else: 

166 stdin.close_r() 

167 contents.close_w() 

168 stderr.close_w() 

169 status.close_w() 

170 

171 read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data}) 

172 stdin.w = None # was closed by _do_io 

173 

174 (pid_, status_code, usage_) = os.wait4(pid, 0) 

175 if pid_ != pid: 175 ↛ 176line 175 didn't jump to line 176, because the condition on line 175 was never true

176 raise Exception( 

177 f"wait4() waited for pid {pid_}, but we expected {pid}" 

178 ) 

179 exit_code = waitstatus_to_exitcode(status_code) 

180 

181 if detached_signature is None: 

182 self.contents = read[contents.r] 

183 elif read[contents.r]: 183 ↛ 184line 183 didn't jump to line 184, because the condition on line 183 was never true

184 raise GpgException( 

185 "GPG wrote data to stdout when verifying detached signature." 

186 ) 

187 else: 

188 self.contents = data 

189 

190 self.status = read[status.r] 

191 self.stderr = read[stderr.r] 

192 

193 if self.status == b"": 193 ↛ 194line 193 didn't jump to line 194, because the condition on line 193 was never true

194 stderr = self.stderr.decode("ascii", errors="replace") 

195 raise GpgException( 

196 "No status output from GPG. (GPG exited with status code %s)\n%s" 

197 % (exit_code, stderr) 

198 ) 

199 

200 # gpg exits with 0 (no error), 1 (at least one invalid sig), 

201 # or anything else (fatal error). Even though status 2 

202 # indicates a fatal error, we still allow it as it is also 

203 # returned when the public key is not known. 

204 if exit_code not in (0, 1, 2): 204 ↛ 205line 204 didn't jump to line 205, because the condition on line 204 was never true

205 stderr = self.stderr.decode("ascii", errors="replace") 

206 raise GpgException( 

207 f"GPG exited with a fatal error (exit code {exit_code})\n{stderr}" 

208 ) 

209 

210 self._seen_goodsig = 0 

211 

212 for line in self.status.splitlines(): 

213 self._parse_status(line) 

214 

215 if self.invalid or self._seen_goodsig != len(self.fingerprints): 

216 self.valid = False 

217 

218 if require_signature and not self.valid: 

219 stderr = self.stderr.decode("ascii", errors="replace") 

220 raise GpgException( 

221 "No valid signature found. (GPG exited with status code %s)\n%s" 

222 % (exit_code, stderr) 

223 ) 

224 

225 assert len(self.fingerprints) == len(self.primary_fingerprints) 

226 assert len(self.fingerprints) == len(self.signature_ids) 

227 

228 def _do_io(self, read, write): 

229 for fd in write: 

230 old = fcntl.fcntl(fd, fcntl.F_GETFL) 

231 fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK) 

232 

233 read_lines = dict((fd, []) for fd in read) 

234 write_pos = dict((fd, 0) for fd in write) 

235 

236 read_set = list(read) 

237 write_set = list(write) 

238 while len(read_set) > 0 or len(write_set) > 0: 

239 r, w, x_ = select.select(read_set, write_set, ()) 

240 for fd in r: 

241 data = os.read(fd, 4096) 

242 if len(data) == 0: 

243 read_set.remove(fd) 

244 else: 

245 read_lines[fd].append(data) 

246 for fd in w: 

247 data = write[fd][write_pos[fd] :] 

248 if len(data) == 0: 

249 os.close(fd) 

250 write_set.remove(fd) 

251 else: 

252 bytes_written = os.write(fd, data) 

253 write_pos[fd] += bytes_written 

254 

255 return dict((fd, b"".join(read_lines[fd])) for fd in read_lines) 

256 

257 def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime: 

258 """parse timestamp in GnuPG's format 

259 

260 :return: datetime object for the given timestamp 

261 """ 

262 # The old implementation did only return the date. As we already 

263 # used this for replay production, return the legacy value for 

264 # old signatures. 

265 if datestring is not None: 265 ↛ 272line 265 didn't jump to line 272, because the condition on line 265 was never false

266 year, month, day = datestring.split(b"-") 

267 date = datetime.date(int(year), int(month), int(day)) 

268 time = datetime.time(0, 0) 

269 if date < datetime.date(2014, 8, 4): 

270 return datetime.datetime.combine(date, time) 

271 

272 if b"T" in timestamp: 272 ↛ 273line 272 didn't jump to line 273, because the condition on line 272 was never true

273 raise Exception("No support for ISO 8601 timestamps.") 

274 return datetime.datetime.utcfromtimestamp(int(timestamp)) 

275 

276 def _parse_status(self, line): 

277 fields = line.split() 

278 if fields[0] != b"[GNUPG:]": 278 ↛ 279line 278 didn't jump to line 279, because the condition on line 278 was never true

279 raise GpgException("Unexpected output on status-fd: %s" % line) 

280 

281 # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp> 

282 # <expire-timestamp> <sig-version> <reserved> <pubkey-algo> 

283 # <hash-algo> <sig-class> <primary-key-fpr> 

284 if fields[1] == b"VALIDSIG": 

285 # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20, 

286 # which Debian 8 does not yet include. We want to make sure 

287 # to not accept uploads covered by a MD5-based signature. 

288 # RFC 4880, table 9.4: 

289 # 1 - MD5 

290 # 2 - SHA-1 

291 # 3 - RIPE-MD/160 

292 if fields[9] == b"1": 292 ↛ 293line 292 didn't jump to line 293, because the condition on line 292 was never true

293 raise GpgException("Digest algorithm MD5 is not trusted.") 

294 if fields[9] in (b"2", b"3"): 

295 self.weak_signature = True 

296 

297 self.valid = True 

298 self.fingerprints.append(fields[2].decode("ascii")) 

299 self.primary_fingerprints.append(fields[11].decode("ascii")) 

300 self.signature_timestamp = self._parse_timestamp(fields[4], fields[3]) 

301 

302 elif fields[1] == b"BADARMOR": 

303 raise GpgException("Bad armor.") 

304 

305 elif fields[1] == b"NODATA": 

306 raise GpgException("No data.") 

307 

308 elif fields[1] == b"DECRYPTION_FAILED": 308 ↛ 309line 308 didn't jump to line 309, because the condition on line 308 was never true

309 raise GpgException("Decryption failed.") 

310 

311 elif fields[1] == b"ERROR": 311 ↛ 312line 311 didn't jump to line 312, because the condition on line 311 was never true

312 f2 = fields[2].decode("ascii", errors="replace") 

313 f3 = fields[3].decode("ascii", errors="replace") 

314 raise GpgException("Other error: %s %s" % (f2, f3)) 

315 

316 elif fields[1] == b"SIG_ID": 

317 self.signature_ids.append(fields[2]) 

318 

319 elif fields[1] == b"GOODSIG": 

320 self._seen_goodsig += 1 

321 

322 elif fields[1] in ( 

323 b"PLAINTEXT", 

324 b"KEY_CONSIDERED", 

325 b"NEWSIG", 

326 b"NOTATION_NAME", 

327 b"NOTATION_FLAGS", 

328 b"NOTATION_DATA", 

329 b"SIGEXPIRED", 

330 b"KEYEXPIRED", 

331 b"POLICY_URL", 

332 b"PROGRESS", 

333 b"VERIFICATION_COMPLIANCE_MODE", 

334 ): 

335 pass 

336 

337 elif fields[1] in (b"EXPSIG", b"EXPKEYSIG"): 

338 self.expired = True 

339 self.invalid = True 

340 

341 elif fields[1] in ( 

342 b"REVKEYSIG", 

343 b"BADSIG", 

344 b"ERRSIG", 

345 b"KEYREVOKED", 

346 b"NO_PUBKEY", 

347 ): 

348 self.invalid = True 

349 

350 else: 

351 field = fields[1].decode("ascii", errors="replace") 

352 raise GpgException( 

353 "Keyword '{0}' from GnuPG was not expected.".format(field) 

354 ) 

355 

356 def _exec_gpg( 

357 self, 

358 stdin, 

359 stdout, 

360 stderr, 

361 statusfd, 

362 detached_signature_file: Optional[NamedTemporaryFile], 

363 ): 

364 try: 

365 if stdin != 0: 

366 os.dup2(stdin, 0) 

367 if stdout != 1: 

368 os.dup2(stdout, 1) 

369 if stderr != 2: 

370 os.dup2(stderr, 2) 

371 if statusfd != 3: 

372 os.dup2(statusfd, 3) 

373 for fd in range(4): 

374 old = fcntl.fcntl(fd, fcntl.F_GETFD) 

375 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) 

376 os.closerange(4, _MAXFD) 

377 

378 args = [ 

379 self.gpg, 

380 "--status-fd=3", 

381 "--no-default-keyring", 

382 "--batch", 

383 "--no-tty", 

384 "--trust-model", 

385 "always", 

386 "--fixed-list-mode", 

387 ] 

388 for k in self.keyrings: 

389 args.extend(["--keyring", k]) 

390 

391 if detached_signature_file is not None: 

392 args.extend(["--verify", detached_signature_file.name, "-"]) 

393 else: 

394 args.extend(["--output", "-", "--verify", "-"]) 

395 

396 os.execvp(self.gpg, args) 

397 finally: 

398 try: 

399 print("Failed to execute gpg.", file=sys.stderr) 

400 sys.stderr.flush() 

401 except: 

402 # Ignore errors, we want to reach the `exit` call below. 

403 pass 

404 os._exit(3) 

405 

406 @property 

407 def contents_sha1(self) -> str: 

408 return apt_pkg.sha1sum(self.contents) # type: ignore[attr-defined] 

409 

410 

411def sign( 

412 infile, 

413 outfile=None, 

414 keyids=[], 

415 inline=False, 

416 pubring=None, 

417 secring=None, 

418 homedir=None, 

419 passphrase_file=None, 

420 *, 

421 digest_algorithm="SHA256", 

422): 

423 args = [ 

424 "/usr/bin/gpg", 

425 "--no-options", 

426 "--no-tty", 

427 "--batch", 

428 "--armour", 

429 "--personal-digest-preferences", 

430 digest_algorithm, 

431 ] 

432 

433 for keyid in keyids: 

434 args.extend(["--local-user", keyid]) 

435 if pubring is not None: 435 ↛ 436line 435 didn't jump to line 436, because the condition on line 435 was never true

436 args.extend(["--keyring", pubring]) 

437 if secring is not None: 437 ↛ 438line 437 didn't jump to line 438, because the condition on line 437 was never true

438 args.extend(["--secret-keyring", secring]) 

439 if homedir is not None: 439 ↛ 441line 439 didn't jump to line 441, because the condition on line 439 was never false

440 args.extend(["--homedir", homedir]) 

441 if passphrase_file is not None: 441 ↛ 442line 441 didn't jump to line 442, because the condition on line 441 was never true

442 args.extend( 

443 ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file] 

444 ) 

445 

446 args.append("--clearsign" if inline else "--detach-sign") 

447 

448 kwargs = {} 

449 if isinstance(infile, str): 449 ↛ 450line 449 didn't jump to line 450, because the condition on line 449 was never true

450 infile = infile.encode("utf-8") 

451 if isinstance(infile, bytes): 

452 kwargs["input"] = infile 

453 else: 

454 kwargs["stdin"] = infile 

455 if outfile is None: 

456 kwargs["stdout"] = subprocess.PIPE 

457 else: 

458 kwargs["stdout"] = outfile 

459 

460 result = subprocess.run(args, check=True, **kwargs) 

461 if outfile is None: 

462 return result.stdout 

463 

464 

465# vim: set sw=4 et: