Coverage for daklib/command.py: 65%

263 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1"""module to handle command files 

2 

3@contact: Debian FTP Master <ftpmaster@debian.org> 

4@copyright: 2012, Ansgar Burchardt <ansgar@debian.org> 

5@copyright: 2023 Emilio Pozuelo Monfort <pochu@debian.org> 

6@license: GPL-2+ 

7""" 

8 

9# This program is free software; you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation; either version 2 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License along 

20# with this program; if not, write to the Free Software Foundation, Inc., 

21# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

22 

23import os 

24import tempfile 

25from typing import TYPE_CHECKING 

26 

27import apt_pkg 

28 

29from daklib.config import Config 

30from daklib.dak_exceptions import ParseMaintError 

31from daklib.dbconn import ( 

32 ACL, 

33 ACLPerSource, 

34 ACLPerSuite, 

35 DBChange, 

36 DBConn, 

37 DBSource, 

38 Fingerprint, 

39 Keyring, 

40 PolicyQueueUpload, 

41 SignatureHistory, 

42) 

43from daklib.gpg import SignedFile 

44from daklib.regexes import re_field_package 

45from daklib.textutils import fix_maintainer 

46from daklib.utils import TemplateSubst, gpg_get_key_addresses, send_mail 

47 

48if TYPE_CHECKING: 

49 from sqlalchemy.orm import Session 

50 

51 

52class CommandError(Exception): 

53 pass 

54 

55 

56class CommandFile: 

57 def __init__(self, filename: str, data: bytes, log=None): 

58 if log is None: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true

59 from daklib.daklog import Logger 

60 

61 log = Logger() 

62 self.cc: list[str] = [] 

63 self.result: list[str] = [] 

64 self.log = log 

65 self.filename: str = filename 

66 self.data = data 

67 self.uploader: str | None = None 

68 

69 def _check_replay(self, signed_file: SignedFile, session: "Session"): 

70 """check for replays 

71 

72 .. note:: 

73 

74 Will commit changes to the database. 

75 

76 :param session: database session 

77 """ 

78 # Mark commands file as seen to prevent replays. 

79 signature_history = SignatureHistory.from_signed_file(signed_file) 

80 session.add(signature_history) 

81 session.commit() 

82 

83 def _quote_section(self, section: apt_pkg.TagSection) -> str: 

84 lines = [f"> {line}" for line in str(section).splitlines()] 

85 return "\n".join(lines) 

86 

87 def _evaluate_sections(self, sections: apt_pkg.TagFile, session: "Session"): 

88 session.rollback() 

89 try: 

90 while True: 

91 next(sections) 

92 section: apt_pkg.TagSection = sections.section # type: ignore[attr-defined] 

93 self.result.append(self._quote_section(section)) 

94 

95 action = section.get("Action", None) 

96 if action is None: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true

97 raise CommandError("Encountered section without Action field") 

98 

99 if action == "dm": 

100 self.action_dm(self.fingerprint, section, session) 

101 elif action == "dm-remove": 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true

102 self.action_dm_remove(self.fingerprint, section, session) 

103 elif action == "dm-migrate": 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 self.action_dm_migrate(self.fingerprint, section, session) 

105 elif action == "break-the-archive": 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 self.action_break_the_archive(self.fingerprint, section, session) 

107 elif action == "process-upload": 107 ↛ 110line 107 didn't jump to line 110 because the condition on line 107 was always true

108 self.action_process_upload(self.fingerprint, section, session) 

109 else: 

110 raise CommandError("Unknown action: {0}".format(action)) 

111 

112 self.result.append("") 

113 except StopIteration: 

114 pass 

115 finally: 

116 session.rollback() 

117 

118 def _notify_uploader(self): 

119 cnf = Config() 

120 

121 bcc = "X-DAK: dak process-command" 

122 if "Dinstall::Bcc" in cnf: 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true

123 bcc = "{0}\nBcc: {1}".format(bcc, cnf["Dinstall::Bcc"]) 

124 

125 maint_to = "" 

126 addresses = gpg_get_key_addresses(self.fingerprint.fingerprint) 

127 if len(addresses) > 0: 127 ↛ 130line 127 didn't jump to line 130 because the condition on line 127 was always true

128 maint_to = addresses[0] 

129 

130 if self.uploader: 

131 try: 

132 maint_to = fix_maintainer(self.uploader)[1] 

133 except ParseMaintError: 

134 self.log.log("ignoring malformed uploader", self.filename) 

135 

136 cc = set() 

137 for address in self.cc: 

138 try: 

139 cc.add(fix_maintainer(address)[1]) 

140 except ParseMaintError: 

141 self.log.log("ignoring malformed cc", self.filename) 

142 

143 subst = { 

144 "__DAK_ADDRESS__": cnf["Dinstall::MyEmailAddress"], 

145 "__MAINTAINER_TO__": maint_to, 

146 "__CC__": ", ".join(cc), 

147 "__BCC__": bcc, 

148 "__RESULTS__": "\n".join(self.result), 

149 "__FILENAME__": self.filename, 

150 } 

151 

152 message = TemplateSubst( 

153 subst, os.path.join(cnf["Dir::Templates"], "process-command.processed") 

154 ) 

155 

156 send_mail(message) 

157 

158 def evaluate(self) -> bool: 

159 """evaluate commands file 

160 

161 :return: :const:`True` if the file was processed sucessfully, 

162 :const:`False` otherwise 

163 """ 

164 result = True 

165 

166 session = DBConn().session() 

167 

168 keyrings = ( 

169 session.query(Keyring).filter_by(active=True).order_by(Keyring.priority) 

170 ) 

171 keyring_files = [k.keyring_name for k in keyrings] 

172 

173 signed_file = SignedFile(self.data, keyring_files) 

174 if not signed_file.valid: 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true

175 self.log.log(["invalid signature", self.filename]) 

176 return False 

177 

178 self.fingerprint = ( 

179 session.query(Fingerprint) 

180 .filter_by(fingerprint=signed_file.primary_fingerprint) 

181 .one() 

182 ) 

183 if self.fingerprint.keyring is None: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 self.log.log(["signed by key in unknown keyring", self.filename]) 

185 return False 

186 assert self.fingerprint.keyring.active 

187 

188 self.log.log( 

189 [ 

190 "processing", 

191 self.filename, 

192 "signed-by={0}".format(self.fingerprint.fingerprint), 

193 ] 

194 ) 

195 

196 with tempfile.TemporaryFile() as fh: 

197 fh.write(signed_file.contents) 

198 fh.seek(0) 

199 sections = apt_pkg.TagFile(fh) 

200 

201 try: 

202 next(sections) 

203 section: apt_pkg.TagSection = sections.section # type: ignore[attr-defined] 

204 if "Uploader" in section: 

205 self.uploader = section["Uploader"] 

206 if "Cc" in section: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true

207 self.cc.append(section["Cc"]) 

208 # TODO: Verify first section has valid Archive field 

209 if "Archive" not in section: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true

210 raise CommandError("No Archive field in first section.") 

211 

212 # TODO: send mail when we detected a replay. 

213 self._check_replay(signed_file, session) 

214 

215 self._evaluate_sections(sections, session) 

216 self.result.append("") 

217 except Exception as e: 

218 self.log.log(["ERROR", e]) 

219 self.result.append( 

220 "There was an error processing this section. No changes were committed.\nDetails:\n{0}".format( 

221 e 

222 ) 

223 ) 

224 result = False 

225 

226 self._notify_uploader() 

227 

228 session.close() 

229 

230 return result 

231 

232 def _split_packages(self, value: str) -> list[str]: 

233 names = value.split() 

234 for name in names: 

235 if not re_field_package.match(name): 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 raise CommandError('Invalid package name "{0}"'.format(name)) 

237 return names 

238 

239 def action_dm( 

240 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

241 ) -> None: 

242 cnf = Config() 

243 

244 if ( 244 ↛ 249line 244 didn't jump to line 249

245 "Command::DM::AdminKeyrings" not in cnf 

246 or "Command::DM::ACL" not in cnf 

247 or "Command::DM::Keyrings" not in cnf 

248 ): 

249 raise CommandError("DM command is not configured for this archive.") 

250 

251 allowed_keyrings = cnf.value_list("Command::DM::AdminKeyrings") 

252 if ( 252 ↛ 256line 252 didn't jump to line 256

253 fingerprint.keyring is None 

254 or fingerprint.keyring.keyring_name not in allowed_keyrings 

255 ): 

256 raise CommandError( 

257 "Key {0} is not allowed to set DM".format(fingerprint.fingerprint) 

258 ) 

259 

260 acl_name = cnf.get("Command::DM::ACL", "dm") 

261 acl = session.query(ACL).filter_by(name=acl_name).one() 

262 

263 fpr_hash = section["Fingerprint"].replace(" ", "") 

264 fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first() 

265 if fpr is None: 265 ↛ 266line 265 didn't jump to line 266 because the condition on line 265 was never true

266 raise CommandError("Unknown fingerprint {0}".format(fpr_hash)) 

267 if fpr.keyring is None or fpr.keyring.keyring_name not in cnf.value_list( 

268 "Command::DM::Keyrings" 

269 ): 

270 raise CommandError("Key {0} is not in DM keyring.".format(fpr.fingerprint)) 

271 addresses = gpg_get_key_addresses(fpr.fingerprint) 

272 if len(addresses) > 0: 272 ↛ 275line 272 didn't jump to line 275 because the condition on line 272 was always true

273 self.cc.append(addresses[0]) 

274 

275 self.log.log(["dm", "fingerprint", fpr.fingerprint]) 

276 self.result.append("Fingerprint: {0}".format(fpr.fingerprint)) 

277 if len(addresses) > 0: 277 ↛ 281line 277 didn't jump to line 281 because the condition on line 277 was always true

278 self.log.log(["dm", "uid", addresses[0]]) 

279 self.result.append("Uid: {0}".format(addresses[0])) 

280 

281 for source in self._split_packages(section.get("Allow", "")): 

282 # Check for existance of source package to catch typos 

283 if session.query(DBSource).filter_by(source=source).first() is None: 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true

284 raise CommandError( 

285 "Tried to grant permissions for unknown source package: {0}".format( 

286 source 

287 ) 

288 ) 

289 

290 if ( 290 ↛ 306line 290 didn't jump to line 306

291 session.query(ACLPerSource) 

292 .filter_by(acl=acl, fingerprint=fpr, source=source) 

293 .first() 

294 is None 

295 ): 

296 aps = ACLPerSource() 

297 aps.acl = acl 

298 aps.fingerprint = fpr 

299 aps.source = source 

300 aps.created_by = fingerprint 

301 aps.reason = section.get("Reason") 

302 session.add(aps) 

303 self.log.log(["dm", "allow", fpr.fingerprint, source]) 

304 self.result.append("Allowed: {0}".format(source)) 

305 else: 

306 self.result.append("Already-Allowed: {0}".format(source)) 

307 

308 session.flush() 

309 

310 for source in self._split_packages(section.get("Deny", "")): 

311 count = ( 

312 session.query(ACLPerSource) 

313 .filter_by(acl=acl, fingerprint=fpr, source=source) 

314 .delete() 

315 ) 

316 if count == 0: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 raise CommandError( 

318 "Tried to remove upload permissions for package {0}, " 

319 "but no upload permissions were granted before.".format(source) 

320 ) 

321 

322 self.log.log(["dm", "deny", fpr.fingerprint, source]) 

323 self.result.append("Denied: {0}".format(source)) 

324 

325 session.commit() 

326 

327 def _action_dm_admin_common( 

328 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

329 ) -> None: 

330 cnf = Config() 

331 

332 if ( 

333 "Command::DM-Admin::AdminFingerprints" not in cnf 

334 or "Command::DM::ACL" not in cnf 

335 ): 

336 raise CommandError("DM admin command is not configured for this archive.") 

337 

338 allowed_fingerprints = cnf.value_list("Command::DM-Admin::AdminFingerprints") 

339 if fingerprint.fingerprint not in allowed_fingerprints: 

340 raise CommandError( 

341 "Key {0} is not allowed to admin DM".format(fingerprint.fingerprint) 

342 ) 

343 

344 def action_dm_remove( 

345 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

346 ) -> None: 

347 self._action_dm_admin_common(fingerprint, section, session) 

348 

349 cnf = Config() 

350 acl_name = cnf.get("Command::DM::ACL", "dm") 

351 acl = session.query(ACL).filter_by(name=acl_name).one() 

352 

353 fpr_hash = section["Fingerprint"].replace(" ", "") 

354 fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first() 

355 if fpr is None: 

356 self.result.append( 

357 "Unknown fingerprint: {0}\nNo action taken.".format(fpr_hash) 

358 ) 

359 return 

360 

361 self.log.log(["dm-remove", fpr.fingerprint]) 

362 

363 count = 0 

364 for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr): 

365 self.log.log( 

366 ["dm-remove", fpr.fingerprint, "source={0}".format(entry.source)] 

367 ) 

368 count += 1 

369 session.delete(entry) 

370 

371 self.result.append( 

372 "Removed: {0}.\n{1} acl entries removed.".format(fpr.fingerprint, count) 

373 ) 

374 

375 session.commit() 

376 

377 def action_dm_migrate( 

378 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

379 ) -> None: 

380 self._action_dm_admin_common(fingerprint, section, session) 

381 cnf = Config() 

382 acl_name = cnf.get("Command::DM::ACL", "dm") 

383 acl = session.query(ACL).filter_by(name=acl_name).one() 

384 

385 fpr_hash_from = section["From"].replace(" ", "") 

386 fpr_from = ( 

387 session.query(Fingerprint).filter_by(fingerprint=fpr_hash_from).first() 

388 ) 

389 if fpr_from is None: 

390 self.result.append( 

391 "Unknown fingerprint (From): {0}\nNo action taken.".format( 

392 fpr_hash_from 

393 ) 

394 ) 

395 return 

396 

397 fpr_hash_to = section["To"].replace(" ", "") 

398 fpr_to = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_to).first() 

399 if fpr_to is None: 

400 self.result.append( 

401 "Unknown fingerprint (To): {0}\nNo action taken.".format(fpr_hash_to) 

402 ) 

403 return 

404 if fpr_to.keyring is None or fpr_to.keyring.keyring_name not in cnf.value_list( 

405 "Command::DM::Keyrings" 

406 ): 

407 self.result.append( 

408 "Key (To) {0} is not in DM keyring.\nNo action taken.".format( 

409 fpr_to.fingerprint 

410 ) 

411 ) 

412 return 

413 

414 self.log.log( 

415 [ 

416 "dm-migrate", 

417 "from={0}".format(fpr_hash_from), 

418 "to={0}".format(fpr_hash_to), 

419 ] 

420 ) 

421 

422 sources = [] 

423 for entry in session.query(ACLPerSource).filter_by( 

424 acl=acl, fingerprint=fpr_from 

425 ): 

426 self.log.log( 

427 [ 

428 "dm-migrate", 

429 "from={0}".format(fpr_hash_from), 

430 "to={0}".format(fpr_hash_to), 

431 "source={0}".format(entry.source), 

432 ] 

433 ) 

434 entry.fingerprint = fpr_to 

435 sources.append(entry.source) 

436 

437 self.result.append( 

438 "Migrated {0} to {1}.\n{2} acl entries changed: {3}".format( 

439 fpr_hash_from, fpr_hash_to, len(sources), ", ".join(sources) 

440 ) 

441 ) 

442 

443 session.commit() 

444 

445 def action_break_the_archive( 

446 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

447 ) -> None: 

448 name = "Dave" 

449 uid = fingerprint.uid 

450 if uid is not None and uid.name is not None: 

451 name = uid.name.split()[0] 

452 

453 self.result.append( 

454 "DAK9000: I'm sorry, {0}. I'm afraid I can't do that.".format(name) 

455 ) 

456 

457 def _sourcename_from_dbchanges(self, changes: DBChange) -> str: 

458 source = changes.source 

459 # in case the Source contains spaces, e.g. in binNMU .changes 

460 source = source.split(" ")[0] 

461 

462 return source 

463 

464 def _process_upload_add_command_file( 

465 self, upload: PolicyQueueUpload, command: str 

466 ) -> None: 

467 source = self._sourcename_from_dbchanges(upload.changes) 

468 filename = f"{command}.{source}_{upload.changes.version}" 

469 content = "OK" if command == "ACCEPT" else "NOTOK" 

470 

471 with open( 

472 os.path.join(upload.policy_queue.path, "COMMENTS", filename), "x" 

473 ) as f: 

474 f.write(content + "\n") 

475 

476 def _action_process_upload_common( 

477 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

478 ) -> None: 

479 cnf = Config() 

480 

481 if "Command::ProcessUpload::ACL" not in cnf: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 raise CommandError( 

483 "Process Upload command is not configured for this archive." 

484 ) 

485 

486 def action_process_upload( 

487 self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session" 

488 ) -> None: 

489 self._action_process_upload_common(fingerprint, section, session) 

490 

491 cnf = Config() 

492 acl_name = cnf.get("Command::ProcessUpload::ACL", "process-upload") 

493 acl = session.query(ACL).filter_by(name=acl_name).one() 

494 

495 source = section["Source"].replace(" ", "") 

496 version = section["Version"].replace(" ", "") 

497 command = section["Command"].replace(" ", "") 

498 

499 if command not in ("ACCEPT", "REJECT"): 499 ↛ 500line 499 didn't jump to line 500 because the condition on line 499 was never true

500 raise CommandError("Invalid ProcessUpload command: {0}".format(command)) 

501 

502 uploads = ( 

503 session.query(PolicyQueueUpload) 

504 .join(PolicyQueueUpload.changes) 

505 .filter_by(version=version) 

506 .all() 

507 ) 

508 # we don't filter_by(source=source) because a source in a DBChange can 

509 # contain more than the source, e.g. 'source (version)' for binNMUs 

510 uploads = [ 

511 upload 

512 for upload in uploads 

513 if self._sourcename_from_dbchanges(upload.changes) == source 

514 ] 

515 if not uploads: 515 ↛ 516line 515 didn't jump to line 516 because the condition on line 515 was never true

516 raise CommandError( 

517 "Could not find upload for {0} {1}".format(source, version) 

518 ) 

519 

520 upload = uploads[0] 

521 

522 # we consider all uploads except those for NEW, and take into account the 

523 # target suite when checking for permissions 

524 if upload.policy_queue.queue_name == "new": 524 ↛ 525line 524 didn't jump to line 525 because the condition on line 524 was never true

525 raise CommandError( 

526 "Processing uploads from NEW not allowed ({0} {1})".format( 

527 source, version 

528 ) 

529 ) 

530 

531 suite = upload.target_suite 

532 

533 self.log.log( 

534 [ 

535 "process-upload", 

536 fingerprint.fingerprint, 

537 source, 

538 version, 

539 upload.policy_queue.queue_name, 

540 suite.suite_name, 

541 ] 

542 ) 

543 

544 allowed = ( 

545 session.query(ACLPerSource) 

546 .filter_by(acl=acl, fingerprint=fingerprint, source=source) 

547 .count() 

548 > 0 

549 or session.query(ACLPerSuite) 

550 .filter_by(acl=acl, fingerprint=fingerprint, suite=suite) 

551 .count() 

552 > 0 

553 ) 

554 

555 self.log.log( 

556 [ 

557 "process-upload", 

558 fingerprint.fingerprint, 

559 source, 

560 version, 

561 upload.policy_queue.queue_name, 

562 suite.suite_name, 

563 allowed, 

564 ] 

565 ) 

566 

567 if allowed: 

568 self._process_upload_add_command_file(upload, command) 

569 

570 self.result.append( 

571 "ProcessUpload: processed fp {0}: {1}_{2}/{3}".format( 

572 fingerprint.fingerprint, source, version, suite.codename 

573 ) 

574 )