1# vim:set et sw=4:
3"""
4multiprocessing for DAK
6@contact: Debian FTP Master <ftpmaster@debian.org>
7@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
8@license: GNU General Public License version 2 or later
9"""
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25###############################################################################
27import traceback
28from multiprocessing.pool import Pool
29from signal import SIGALRM, SIGHUP, SIGPIPE, SIGTERM, signal
31import sqlalchemy.orm.session
# Status codes a worker callback reports as the first element of its
# (status, messages) result tuple.
PROC_STATUS_SUCCESS = 0  # Everything ok
PROC_STATUS_EXCEPTION = 1  # An exception was caught
PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
PROC_STATUS_MISCFAILURE = 3  # Process specific error; see message

# Public names of this module; definitions further down add themselves.
__all__ = [
    "PROC_STATUS_SUCCESS",
    "PROC_STATUS_EXCEPTION",
    "PROC_STATUS_SIGNALRAISED",
    "PROC_STATUS_MISCFAILURE",
]
class SignalException(Exception):
    """Exception used to report that a signal was delivered.

    The number of the signal is stored in the ``signum`` attribute and is
    included in the string form of the exception.
    """

    def __init__(self, signum):
        # Keep the signal number so whoever catches this can report it.
        self.signum = signum

    def __str__(self):
        template = "<SignalException: %d>"
        return template % self.signum
# Export the exception type as part of the module's public API.
__all__ += ["SignalException"]
def signal_handler(signum, info):
    """Signal handler that turns a delivered signal into a SignalException.

    ``signum`` is the number of the delivered signal and is carried by the
    raised exception.  ``info`` is the second argument passed to the handler
    and is not used.

    Raises:
        SignalException: always.
    """
    exc = SignalException(signum)
    raise exc
def _func_wrapper(func, *args, **kwds):
    """Call ``func(*args, **kwds)`` and report failures as a status tuple.

    This is the callable DakProcessPool.apply_async actually submits to the
    pool.  Before calling ``func`` it installs signal_handler for SIGHUP,
    SIGTERM, SIGPIPE and SIGALRM so that these signals surface as a
    SignalException instead of leaving the process hanging.  The handlers
    are not restored afterwards.

    Returns:
        Whatever ``func`` returns (expected to be ``(status, messages)``),
        ``(PROC_STATUS_SIGNALRAISED, signum)`` if a SignalException was
        raised, or ``(PROC_STATUS_EXCEPTION, text)`` for any other
        Exception, where ``text`` holds the exception and its traceback.
    """
    # We need to handle signals to avoid hanging
    for signum in (SIGHUP, SIGTERM, SIGPIPE, SIGALRM):
        signal(signum, signal_handler)

    # We expect our callback function to return:
    #   (status, messages)
    # Where:
    #   status is one of PROC_STATUS_*
    #   messages is a string used for logging
    try:
        return func(*args, **kwds)
    except SignalException as e:
        return (PROC_STATUS_SIGNALRAISED, e.signum)
    except Exception as e:
        return (
            PROC_STATUS_EXCEPTION,
            "Exception: %s\n%s" % (e, traceback.format_exc()),
        )
    finally:
        # Make sure connections are closed. We might die otherwise.
        # Session.close_all() is deprecated since SQLAlchemy 1.3 in favour of
        # the module-level close_all_sessions(); prefer the latter and only
        # fall back to the classmethod on releases that predate it.
        close_all = getattr(sqlalchemy.orm.session, "close_all_sessions", None)
        if close_all is None:
            close_all = sqlalchemy.orm.session.Session.close_all
        close_all()
class DakProcessPool(Pool):
    """Process pool that runs every task through ``_func_wrapper``.

    Tasks submitted with apply_async() are wrapped so that exceptions and
    signals come back as ``(status, messages)`` tuples.  join() collects the
    return value of every submitted task into ``results`` and
    overall_status() reduces them to a single status code.
    """

    def __init__(self, *args, **kwds):
        """Create the pool; all arguments are forwarded to ``Pool.__init__``."""
        super().__init__(*args, **kwds)
        # Return values of the submitted tasks, filled in by join().
        self.results = []
        # Async result handles, one per apply_async() call.
        self.int_results = []

    def apply_async(
        self, func, args=(), kwds=None, callback=None, error_callback=None
    ):
        """Schedule ``func(*args, **kwds)`` to run via ``_func_wrapper``.

        The signature mirrors ``Pool.apply_async``: ``callback`` and
        ``error_callback`` are forwarded unchanged, and ``kwds`` may be
        omitted (or None) when there are no keyword arguments.

        Returns:
            The async result object from ``Pool.apply_async``; it is also
            remembered in ``int_results`` so join() can collect its value.
        """
        if kwds is None:
            kwds = {}
        # The wrapper takes the real function as its first positional argument.
        wrapper_args = [func, *args]
        result = Pool.apply_async(
            self, _func_wrapper, wrapper_args, kwds, callback, error_callback
        )
        self.int_results.append(result)
        return result

    def join(self):
        """Wait for the workers to exit, then collect every task's result.

        Each collected value is appended to ``results``.
        """
        Pool.join(self)
        for pending in self.int_results:
            # return values were already handled in the callbacks, but asking
            # for them might raise exceptions which would otherwise be lost
            self.results.append(pending.get())

    def overall_status(self) -> int:
        """Return the highest status found in ``results`` (0 if there is none).

        This basically allows us to do sys.exit(overall_status()) and have us
        exit 0 if everything was good and non-zero if not.
        """
        # Seed with 0 so an empty result list still yields success.
        return max([0, *(r[0] for r in self.results)])
# Export the pool class as part of the module's public API.
__all__ += ["DakProcessPool"]