Coverage for dak/transitions.py: 12%
279 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1#! /usr/bin/env python3
3"""
4Display, edit and check the release manager's transition file.
6@contact: Debian FTP Master <ftpmaster@debian.org>
7@copyright: 2008 Joerg Jaspert <joerg@debian.org>
8@license: GNU General Public License version 2 or later
9"""
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25################################################################################
27# <elmo> if klecker.d.o died, I swear to god, I'm going to migrate to gentoo.
29################################################################################
31import errno
32import fcntl
33import os
34import subprocess
35import sys
36import tempfile
37import time
38from collections.abc import Sequence
39from typing import Optional
41import apt_pkg
42import yaml
44from daklib import utils
45from daklib.dak_exceptions import TransitionsError
46from daklib.dbconn import DBConn, get_source_in_suite
47from daklib.regexes import re_broken_package
# Globals
# Both names are only declared here; init() binds them before any other
# function in this module reads them.
Cnf: apt_pkg.Configuration  #: Configuration
Options: apt_pkg.Configuration  #: Parsed CommandLine arguments
53################################################################################
55#####################################
56#### This may run within sudo !! ####
57#####################################
def init():
    """
    Set up the global state shared by all sub-commands.

    Loads the dak configuration, makes sure every option key exists,
    parses the command line into :data:`Options` and instantiates the
    database connection.  If help was requested, :func:`usage` is called.
    Any user other than ``dak`` gets the ``sudo`` option switched on.

    .. warning::

       This function may run **within sudo**
    """
    global Cnf, Options

    apt_pkg.init()
    Cnf = utils.get_conf()

    # (short switch, long switch, configuration key[, "HasArg"])
    cli_arguments = [
        ("a", "automatic", "Edit-Transitions::Options::Automatic"),
        ("h", "help", "Edit-Transitions::Options::Help"),
        ("e", "edit", "Edit-Transitions::Options::Edit"),
        ("i", "import", "Edit-Transitions::Options::Import", "HasArg"),
        ("c", "check", "Edit-Transitions::Options::Check"),
        ("s", "sudo", "Edit-Transitions::Options::Sudo"),
        ("n", "no-action", "Edit-Transitions::Options::No-Action"),
    ]

    # Give every option an empty default so later lookups never miss.
    option_names = ("automatic", "help", "no-action", "edit", "import", "check", "sudo")
    for option_name in option_names:
        option_key = "Edit-Transitions::Options::%s" % option_name
        if option_key not in Cnf:
            Cnf[option_key] = ""  # type: ignore[index]

    apt_pkg.parse_commandline(Cnf, cli_arguments, sys.argv)  # type: ignore[attr-defined]
    Options = Cnf.subtree("Edit-Transitions::Options")  # type: ignore[attr-defined]

    if Options["help"]:
        usage()

    current_user = utils.getusername()
    if current_user != "dak":
        print("Non-dak user: %s" % current_user)
        Options["sudo"] = "y"  # type: ignore[index]

    # Initialise DB connection
    DBConn()
106################################################################################
def usage(exit_code=0):
    """
    Print the command line help and terminate the program.

    :param exit_code: status passed to :func:`sys.exit` after printing
    """
    # The short switch for --sudo is registered as lowercase "s" in init(),
    # so the help text has to advertise "-s" (it used to say "-S").
    print(
        """Usage: transitions [OPTION]...
Update and check the release managers transition file.

Options:

 -h, --help show this help and exit.
 -e, --edit edit the transitions file
 -i, --import <file> check and import transitions from file
 -c, --check check the transitions file, remove outdated entries
 -s, --sudo use sudo to update transitions file
 -a, --automatic don't prompt (only affects check).
 -n, --no-action don't do anything (only affects check)"""
    )

    sys.exit(exit_code)
128################################################################################
130#####################################
131#### This may run within sudo !! ####
132#####################################
def load_transitions(trans_file: str) -> Optional[dict]:
    """
    Parse a transition yaml file and check it for validity.

    Every problem found is reported on stdout; validation keeps going so
    that all problems of a file are listed in one run.

    .. warning::

       This function may run **within sudo**

    :param trans_file: filename to parse
    :return: validated dictionary of transition entries, or ``None`` if
             the file is not valid YAML, if its top level is not a mapping
             (this includes an empty file), or if validation fails
    """
    # Parse the yaml file
    with open(trans_file, "r") as sourcefile:
        sourcecontent = sourcefile.read()

    try:
        trans = yaml.safe_load(sourcecontent)
    except yaml.YAMLError as exc:
        # The file is not valid YAML at all
        print("ERROR: %s" % (exc))
        return None

    # Anything but a mapping cannot hold transitions: either nothing is
    # defined or the file was edited into a shape we do not understand.
    # The user sees the result in the information display after leaving
    # the editor and can fix it.
    if not isinstance(trans, dict):
        return None

    # lets do further validation here
    checkkeys = ["source", "reason", "packages", "new", "rm"]
    failure = False

    try:
        for test in trans:
            t = trans[test]

            # First check if we know all the keys for the transition and if they have
            # the right type (and for the packages also if the list has the right types
            # included, ie. not a list in list, but only str in the list)
            for key in t:
                if key not in checkkeys:
                    print("ERROR: Unknown key %s in transition %s" % (key, test))
                    failure = True

                if key == "packages":
                    if not isinstance(t[key], list):
                        print(
                            "ERROR: Unknown type %s for packages in transition %s."
                            % (type(t[key]), test)
                        )
                        failure = True
                    try:
                        for package in t["packages"]:
                            if not isinstance(package, str):
                                print(
                                    "ERROR: Packages list contains invalid type %s (as %s) in transition %s"
                                    % (type(package), package, test)
                                )
                                failure = True
                                # The regex below only works on strings; feeding it
                                # this entry would raise TypeError, which would be
                                # misreported as "No packages defined" and would
                                # stop the check of the remaining entries.
                                continue
                            if re_broken_package.match(package):
                                # Someone had a space too much (or not enough), we have something looking like
                                # "package1 - package2" now.
                                print(
                                    "ERROR: Invalid indentation of package list in transition %s, around package(s): %s"
                                    % (test, package)
                                )
                                failure = True
                    except TypeError:
                        # In case someone has an empty packages list
                        print("ERROR: No packages defined in transition %s" % (test))
                        failure = True
                    continue

                elif not isinstance(t[key], str):
                    if key == "new" and isinstance(t[key], int):
                        # Ok, debian native version
                        continue
                    else:
                        print(
                            "ERROR: Unknown type %s for key %s in transition %s"
                            % (type(t[key]), key, test)
                        )
                        failure = True

            # And now the other way round - are all our keys defined?
            for key in checkkeys:
                if key not in t:
                    print("ERROR: Missing key %s in transition %s" % (key, test))
                    failure = True
    except TypeError:
        # In case someone defined very broken things
        print("ERROR: Unable to parse the file")
        failure = True

    if failure:
        return None

    return trans
239################################################################################
241#####################################
242#### This may run within sudo !! ####
243#####################################
def lock_file(f: str) -> int:
    """
    Take an exclusive, non-blocking ``flock`` on a file.

    The file is created if it does not exist.  Up to ten attempts are
    made, sleeping 60 seconds after every attempt that found the lock
    held elsewhere.  When all attempts fail, ``utils.fubar`` is called
    with an error message.

    .. warning::

       This function may run **within sudo**

    :param f: path of the file to lock
    :return: open file descriptor on which the lock is held
    :raises OSError: if locking fails for a reason other than contention
    """
    # errno values that mean "somebody else holds the lock".  The Python
    # fcntl documentation names EACCES or EAGAIN depending on the
    # operating system; EEXIST is kept from the previous implementation.
    busy_errnos = (errno.EACCES, errno.EAGAIN, errno.EWOULDBLOCK, errno.EEXIST)

    for retry in range(10):
        lock_fd = os.open(f, os.O_RDWR | os.O_CREAT)
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError as e:
            # Never leak the descriptor of a failed attempt.
            os.close(lock_fd)
            if e.errno not in busy_errnos:
                raise
            print("Unable to get lock for %s (try %d of 10)" % (f, retry + 1))
            time.sleep(60)
        else:
            return lock_fd

    utils.fubar("Couldn't obtain lock for %s." % (f))
269################################################################################
271#####################################
272#### This may run within sudo !! ####
273#####################################
def write_transitions(from_trans: dict) -> None:
    """
    Update the active transitions file safely.

    The function takes an already parsed transitions dictionary (which
    avoids invalid files or files that are modified while the function
    is active), dumps it into ``<transitions file>.tmp`` and renames that
    file over the transitions file.  Both files are locked with
    :func:`lock_file` for the duration of the update.

    .. warning::

       This function may run **within sudo**

    :param from_trans: transitions dictionary, as returned by :func:`load_transitions`
    """
    trans_file = Cnf["Dinstall::ReleaseTransitions"]
    trans_temp = trans_file + ".tmp"

    # try/finally makes sure the lock descriptors are closed again even
    # if dumping or renaming raises.
    trans_lock = lock_file(trans_file)
    try:
        temp_lock = lock_file(trans_temp)
        try:
            with open(trans_temp, "w") as destfile:
                yaml.safe_dump(from_trans, destfile, default_flow_style=False)

            os.rename(trans_temp, trans_file)
        finally:
            os.close(temp_lock)
    finally:
        os.close(trans_lock)
305################################################################################
307##########################################
308#### This usually runs within sudo !! ####
309##########################################
def write_transitions_from_file(from_file: str) -> None:
    """
    Install a transitions file that is believed to be valid.

    If the ``sudo`` option is set, ``dak transitions --import`` is
    re-invoked as user ``dak`` through sudo; otherwise the file is parsed
    with :func:`load_transitions` and handed to :func:`write_transitions`.

    The process exits with status 3 if `from_file` is not located below
    the configured ``Dir::TempPath``.

    .. warning::

       This function usually runs **within sudo**

    :param from_file: filename of a transitions file
    :raises TransitionsError: if the file cannot be parsed or validated
    """
    # Lets check if from_file is in the directory we expect it to be in.
    # Compare whole path components: a plain string prefix test would
    # also accept siblings such as "<TempPath>-evil/file".
    temp_dir = os.path.abspath(Cnf["Dir::TempPath"])
    candidate = os.path.abspath(from_file)
    if os.path.commonpath([temp_dir, candidate]) != temp_dir:
        print("Will not accept transitions file outside of %s" % (Cnf["Dir::TempPath"]))
        sys.exit(3)

    if Options["sudo"]:
        subprocess.check_call(
            [
                "/usr/bin/sudo",
                "-u",
                "dak",
                "-H",
                "/usr/local/bin/dak",
                "transitions",
                "--import",
                from_file,
            ]
        )
    else:
        trans = load_transitions(from_file)
        if trans is None:
            raise TransitionsError("Unparsable transitions file %s" % (from_file))
        write_transitions(trans)
349################################################################################
def temp_transitions_file(transitions: dict) -> str:
    """
    Create a temporary file in ``Dir::TempPath`` and dump the given
    transitions into it, so users can edit them.

    :param transitions: current defined transitions
    :return: path of newly created tempfile

    .. note::

       The file is left in place for the caller to unlink.
       We need the chmod, as the file is (most possibly) copied from a
       sudo-ed script and would be unreadable if it has default mkstemp mode
    """
    (fd, path) = tempfile.mkstemp("", "transitions", Cnf["Dir::TempPath"])
    # Write through the descriptor mkstemp returned, so that it is closed
    # when the block ends instead of staying open for the rest of the
    # process.
    with os.fdopen(fd, "w") as f:
        os.chmod(path, 0o644)
        yaml.safe_dump(transitions, f, default_flow_style=False)
    return path
374################################################################################
def edit_transitions():
    """
    Edit the defined transitions.

    The current transitions are dumped into a temporary file which is
    opened in ``$EDITOR`` (``vi`` if unset).  After every editor run the
    file is validated with :func:`load_transitions` and the user chooses
    whether to edit again, drop the changes or (only if the file is valid)
    save.  Saving goes through :func:`write_transitions_from_file`; the
    temporary file is removed afterwards.
    """
    trans_file = Cnf["Dinstall::ReleaseTransitions"]
    edit_file = temp_transitions_file(load_transitions(trans_file) or {})

    editor = os.environ.get("EDITOR", "vi")

    while True:
        result = os.system("%s %s" % (editor, edit_file))
        if result != 0:
            # The temporary file is removed here, so the message must say
            # so (it used to claim the file was being kept).
            os.unlink(edit_file)
            utils.fubar(
                "%s invocation failed for %s, tempfile removed."
                % (editor, edit_file)
            )

        # Now try to load the new file
        test = load_transitions(edit_file)

        if test is None:
            # Edit is broken; saving is not offered
            print("Edit was unparsable.")
            prompt = "[E]dit again, Drop changes?"
            default = "E"
        else:
            print("Edit looks okay.\n")
            print("The following transitions are defined:")
            print(
                "------------------------------------------------------------------------"
            )
            transition_info(test)

            prompt = "[S]ave, Edit again, Drop changes?"
            default = "S"

        # Keep asking until the first letter of the answer occurs in the
        # prompt; an empty answer selects the bracketed default.
        answer = "XXX"
        while prompt.find(answer) == -1:
            answer = utils.input_or_exit(prompt)
            if answer == "":
                answer = default
            answer = answer[:1].upper()

        if answer == "E":
            continue
        elif answer == "D":
            os.unlink(edit_file)
            print("OK, discarding changes")
            sys.exit(0)
        elif answer == "S":
            # Ready to save
            break
        else:
            print("You pressed something you shouldn't have :(")
            sys.exit(1)

    # We seem to be done and also have a working file. Copy over.
    write_transitions_from_file(edit_file)
    os.unlink(edit_file)

    print("Transitions file updated.")
439################################################################################
def check_transitions(transitions) -> None:
    """
    Check if the defined transitions still apply and remove those that no longer do.

    A transition counts as finished once the version of its source
    package in testing is not older than the version the transition
    waits for.  Finished transitions are deleted from `transitions`, a
    notification mail is sent if ``Transitions::Notifications`` is
    configured, and the remaining transitions are written back through
    :func:`write_transitions_from_file`.

    .. note::

       Asks the user for confirmation first unless -a has been set;
       with -n nothing is changed.

    :param transitions: defined transitions, as returned by :func:`load_transitions`
    """
    to_remove = []
    info = {}

    session = DBConn().session()

    # Now look through all defined transitions
    for trans in transitions:
        t = transitions[trans]
        source = t["source"]
        expected = t["new"]

        # Will be None if nothing is in testing.
        sourceobj = get_source_in_suite(source, "testing", session)

        info[trans] = get_info(
            trans, source, expected, t["rm"], t["reason"], t["packages"]
        )
        print(info[trans])

        if sourceobj is None:
            # No package in testing
            print(
                "Transition source %s not in testing, transition still ongoing."
                % (source)
            )
        else:
            current = sourceobj.version
            # load_transitions() accepts an integer for "new" (Debian
            # native version), but version_compare only takes strings.
            compare = apt_pkg.version_compare(current, str(expected))
            if compare < 0:
                # This is still valid, the current version in database is older than
                # the new version we wait for
                print(
                    "This transition is still ongoing, we currently have version %s"
                    % (current)
                )
            else:
                print(
                    "REMOVE: This transition is over, the target package reached testing. REMOVE"
                )
                print("%s wanted version: %s, has %s" % (source, expected, current))
                to_remove.append(trans)
        print(
            "-------------------------------------------------------------------------"
        )

    if not to_remove:
        return

    prompt = "Removing: "
    prompt += "".join(remove + "," for remove in to_remove)
    prompt += " Commit Changes? (y/N)"

    if Options["no-action"]:
        answer = "n"
    elif Options["automatic"]:
        answer = "y"
    else:
        answer = utils.input_or_exit(prompt).lower()

    if answer == "":
        answer = "n"

    if answer == "n":
        print("Not committing changes")
        sys.exit(0)
    elif answer == "y":
        print("Committing")
        subst = {}
        subst["__SUBJECT__"] = "Transitions completed: " + ", ".join(
            sorted(to_remove)
        )
        subst["__TRANSITION_MESSAGE__"] = (
            "The following transitions were removed:\n"
        )
        for remove in sorted(to_remove):
            subst["__TRANSITION_MESSAGE__"] += info[remove] + "\n"
            del transitions[remove]

        # If we have a mail address configured for transitions,
        # send a notification
        subst["__TRANSITION_EMAIL__"] = Cnf.get("Transitions::Notifications", "")
        if subst["__TRANSITION_EMAIL__"] != "":
            print("Sending notification to %s" % subst["__TRANSITION_EMAIL__"])
            subst["__DAK_ADDRESS__"] = Cnf["Dinstall::MyEmailAddress"]
            subst["__BCC__"] = "X-DAK: dak transitions"
            if "Dinstall::Bcc" in Cnf:
                subst["__BCC__"] += "\nBcc: %s" % Cnf["Dinstall::Bcc"]
            message = utils.TemplateSubst(
                subst, os.path.join(Cnf["Dir::Templates"], "transition.removed")
            )
            utils.send_mail(message)

        edit_file = temp_transitions_file(transitions)
        try:
            write_transitions_from_file(edit_file)
        finally:
            # The temporary dump is ours to clean up, the same way
            # edit_transitions() does it.
            os.unlink(edit_file)

        print("Done")
    else:
        print("WTF are you typing?")
        sys.exit(0)
555################################################################################
def get_info(
    trans: str,
    source: str,
    expected: str,
    rm: str,
    reason: str,
    packages: Sequence[str],
) -> str:
    """
    Build the human readable description of a single transition.

    Nothing is printed here; the formatted text is handed back to the caller.

    :param trans: Transition name
    :param source: Source package
    :param expected: Expected version in testing
    :param rm: Responsible release manager (RM)
    :param reason: Reason
    :param packages: list of blocked packages
    :return: multi-line description, terminated by a newline
    """
    blocked = ", ".join(packages)
    report_lines = (
        f"Looking at transition: {trans}",
        f"Source: {source}",
        f"New Version: {expected}",
        f"Responsible: {rm}",
        f"Description: {reason}",
        f"Blocked Packages (total: {len(packages):d}): {blocked}",
    )
    return "\n".join(report_lines) + "\n"
593################################################################################
def transition_info(transitions) -> None:
    """
    Print information about all defined transitions.

    Calls :func:`get_info` for every transition and then tells user if the transition is
    still ongoing or if the expected version already hit testing.

    :param transitions: defined transitions
    """
    session = DBConn().session()

    for trans in transitions:
        t = transitions[trans]
        source = t["source"]
        expected = t["new"]

        # Will be None if nothing is in testing.
        sourceobj = get_source_in_suite(source, "testing", session)

        print(get_info(trans, source, expected, t["rm"], t["reason"], t["packages"]))

        if sourceobj is None:
            # No package in testing
            print(
                "Transition source %s not in testing, transition still ongoing."
                % (source)
            )
        else:
            # load_transitions() accepts an integer for "new" (Debian
            # native version), but version_compare only takes strings.
            compare = apt_pkg.version_compare(sourceobj.version, str(expected))
            print("Apt compare says: %s" % (compare))
            if compare < 0:
                # This is still valid, the current version in database is older than
                # the new version we wait for
                print(
                    "This transition is still ongoing, we currently have version %s"
                    % (sourceobj.version)
                )
            else:
                print(
                    "This transition is over, the target package reached testing, should be removed"
                )
                print(
                    "%s wanted version: %s, has %s"
                    % (source, expected, sourceobj.version)
                )
        print(
            "-------------------------------------------------------------------------"
        )
646################################################################################
def main():
    """
    Entry point: validate the configuration, then dispatch.

    Exits with status 1 if the transitions file or the temp directory is
    not configured or does not exist, and with status 2 if a transitions
    file cannot be parsed.  With ``--import`` the given file is installed
    and the program exits; otherwise the transitions are edited, checked
    or just displayed, depending on the options.

    .. warning::

       This function may run **within sudo**
    """
    global Cnf

    #####################################
    #### This can run within sudo !! ####
    #####################################
    init()

    # The transitions file has to be configured and present ...
    transitions_path = Cnf.get("Dinstall::ReleaseTransitions", "")
    if transitions_path == "":
        utils.warn("Dinstall::ReleaseTransitions not defined")
        sys.exit(1)
    if not os.path.exists(transitions_path):
        utils.warn(
            "ReleaseTransitions file, %s, not found."
            % (Cnf["Dinstall::ReleaseTransitions"])
        )
        sys.exit(1)

    # ... and so does our temp directory
    temp_dir = Cnf.get("Dir::TempPath", "")
    if temp_dir == "":
        utils.warn("Dir::TempPath not defined")
        sys.exit(1)
    if not os.path.exists(temp_dir):
        utils.warn("Temporary path %s not found." % (Cnf["Dir::TempPath"]))
        sys.exit(1)

    if Options["import"]:
        try:
            write_transitions_from_file(Options["import"])
        except TransitionsError as problem:
            print(problem)
            sys.exit(2)
        sys.exit(0)
    ##############################################
    #### Up to here it can run within sudo !! ####
    ##############################################

    # Parse the yaml file
    transitions = load_transitions(transitions_path)
    if transitions is None:
        # Something very broken with the transitions, exit
        utils.warn("Could not parse existing transitions file. Aborting.")
        sys.exit(2)

    if Options["edit"]:
        # Let's edit the transitions file
        edit_transitions()
    elif Options["check"]:
        # Check and remove outdated transitions
        check_transitions(transitions)
    else:
        # Output information about the currently defined transitions.
        print("Currently defined transitions:")
        transition_info(transitions)

    sys.exit(0)
716################################################################################
# Only run when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()