Coverage for daklib/command.py: 65%
263 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1"""module to handle command files
3@contact: Debian FTP Master <ftpmaster@debian.org>
4@copyright: 2012, Ansgar Burchardt <ansgar@debian.org>
5@copyright: 2023 Emilio Pozuelo Monfort <pochu@debian.org>
6@license: GPL-2+
7"""
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along
20# with this program; if not, write to the Free Software Foundation, Inc.,
21# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23import os
24import tempfile
25from typing import TYPE_CHECKING
27import apt_pkg
29from daklib.config import Config
30from daklib.dak_exceptions import ParseMaintError
31from daklib.dbconn import (
32 ACL,
33 ACLPerSource,
34 ACLPerSuite,
35 DBChange,
36 DBConn,
37 DBSource,
38 Fingerprint,
39 Keyring,
40 PolicyQueueUpload,
41 SignatureHistory,
42)
43from daklib.gpg import SignedFile
44from daklib.regexes import re_field_package
45from daklib.textutils import fix_maintainer
46from daklib.utils import TemplateSubst, gpg_get_key_addresses, send_mail
48if TYPE_CHECKING:
49 from sqlalchemy.orm import Session
class CommandError(Exception):
    """Raised when a commands file, or one of its sections, cannot be processed."""
class CommandFile:
    """A signed commands file together with the state collected while evaluating it.

    :meth:`evaluate` verifies the signature, runs the action requested by each
    section and mails the collected results.  The ``action_*`` methods
    implement the individual actions.
    """

    def __init__(self, filename: str, data: bytes, log=None):
        """
        :param filename: name of the commands file; used in log entries and in
                         the result mail
        :param data: raw content of the (signed) commands file
        :param log: logger to use; when omitted a new
                    :class:`daklib.daklog.Logger` is created
        """
        if log is None:
            # Imported lazily: only needed when the caller supplied no logger.
            from daklib.daklog import Logger

            log = Logger()
        # Addresses that receive a copy of the result mail.
        self.cc: list[str] = []
        # Lines of the report that is mailed back once evaluation finished.
        self.result: list[str] = []
        self.log = log
        self.filename: str = filename
        self.data = data
        # Value of the ``Uploader`` field of the first section, if present.
        self.uploader: str | None = None

    def _check_replay(self, signed_file: SignedFile, session: "Session"):
        """check for replays

        .. note::

           Will commit changes to the database.

        :param signed_file: signed commands file whose signature is recorded
        :param session: database session
        """
        # Mark commands file as seen to prevent replays.
        # NOTE(review): presumably committing an already recorded signature
        # fails and thereby rejects the replay -- confirm against the
        # SignatureHistory schema.
        signature_history = SignatureHistory.from_signed_file(signed_file)
        session.add(signature_history)
        session.commit()

    def _quote_section(self, section: apt_pkg.TagSection) -> str:
        """Return the text of `section` with every line prefixed by ``"> "``.

        :param section: section to quote in the result mail
        :return: the quoted text, without a trailing newline
        """
        return "\n".join(f"> {line}" for line in str(section).splitlines())

    def _evaluate_sections(self, sections: apt_pkg.TagFile, session: "Session"):
        """Run the action requested by each remaining section of the file.

        Every section is quoted into :attr:`result`, followed by whatever the
        action reports and an empty separator line.  The session is rolled
        back before the first section and again when processing ends, whether
        normally or through an exception; actions commit their own changes.

        :param sections: tag file positioned before the first command section
        :param session: database session
        :raises CommandError: if a section has no ``Action`` field or names an
                              unknown action
        """
        handlers = {
            "dm": self.action_dm,
            "dm-remove": self.action_dm_remove,
            "dm-migrate": self.action_dm_migrate,
            "break-the-archive": self.action_break_the_archive,
            "process-upload": self.action_process_upload,
        }

        session.rollback()
        try:
            while True:
                # Only running out of sections ends the loop.  A StopIteration
                # escaping from an action handler must not be mistaken for
                # the end of the file.
                try:
                    next(sections)
                except StopIteration:
                    break
                section: apt_pkg.TagSection = sections.section  # type: ignore[attr-defined]
                self.result.append(self._quote_section(section))

                action = section.get("Action", None)
                if action is None:
                    raise CommandError("Encountered section without Action field")

                handler = handlers.get(action)
                if handler is None:
                    raise CommandError("Unknown action: {0}".format(action))
                handler(self.fingerprint, section, session)

                self.result.append("")
        finally:
            session.rollback()

    def _notify_uploader(self):
        """Mail the collected results.

        The mail goes to the ``Uploader`` given in the commands file when it
        can be parsed, otherwise to the first address of the signing key.
        Addresses collected in :attr:`cc` are copied; malformed ones are
        logged and skipped.
        """
        cnf = Config()

        bcc = "X-DAK: dak process-command"
        if "Dinstall::Bcc" in cnf:
            bcc = "{0}\nBcc: {1}".format(bcc, cnf["Dinstall::Bcc"])

        maint_to = ""
        addresses = gpg_get_key_addresses(self.fingerprint.fingerprint)
        if addresses:
            maint_to = addresses[0]

        # An explicit Uploader takes precedence over the key's address.
        if self.uploader:
            try:
                maint_to = fix_maintainer(self.uploader)[1]
            except ParseMaintError:
                self.log.log("ignoring malformed uploader", self.filename)

        cc = set()
        for address in self.cc:
            try:
                cc.add(fix_maintainer(address)[1])
            except ParseMaintError:
                self.log.log("ignoring malformed cc", self.filename)

        subst = {
            "__DAK_ADDRESS__": cnf["Dinstall::MyEmailAddress"],
            "__MAINTAINER_TO__": maint_to,
            # Sorted so the header does not depend on set iteration order.
            "__CC__": ", ".join(sorted(cc)),
            "__BCC__": bcc,
            "__RESULTS__": "\n".join(self.result),
            "__FILENAME__": self.filename,
        }

        message = TemplateSubst(
            subst, os.path.join(cnf["Dir::Templates"], "process-command.processed")
        )

        send_mail(message)

    def evaluate(self) -> bool:
        """evaluate commands file

        Files with an invalid signature, or signed by a key that belongs to
        no keyring, are logged and rejected without sending a mail.  Any
        error raised while processing the sections is logged and reported in
        the result mail.

        :return: :const:`True` if the file was processed successfully,
                 :const:`False` otherwise
        """
        result = True

        session = DBConn().session()
        # The session is closed on every exit path, including the early
        # returns below and unexpected exceptions.
        try:
            keyrings = (
                session.query(Keyring).filter_by(active=True).order_by(Keyring.priority)
            )
            keyring_files = [k.keyring_name for k in keyrings]

            signed_file = SignedFile(self.data, keyring_files)
            if not signed_file.valid:
                self.log.log(["invalid signature", self.filename])
                return False

            self.fingerprint = (
                session.query(Fingerprint)
                .filter_by(fingerprint=signed_file.primary_fingerprint)
                .one()
            )
            if self.fingerprint.keyring is None:
                self.log.log(["signed by key in unknown keyring", self.filename])
                return False
            # NOTE(review): this check disappears under ``python -O``;
            # consider an explicit test if it is relied upon.
            assert self.fingerprint.keyring.active

            self.log.log(
                [
                    "processing",
                    self.filename,
                    "signed-by={0}".format(self.fingerprint.fingerprint),
                ]
            )

            with tempfile.TemporaryFile() as fh:
                fh.write(signed_file.contents)
                fh.seek(0)
                # The tag file is built on `fh`, so `fh` stays open until all
                # sections have been consumed.
                sections = apt_pkg.TagFile(fh)

                try:
                    next(sections)
                    section: apt_pkg.TagSection = sections.section  # type: ignore[attr-defined]
                    if "Uploader" in section:
                        self.uploader = section["Uploader"]
                    if "Cc" in section:
                        self.cc.append(section["Cc"])
                    # TODO: Verify first section has valid Archive field
                    if "Archive" not in section:
                        raise CommandError("No Archive field in first section.")

                    # TODO: send mail when we detected a replay.
                    self._check_replay(signed_file, session)

                    self._evaluate_sections(sections, session)
                    self.result.append("")
                except Exception as e:
                    # Deliberately broad: every failure is reported back by
                    # mail instead of aborting the caller.
                    self.log.log(["ERROR", e])
                    self.result.append(
                        "There was an error processing this section. No changes were committed.\nDetails:\n{0}".format(
                            e
                        )
                    )
                    result = False

            self._notify_uploader()
        finally:
            session.close()

        return result

    def _split_packages(self, value: str) -> list[str]:
        """Split a whitespace separated list of package names.

        :param value: field value to split
        :return: the package names in their original order
        :raises CommandError: if a name does not match ``re_field_package``
        """
        names = value.split()
        for name in names:
            if not re_field_package.match(name):
                raise CommandError('Invalid package name "{0}"'.format(name))
        return names

    def action_dm(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Grant and revoke per-source upload permissions for a key.

        The key named by the ``Fingerprint`` field gets an ACL entry for each
        source package listed in ``Allow`` and loses its entry for each one
        listed in ``Deny``.  Changes are committed at the end.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command
        :param session: database session
        :raises CommandError: if the command is not configured, the signing
                              key is not in an admin keyring, the target key
                              is unknown or not in a DM keyring, a package
                              name is invalid, an allowed source package is
                              unknown, or a denied package had no permission
        """
        cnf = Config()

        if (
            "Command::DM::AdminKeyrings" not in cnf
            or "Command::DM::ACL" not in cnf
            or "Command::DM::Keyrings" not in cnf
        ):
            raise CommandError("DM command is not configured for this archive.")

        allowed_keyrings = cnf.value_list("Command::DM::AdminKeyrings")
        if (
            fingerprint.keyring is None
            or fingerprint.keyring.keyring_name not in allowed_keyrings
        ):
            raise CommandError(
                "Key {0} is not allowed to set DM".format(fingerprint.fingerprint)
            )

        acl_name = cnf.get("Command::DM::ACL", "dm")
        acl = session.query(ACL).filter_by(name=acl_name).one()

        # Fingerprints may be written in groups separated by spaces.
        fpr_hash = section["Fingerprint"].replace(" ", "")
        fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
        if fpr is None:
            raise CommandError("Unknown fingerprint {0}".format(fpr_hash))
        if fpr.keyring is None or fpr.keyring.keyring_name not in cnf.value_list(
            "Command::DM::Keyrings"
        ):
            raise CommandError("Key {0} is not in DM keyring.".format(fpr.fingerprint))
        addresses = gpg_get_key_addresses(fpr.fingerprint)
        if addresses:
            # The owner of the affected key gets a copy of the result mail.
            self.cc.append(addresses[0])

        self.log.log(["dm", "fingerprint", fpr.fingerprint])
        self.result.append("Fingerprint: {0}".format(fpr.fingerprint))
        if addresses:
            self.log.log(["dm", "uid", addresses[0]])
            self.result.append("Uid: {0}".format(addresses[0]))

        for source in self._split_packages(section.get("Allow", "")):
            # Check for existence of source package to catch typos
            if session.query(DBSource).filter_by(source=source).first() is None:
                raise CommandError(
                    "Tried to grant permissions for unknown source package: {0}".format(
                        source
                    )
                )

            if (
                session.query(ACLPerSource)
                .filter_by(acl=acl, fingerprint=fpr, source=source)
                .first()
                is None
            ):
                aps = ACLPerSource()
                aps.acl = acl
                aps.fingerprint = fpr
                aps.source = source
                aps.created_by = fingerprint
                aps.reason = section.get("Reason")
                session.add(aps)
                self.log.log(["dm", "allow", fpr.fingerprint, source])
                self.result.append("Allowed: {0}".format(source))
            else:
                self.result.append("Already-Allowed: {0}".format(source))

        # Make the new entries visible to the deletes below.
        session.flush()

        for source in self._split_packages(section.get("Deny", "")):
            count = (
                session.query(ACLPerSource)
                .filter_by(acl=acl, fingerprint=fpr, source=source)
                .delete()
            )
            if count == 0:
                raise CommandError(
                    "Tried to remove upload permissions for package {0}, "
                    "but no upload permissions were granted before.".format(source)
                )

            self.log.log(["dm", "deny", fpr.fingerprint, source])
            self.result.append("Denied: {0}".format(source))

        session.commit()

    def _action_dm_admin_common(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Check that the signing key may run DM admin commands.

        `section` and `session` are accepted for symmetry with the action
        methods but are not used.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command
        :param session: database session
        :raises CommandError: if the DM admin commands are not configured or
                              the key is not listed as an admin fingerprint
        """
        cnf = Config()

        if (
            "Command::DM-Admin::AdminFingerprints" not in cnf
            or "Command::DM::ACL" not in cnf
        ):
            raise CommandError("DM admin command is not configured for this archive.")

        allowed_fingerprints = cnf.value_list("Command::DM-Admin::AdminFingerprints")
        if fingerprint.fingerprint not in allowed_fingerprints:
            raise CommandError(
                "Key {0} is not allowed to admin DM".format(fingerprint.fingerprint)
            )

    def action_dm_remove(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Remove all per-source ACL entries of a key.

        An unknown fingerprint is reported in the result and otherwise
        ignored.  Changes are committed at the end.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command; ``Fingerprint`` names
                        the key to clear
        :param session: database session
        :raises CommandError: if the signing key may not run admin commands
        """
        self._action_dm_admin_common(fingerprint, section, session)

        cnf = Config()
        acl_name = cnf.get("Command::DM::ACL", "dm")
        acl = session.query(ACL).filter_by(name=acl_name).one()

        fpr_hash = section["Fingerprint"].replace(" ", "")
        fpr = session.query(Fingerprint).filter_by(fingerprint=fpr_hash).first()
        if fpr is None:
            self.result.append(
                "Unknown fingerprint: {0}\nNo action taken.".format(fpr_hash)
            )
            return

        self.log.log(["dm-remove", fpr.fingerprint])

        count = 0
        for entry in session.query(ACLPerSource).filter_by(acl=acl, fingerprint=fpr):
            self.log.log(
                ["dm-remove", fpr.fingerprint, "source={0}".format(entry.source)]
            )
            count += 1
            session.delete(entry)

        self.result.append(
            "Removed: {0}.\n{1} acl entries removed.".format(fpr.fingerprint, count)
        )

        session.commit()

    def action_dm_migrate(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Move all per-source ACL entries from one key to another.

        The keys are named by the ``From`` and ``To`` fields.  An unknown
        key, or a target key outside the DM keyrings, is reported in the
        result and nothing is changed.  Changes are committed at the end.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command
        :param session: database session
        :raises CommandError: if the signing key may not run admin commands
        """
        self._action_dm_admin_common(fingerprint, section, session)
        cnf = Config()
        acl_name = cnf.get("Command::DM::ACL", "dm")
        acl = session.query(ACL).filter_by(name=acl_name).one()

        fpr_hash_from = section["From"].replace(" ", "")
        fpr_from = (
            session.query(Fingerprint).filter_by(fingerprint=fpr_hash_from).first()
        )
        if fpr_from is None:
            self.result.append(
                "Unknown fingerprint (From): {0}\nNo action taken.".format(
                    fpr_hash_from
                )
            )
            return

        fpr_hash_to = section["To"].replace(" ", "")
        fpr_to = session.query(Fingerprint).filter_by(fingerprint=fpr_hash_to).first()
        if fpr_to is None:
            self.result.append(
                "Unknown fingerprint (To): {0}\nNo action taken.".format(fpr_hash_to)
            )
            return
        if fpr_to.keyring is None or fpr_to.keyring.keyring_name not in cnf.value_list(
            "Command::DM::Keyrings"
        ):
            self.result.append(
                "Key (To) {0} is not in DM keyring.\nNo action taken.".format(
                    fpr_to.fingerprint
                )
            )
            return

        self.log.log(
            [
                "dm-migrate",
                "from={0}".format(fpr_hash_from),
                "to={0}".format(fpr_hash_to),
            ]
        )

        sources = []
        for entry in session.query(ACLPerSource).filter_by(
            acl=acl, fingerprint=fpr_from
        ):
            self.log.log(
                [
                    "dm-migrate",
                    "from={0}".format(fpr_hash_from),
                    "to={0}".format(fpr_hash_to),
                    "source={0}".format(entry.source),
                ]
            )
            entry.fingerprint = fpr_to
            sources.append(entry.source)

        self.result.append(
            "Migrated {0} to {1}.\n{2} acl entries changed: {3}".format(
                fpr_hash_from, fpr_hash_to, len(sources), ", ".join(sources)
            )
        )

        session.commit()

    def action_break_the_archive(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Politely refuse; nothing is changed.

        The refusal addresses the signer by the first word of the name of the
        key's uid, or as "Dave" when no name is known.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command (unused)
        :param session: database session (unused)
        """
        name = "Dave"
        uid = fingerprint.uid
        if uid is not None and uid.name is not None:
            name = uid.name.split()[0]

        self.result.append(
            "DAK9000: I'm sorry, {0}. I'm afraid I can't do that.".format(name)
        )

    def _sourcename_from_dbchanges(self, changes: DBChange) -> str:
        """Return the bare source package name recorded for `changes`.

        :param changes: changes entry to take the name from
        :return: the part of ``changes.source`` before the first space
        """
        # in case the Source contains spaces, e.g. in binNMU .changes
        return changes.source.split(" ")[0]

    def _process_upload_add_command_file(
        self, upload: PolicyQueueUpload, command: str
    ) -> None:
        """Write the comment file that accepts or rejects `upload`.

        The file is named ``<command>.<source>_<version>`` and is created in
        the ``COMMENTS`` directory of the upload's policy queue.  It contains
        ``OK`` for ``ACCEPT`` and ``NOTOK`` for any other command.

        :param upload: upload the command applies to
        :param command: ``"ACCEPT"`` or ``"REJECT"``
        :raises FileExistsError: if the comment file already exists
        """
        source = self._sourcename_from_dbchanges(upload.changes)
        filename = f"{command}.{source}_{upload.changes.version}"
        content = "OK" if command == "ACCEPT" else "NOTOK"

        # Mode "x" refuses to overwrite an existing decision.
        with open(
            os.path.join(upload.policy_queue.path, "COMMENTS", filename), "x"
        ) as f:
            f.write(content + "\n")

    def _action_process_upload_common(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Check that the process-upload command is configured.

        The parameters are accepted for symmetry with the action methods but
        are not used.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command
        :param session: database session
        :raises CommandError: if ``Command::ProcessUpload::ACL`` is not set
        """
        cnf = Config()

        if "Command::ProcessUpload::ACL" not in cnf:
            raise CommandError(
                "Process Upload command is not configured for this archive."
            )

    def action_process_upload(
        self, fingerprint: Fingerprint, section: apt_pkg.TagSection, session: "Session"
    ) -> None:
        """Accept or reject an upload waiting in a policy queue.

        The upload is identified by the ``Source`` and ``Version`` fields and
        the decision is given by ``Command``.  The comment file is only
        written when the signing key has a matching per-source or per-suite
        ACL entry.

        :param fingerprint: key that signed the commands file
        :param section: section describing the command
        :param session: database session
        :raises CommandError: if the command is not configured or invalid, no
                              matching upload exists, or the upload is in the
                              ``new`` queue
        """
        self._action_process_upload_common(fingerprint, section, session)

        cnf = Config()
        acl_name = cnf.get("Command::ProcessUpload::ACL", "process-upload")
        acl = session.query(ACL).filter_by(name=acl_name).one()

        source = section["Source"].replace(" ", "")
        version = section["Version"].replace(" ", "")
        command = section["Command"].replace(" ", "")

        if command not in ("ACCEPT", "REJECT"):
            raise CommandError("Invalid ProcessUpload command: {0}".format(command))

        uploads = (
            session.query(PolicyQueueUpload)
            .join(PolicyQueueUpload.changes)
            .filter_by(version=version)
            .all()
        )
        # we don't filter_by(source=source) because a source in a DBChange can
        # contain more than the source, e.g. 'source (version)' for binNMUs
        uploads = [
            upload
            for upload in uploads
            if self._sourcename_from_dbchanges(upload.changes) == source
        ]
        if not uploads:
            raise CommandError(
                "Could not find upload for {0} {1}".format(source, version)
            )

        upload = uploads[0]

        # we consider all uploads except those for NEW, and take into account the
        # target suite when checking for permissions
        if upload.policy_queue.queue_name == "new":
            raise CommandError(
                "Processing uploads from NEW not allowed ({0} {1})".format(
                    source, version
                )
            )

        suite = upload.target_suite

        self.log.log(
            [
                "process-upload",
                fingerprint.fingerprint,
                source,
                version,
                upload.policy_queue.queue_name,
                suite.suite_name,
            ]
        )

        allowed = (
            session.query(ACLPerSource)
            .filter_by(acl=acl, fingerprint=fingerprint, source=source)
            .count()
            > 0
            or session.query(ACLPerSuite)
            .filter_by(acl=acl, fingerprint=fingerprint, suite=suite)
            .count()
            > 0
        )

        # Logged a second time so that the outcome of the permission check is
        # on record.
        self.log.log(
            [
                "process-upload",
                fingerprint.fingerprint,
                source,
                version,
                upload.policy_queue.queue_name,
                suite.suite_name,
                allowed,
            ]
        )

        if allowed:
            self._process_upload_add_command_file(upload, command)

        # NOTE(review): this line is reported even when the key was not
        # allowed and no comment file was written -- confirm this is intended.
        self.result.append(
            "ProcessUpload: processed fp {0}: {1}_{2}/{3}".format(
                fingerprint.fingerprint, source, version, suite.codename
            )
        )