Coverage for daklib/dakmultiprocessing.py: 96%
54 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
# vim:set et sw=4:

"""
multiprocessing for DAK

@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

###############################################################################
27import traceback
28from multiprocessing.pool import Pool
29from signal import SIGALRM, SIGHUP, SIGPIPE, SIGTERM, signal
30from typing import override
32import sqlalchemy.orm.session
# Public names of this module; each section below registers what it defines.
__all__: list[str] = []

# Status codes returned (as the first tuple element) by functions run in the
# worker processes.  Higher values are treated as "worse" by
# DakProcessPool.overall_status(), which reports the maximum seen.
PROC_STATUS_SUCCESS = 0  # Everything ok
PROC_STATUS_EXCEPTION = 1  # An exception was caught
PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
PROC_STATUS_MISCFAILURE = 3  # Process specific error; see message

__all__.extend(
    [
        "PROC_STATUS_SUCCESS",
        "PROC_STATUS_EXCEPTION",
        "PROC_STATUS_SIGNALRAISED",
        "PROC_STATUS_MISCFAILURE",
    ]
)
51class SignalException(Exception):
52 def __init__(self, signum):
53 self.signum = signum
55 @override
56 def __str__(self):
57 return "<SignalException: %d>" % self.signum
__all__.append("SignalException")
def signal_handler(signum, info):
    """Turn a delivered signal into a :class:`SignalException`.

    Installed via ``signal.signal()`` by ``_func_wrapper``.

    :param signum: number of the signal that was received.
    :param info: second argument supplied by the ``signal`` machinery
        (the current stack frame per the stdlib documentation); unused.
    :raises SignalException: always, carrying *signum*.
    """
    raise SignalException(signum)
def _func_wrapper(func, *args, **kwds):
    """Call ``func(*args, **kwds)`` and convert failures into a status tuple.

    The callback is expected to return ``(status, messages)`` where *status*
    is one of the ``PROC_STATUS_*`` constants and *messages* is a string used
    for logging.  Its return value is passed through unchanged.

    :returns: the callback's own return value on success;
        ``(PROC_STATUS_SIGNALRAISED, signum)`` if one of the handled signals
        arrived while it ran (note the second element is the signal number,
        not a string); ``(PROC_STATUS_EXCEPTION, text)`` with the exception
        message and formatted traceback for any other ``Exception``.
    """
    # We need to handle signals to avoid hanging: each of these is turned
    # into a SignalException that is caught below.
    for signum in (SIGHUP, SIGTERM, SIGPIPE, SIGALRM):
        signal(signum, signal_handler)

    try:
        return func(*args, **kwds)
    except SignalException as e:
        return (PROC_STATUS_SIGNALRAISED, e.signum)
    except Exception as e:
        return (
            PROC_STATUS_EXCEPTION,
            "Exception: %s\n%s" % (e, traceback.format_exc()),
        )
    finally:
        # Make sure connections are closed.  We might die otherwise.
        # close_all_sessions() is the supported replacement for the
        # deprecated Session.close_all() classmethod.
        sqlalchemy.orm.session.close_all_sessions()
93class DakProcessPool(Pool):
94 def __init__(self, *args, **kwds):
95 Pool.__init__(self, *args, **kwds)
96 self.results = []
97 self.int_results = []
99 @override
100 def apply_async(self, func, args=(), kwds={}, callback=None):
101 wrapper_args = list(args)
102 wrapper_args.insert(0, func)
103 self.int_results.append(
104 Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback)
105 )
107 @override
108 def join(self):
109 Pool.join(self)
110 for r in self.int_results:
111 # return values were already handled in the callbacks, but asking
112 # for them might raise exceptions which would otherwise be lost
113 self.results.append(r.get())
115 def overall_status(self) -> int:
116 # Return the highest of our status results
117 # This basically allows us to do sys.exit(overall_status()) and have us
118 # exit 0 if everything was good and non-zero if not
119 status = 0
120 for r in self.results:
121 if r[0] > status: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 status = r[0]
123 return status
__all__.append("DakProcessPool")