1"""Utilities for signed files 

2 

3@contact: Debian FTP Master <ftpmaster@debian.org> 

4@copyright: 2011-2018 Ansgar Burchardt <ansgar@debian.org> 

5@license: GNU General Public License version 2 or later 

6""" 

7 

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

21 

22import datetime 

23import fcntl 

24import os 

25import select 

26import subprocess 

27import sys 

28from collections.abc import Iterable 

29 

30import apt_pkg 

31 

# Upper bound for file descriptor numbers; used as the end of the range of
# descriptors closed before gpg is exec'ed.  Fall back to 256 when the
# limit cannot be queried (os.sysconf missing, name unknown, or OS error).
try:
    _MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    _MAXFD = 256

36 

37 

class GpgException(Exception):
    """Error raised when GnuPG fails or its output is not acceptable."""

40 

41 

class _Pipe:
    """Context manager for pipes.

    Entering the context creates a new pipe with ``os.pipe()`` and stores
    the descriptors in ``self.r`` (read end) and ``self.w`` (write end).
    Leaving the context closes whichever ends are still marked as open.

    Note: When the pipe is closed by other means than the close_r and close_w
    methods, you have to set self.r (self.w) to None.
    """

    def __init__(self):
        # Nothing is open until the context is entered; this makes the
        # close_* methods safe to call at any time.
        self.r = None
        self.w = None

    def __enter__(self):
        (self.r, self.w) = os.pipe()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close_w()
        self.close_r()
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def close_r(self):
        """Close the reading side of the pipe (no-op if already closed)."""
        # Compare against None explicitly: 0 is a valid file descriptor.
        if self.r is not None:
            fd = self.r
            # Clear first so a failing close is not retried on a stale fd.
            self.r = None
            os.close(fd)

    def close_w(self):
        """Close the writing side of the pipe (no-op if already closed)."""
        # Compare against None explicitly: 0 is a valid file descriptor.
        if self.w is not None:
            fd = self.w
            # Clear first so a failing close is not retried on a stale fd.
            self.w = None
            os.close(fd)

69 

70 

# Can be replaced with `os.waitstatus_to_exitcode` (Python 3.9)
def waitstatus_to_exitcode(status):
    """Convert a wait status into an exit code.

    :param status: status value as returned by the ``os.wait*`` functions
    :return: the exit status if the process exited normally, or the
             negated signal number if it was terminated by a signal
    :raises ValueError: if the status is neither of the two
    """
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    raise ValueError(f"Unexpected status code '{status}'")

79 

80 

class SignedFile:
    """handle files signed with PGP

    The message is passed to gpg (``--decrypt``) on construction and gpg's
    status output is parsed to decide whether a valid signature is present.

    The following attributes are available:
      contents            - byte-string with the content (after removing PGP armor)
      valid               - Boolean indicating a valid signature was found
      weak_signature      - signature uses a weak algorithm (e.g. SHA-1)
      fingerprint         - fingerprint of the key used for signing
      primary_fingerprint - fingerprint of the primary key associated to the key used for signing
    """

    def __init__(
        self,
        data: bytes,
        keyrings: Iterable[str],
        require_signature: bool = True,
        gpg: str = "/usr/bin/gpg",
    ):
        """
        :param data: byte-string containing the message
        :param keyrings: sequence of keyrings
        :param require_signature: if True (the default), will raise an exception if no valid signature was found
        :param gpg: location of the gpg binary
        :raises GpgException: if gpg fails, reports an unacceptable status,
                              or no valid signature is found while one is required
        """
        self.gpg = gpg
        self.keyrings = keyrings

        self.valid: bool = False  #: valid signature
        self.expired = False
        self.invalid = False
        self.weak_signature = False
        self.fingerprints: list[str] = []
        self.primary_fingerprints: list[str] = []
        # Signature ids are stored exactly as read from the status output,
        # i.e. as bytes (unlike the fingerprints, which are decoded).
        self.signature_ids: list[bytes] = []

        self._verify(data, require_signature)

    @property
    def fingerprint(self) -> str:
        """fingerprint of the (sub)key used for the signature"""
        assert len(self.fingerprints) == 1
        return self.fingerprints[0]

    @property
    def primary_fingerprint(self) -> str:
        """fingerprint of the primary key used for the signature"""
        assert len(self.primary_fingerprints) == 1
        return self.primary_fingerprints[0]

    @property
    def signature_id(self):
        """id of the signature, as reported by gpg's SIG_ID status line"""
        assert len(self.signature_ids) == 1
        return self.signature_ids[0]

    def _verify(self, data, require_signature):
        """Run gpg on ``data`` and evaluate its output.

        Forks a child that exec's gpg with four pipes (stdin, stdout,
        stderr, status-fd), feeds ``data`` to it and stores the collected
        output in ``self.contents``, ``self.status`` and ``self.stderr``.

        :raises GpgException: on missing status output, a fatal gpg exit
                              code, or a missing valid signature when
                              ``require_signature`` is true
        """
        with _Pipe() as stdin, _Pipe() as contents, _Pipe() as status, _Pipe() as stderr:
            pid = os.fork()
            if pid == 0:
                # Child: replaces itself with gpg, or exits; never returns.
                self._exec_gpg(stdin.r, contents.w, stderr.w, status.w)
            else:
                # Parent: close the ends that belong to the child so that
                # EOF is seen once gpg closes its side.
                stdin.close_r()
                contents.close_w()
                stderr.close_w()
                status.close_w()

                read = self._do_io([contents.r, stderr.r, status.r], {stdin.w: data})
                stdin.w = None  # was closed by _do_io

                (pid_, status_code, usage_) = os.wait4(pid, 0)
                if pid_ != pid:
                    raise Exception(
                        f"wait4() waited for pid {pid_}, but we expected {pid}"
                    )
                exit_code = waitstatus_to_exitcode(status_code)

                self.contents = read[contents.r]
                self.status = read[status.r]
                self.stderr = read[stderr.r]

                if self.status == b"":
                    stderr_text = self.stderr.decode("ascii", errors="replace")
                    raise GpgException(
                        "No status output from GPG. (GPG exited with status code %s)\n%s"
                        % (exit_code, stderr_text)
                    )

                # gpg exits with 0 (no error), 1 (at least one invalid sig),
                # or anything else (fatal error).  Even though status 2
                # indicates a fatal error, we still allow it as it is also
                # returned when the public key is not known.
                if exit_code not in (0, 1, 2):
                    stderr_text = self.stderr.decode("ascii", errors="replace")
                    raise GpgException(
                        f"GPG exited with a fatal error (exit code {exit_code})\n{stderr_text}"
                    )

                for line in self.status.splitlines():
                    self._parse_status(line)

                # Any signature flagged as invalid overrides a VALIDSIG.
                if self.invalid:
                    self.valid = False

                if require_signature and not self.valid:
                    stderr_text = self.stderr.decode("ascii", errors="replace")
                    raise GpgException(
                        "No valid signature found. (GPG exited with status code %s)\n%s"
                        % (exit_code, stderr_text)
                    )

        assert len(self.fingerprints) == len(self.primary_fingerprints)
        assert len(self.fingerprints) == len(self.signature_ids)

    def _do_io(self, read, write):
        """Multiplex reading from and writing to several descriptors.

        :param read: iterable of descriptors to read until EOF
        :param write: mapping of descriptor to the bytes to write to it;
                      each descriptor is closed once its data was written
        :return: dict mapping each read descriptor to the bytes read from it
        """
        # Writes must not block, otherwise we could deadlock against a
        # child that is waiting for us to drain its output.
        for fd in write:
            old = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, old | os.O_NONBLOCK)

        read_chunks = {fd: [] for fd in read}
        write_pos = {fd: 0 for fd in write}

        read_set = list(read)
        write_set = list(write)
        while read_set or write_set:
            r, w, x_ = select.select(read_set, write_set, ())
            for fd in r:
                data = os.read(fd, 4096)
                if data:
                    read_chunks[fd].append(data)
                else:
                    # EOF on this descriptor.
                    read_set.remove(fd)
            for fd in w:
                data = write[fd][write_pos[fd]:]
                if data:
                    write_pos[fd] += os.write(fd, data)
                else:
                    # Everything written: close to signal EOF to the reader.
                    os.close(fd)
                    write_set.remove(fd)

        return {fd: b"".join(chunks) for fd, chunks in read_chunks.items()}

    def _parse_timestamp(self, timestamp, datestring=None) -> datetime.datetime:
        """parse timestamp in GnuPG's format

        :param timestamp: bytes with seconds since the epoch
        :param datestring: optional bytes of the form ``YYYY-MM-DD``
        :return: naive datetime object (UTC) for the given timestamp
        :raises Exception: if ``timestamp`` is in ISO 8601 format
        """
        # The old implementation returned only the date, and such values
        # were already in use (the original note says "for replay
        # production" -- presumably replay protection), so keep returning
        # the legacy value (midnight of that date) for old signatures.
        if datestring is not None:
            year, month, day = datestring.split(b"-")
            date = datetime.date(int(year), int(month), int(day))
            if date < datetime.date(2014, 8, 4):
                return datetime.datetime.combine(date, datetime.time(0, 0))

        if b"T" in timestamp:
            raise Exception("No support for ISO 8601 timestamps.")
        # Same result as the deprecated utcfromtimestamp(): convert in UTC,
        # then drop the tzinfo to keep returning a naive datetime.
        aware = datetime.datetime.fromtimestamp(
            int(timestamp), tz=datetime.timezone.utc
        )
        return aware.replace(tzinfo=None)

    def _parse_status(self, line):
        """Evaluate one line of gpg's status-fd output and update state.

        :param line: a single status line as bytes
        :raises GpgException: on malformed lines, on status keywords that
                              indicate an error, and on unknown keywords
        """
        fields = line.split()
        # Guard against empty or truncated lines before indexing.
        if len(fields) < 2 or fields[0] != b"[GNUPG:]":
            raise GpgException("Unexpected output on status-fd: %s" % line)

        keyword = fields[1]

        # VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
        #          <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
        #          <hash-algo> <sig-class> <primary-key-fpr>
        if keyword == b"VALIDSIG":
            # GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
            # which Debian 8 does not yet include.  We want to make sure
            # to not accept uploads covered by a MD5-based signature.
            # RFC 4880, table 9.4:
            #   1 - MD5
            #   2 - SHA-1
            #   3 - RIPE-MD/160
            if fields[9] == b"1":
                raise GpgException("Digest algorithm MD5 is not trusted.")
            if fields[9] in (b"2", b"3"):
                self.weak_signature = True

            self.valid = True
            self.fingerprints.append(fields[2].decode("ascii"))
            self.primary_fingerprints.append(fields[11].decode("ascii"))
            self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])

        elif keyword == b"BADARMOR":
            raise GpgException("Bad armor.")

        elif keyword == b"NODATA":
            raise GpgException("No data.")

        elif keyword == b"DECRYPTION_FAILED":
            raise GpgException("Decryption failed.")

        elif keyword == b"ERROR":
            f2 = fields[2].decode("ascii", errors="replace")
            f3 = fields[3].decode("ascii", errors="replace")
            raise GpgException("Other error: %s %s" % (f2, f3))

        elif keyword == b"SIG_ID":
            self.signature_ids.append(fields[2])

        elif keyword in (
            b"PLAINTEXT",
            b"GOODSIG",
            b"KEY_CONSIDERED",
            b"NEWSIG",
            b"NOTATION_NAME",
            b"NOTATION_FLAGS",
            b"NOTATION_DATA",
            b"SIGEXPIRED",
            b"KEYEXPIRED",
            b"POLICY_URL",
            b"PROGRESS",
            b"VERIFICATION_COMPLIANCE_MODE",
        ):
            # Informational keywords that do not affect the verdict.
            pass

        elif keyword in (b"EXPSIG", b"EXPKEYSIG"):
            self.expired = True
            self.invalid = True

        elif keyword in (
            b"REVKEYSIG",
            b"BADSIG",
            b"ERRSIG",
            b"KEYREVOKED",
            b"NO_PUBKEY",
        ):
            self.invalid = True

        else:
            field = keyword.decode("ascii", errors="replace")
            raise GpgException(
                f"Keyword '{field}' from GnuPG was not expected."
            )

    def _exec_gpg(self, stdin, stdout, stderr, statusfd):
        """Replace the current (child) process with gpg; never returns.

        The given descriptors are installed as fds 0-3 (fd 3 being gpg's
        status-fd) and all other descriptors from 4 up to ``_MAXFD`` are
        closed.  If exec fails the process exits with status 3.
        """
        try:
            if stdin != 0:
                os.dup2(stdin, 0)
            if stdout != 1:
                os.dup2(stdout, 1)
            if stderr != 2:
                os.dup2(stderr, 2)
            if statusfd != 3:
                os.dup2(statusfd, 3)
            # Make sure fds 0-3 survive the exec.
            for fd in range(4):
                old = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC)
            os.closerange(4, _MAXFD)

            args = [
                self.gpg,
                "--status-fd=3",
                "--no-default-keyring",
                "--batch",
                "--no-tty",
                "--trust-model",
                "always",
                "--fixed-list-mode",
            ]
            for k in self.keyrings:
                args.extend(["--keyring", k])
            args.extend(["--decrypt", "-"])

            os.execvp(self.gpg, args)
        finally:
            # Only reached when setting up or exec'ing gpg failed.
            try:
                print("Failed to execute gpg.", file=sys.stderr)
                sys.stderr.flush()
            except BaseException:
                # Ignore errors, we want to reach the `exit` call below.
                pass
            os._exit(3)

    @property
    def contents_sha1(self) -> str:
        """SHA-1 checksum of ``contents`` as computed by ``apt_pkg.sha1sum``"""
        return apt_pkg.sha1sum(self.contents)  # type: ignore[attr-defined]

361 

362 

def sign(
    infile,
    outfile=None,
    keyids=(),
    inline=False,
    pubring=None,
    secring=None,
    homedir=None,
    passphrase_file=None,
    *,
    digest_algorithm="SHA256",
):
    """Create an ASCII-armoured signature using ``/usr/bin/gpg``.

    :param infile: data to sign; a ``str`` is encoded as UTF-8 and, like
                   ``bytes``, fed to gpg as input; anything else is passed
                   to ``subprocess.run`` as ``stdin``
    :param outfile: passed to ``subprocess.run`` as ``stdout``; if None
                    the output is captured and returned
    :param keyids: iterable of key ids, each passed as ``--local-user``
    :param inline: if true create a clearsigned message (``--clearsign``),
                   otherwise a detached signature (``--detach-sign``)
    :param pubring: optional value for ``--keyring``
    :param secring: optional value for ``--secret-keyring``
    :param homedir: optional value for ``--homedir``
    :param passphrase_file: optional value for ``--passphrase-file``
                            (enables ``--pinentry-mode loopback``)
    :param digest_algorithm: value for ``--personal-digest-preferences``
    :return: gpg's output as bytes if ``outfile`` is None, otherwise None
    :raises subprocess.CalledProcessError: if gpg exits with a non-zero status
    """
    args = [
        "/usr/bin/gpg",
        "--no-options",
        "--no-tty",
        "--batch",
        "--armour",
        "--personal-digest-preferences",
        digest_algorithm,
    ]

    for keyid in keyids:
        args.extend(["--local-user", keyid])
    if pubring is not None:
        args.extend(["--keyring", pubring])
    if secring is not None:
        args.extend(["--secret-keyring", secring])
    if homedir is not None:
        args.extend(["--homedir", homedir])
    if passphrase_file is not None:
        args.extend(
            ["--pinentry-mode", "loopback", "--passphrase-file", passphrase_file]
        )

    args.append("--clearsign" if inline else "--detach-sign")

    kwargs = {}
    if isinstance(infile, str):
        infile = infile.encode("utf-8")
    if isinstance(infile, bytes):
        kwargs["input"] = infile
    else:
        kwargs["stdin"] = infile
    kwargs["stdout"] = subprocess.PIPE if outfile is None else outfile

    result = subprocess.run(args, check=True, **kwargs)
    if outfile is None:
        return result.stdout
    return None

415 

416 

417# vim: set sw=4 et: