1# vim:set et sw=4:
3"""
4multiprocessing for DAK
6@contact: Debian FTP Master <ftpmaster@debian.org>
7@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
8@license: GNU General Public License version 2 or later
9"""
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25###############################################################################
27from multiprocessing.pool import Pool
28from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
29import traceback
31import sqlalchemy.orm.session
# Names exported by this module; the sections below add to this list.
__all__ = []

# Status codes used as the first element of the (status, messages) tuple
# returned by worker callbacks.
PROC_STATUS_SUCCESS = 0       # Everything ok
PROC_STATUS_EXCEPTION = 1     # An exception was caught
PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
PROC_STATUS_MISCFAILURE = 3   # Process specific error; see message

__all__ += [
    'PROC_STATUS_SUCCESS',
    'PROC_STATUS_EXCEPTION',
    'PROC_STATUS_SIGNALRAISED',
    'PROC_STATUS_MISCFAILURE',
]
class SignalException(Exception):
    """Exception carrying the number of a delivered signal.

    The signal number is available as the ``signum`` attribute.
    """

    def __init__(self, signum):
        # Pass the signal number on to Exception so that ``args`` is
        # populated.  Unpickling an exception re-invokes __init__ with
        # ``args``; with empty ``args`` that call would fail because the
        # required ``signum`` argument would be missing.
        super().__init__(signum)
        self.signum = signum

    def __str__(self):
        return "<SignalException: %d>" % self.signum
52__all__.append('SignalException')
def signal_handler(signum, info):
    """Signal handler that converts a delivered signal into an exception.

    signum is the number of the signal; info is the second argument passed
    to the handler and is not used.  Always raises SignalException(signum).
    """
    exc = SignalException(signum)
    raise exc
def _func_wrapper(func, *args, **kwds):
    """Call func(*args, **kwds) and turn failures into a status tuple.

    func is expected to return a tuple (status, messages), where status is
    one of PROC_STATUS_* and messages is a string used for logging; that
    value is passed through unchanged.  If a SignalException is raised the
    result is (PROC_STATUS_SIGNALRAISED, <signal number>); for any other
    Exception it is (PROC_STATUS_EXCEPTION, <message with traceback>).
    """
    # We need to handle signals to avoid hanging
    for signum in (SIGHUP, SIGTERM, SIGPIPE, SIGALRM):
        signal(signum, signal_handler)

    try:
        outcome = func(*args, **kwds)
    except SignalException as sig:
        outcome = (PROC_STATUS_SIGNALRAISED, sig.signum)
    except Exception as exc:
        details = "Exception: %s\n%s" % (exc, traceback.format_exc())
        outcome = (PROC_STATUS_EXCEPTION, details)
    finally:
        # Make sure connections are closed. We might die otherwise.
        sqlalchemy.orm.session.Session.close_all()
    return outcome
class DakProcessPool(Pool):
    """Process pool that keeps track of the results of all submitted tasks.

    Tasks submitted with apply_async() are run through _func_wrapper.  After
    join() the values returned by the tasks are available in ``results``,
    and overall_status() condenses them into a single status.
    """

    def __init__(self, *args, **kwds):
        Pool.__init__(self, *args, **kwds)
        # Return values of the submitted tasks; filled in by join().
        self.results = []
        # AsyncResult objects of the submitted tasks, in submission order.
        self.int_results = []

    def apply_async(self, func, args=(), kwds=None, callback=None,
                    error_callback=None):
        """Submit func(*args, **kwds) to the pool via _func_wrapper.

        callback and error_callback are handed to Pool.apply_async
        unchanged.  The resulting AsyncResult is remembered for join() and
        also returned to the caller, as Pool.apply_async does.
        """
        if kwds is None:
            kwds = {}
        # The wrapped function travels as the wrapper's first argument.
        wrapper_args = [func, *args]
        result = Pool.apply_async(self, _func_wrapper, wrapper_args, kwds,
                                  callback, error_callback=error_callback)
        self.int_results.append(result)
        return result

    def join(self):
        """Join the pool, then collect the result of every submitted task.

        Each result is appended to ``results``.  Anything raised by
        AsyncResult.get() propagates to the caller.
        """
        Pool.join(self)
        for pending in self.int_results:
            # return values were already handled in the callbacks, but asking
            # for them might raise exceptions which would otherwise be lost
            self.results.append(pending.get())

    def overall_status(self) -> int:
        """Return the highest status found in ``results``, but at least 0.

        This basically allows us to do sys.exit(overall_status()) and have
        us exit 0 if everything was good and non-zero if not.
        """
        # The leading 0 keeps the result at 0 when there are no results or
        # when no status is greater than 0.
        return max([0, *(r[0] for r in self.results)])
111__all__.append('DakProcessPool')