# Source code for daklib.dakmultiprocessing
# vim:set et sw=4:
"""
multiprocessing for DAK
@contact: Debian FTP Master <ftpmaster@debian.org>
@copyright: 2011 Ansgar Burchardt <ansgar@debian.org>
@license: GNU General Public License version 2 or later
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
###############################################################################
from multiprocessing.pool import Pool
from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM
import traceback
import sqlalchemy.orm.session
# Status codes that worker callbacks report as the first element of their
# (status, messages) result.  DakProcessPool.overall_status() returns the
# highest value seen, so larger numbers take precedence.
PROC_STATUS_SUCCESS = 0  # Everything ok
PROC_STATUS_EXCEPTION = 1  # An exception was caught
PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
PROC_STATUS_MISCFAILURE = 3  # Process specific error; see message

# Public names of this module; later definitions add themselves to the list.
__all__ = [
    'PROC_STATUS_SUCCESS',
    'PROC_STATUS_EXCEPTION',
    'PROC_STATUS_SIGNALRAISED',
    'PROC_STATUS_MISCFAILURE',
]
class SignalException(Exception):
    """Exception raised by signal_handler when a handled signal arrives.

    The number of the signal that was delivered is kept in ``signum``.
    """

    def __init__(self, signum):
        # Remember which signal triggered the exception so it can be reported.
        self.signum = signum

    def __str__(self):
        template = "<SignalException: %d>"
        return template % (self.signum,)
# Export the exception class as part of the module's public names.
__all__ += ['SignalException']
def signal_handler(signum, info):
    """Signal handler that turns a delivered signal into a SignalException.

    ``signum`` is the number of the signal being handled; ``info`` is the
    second argument passed to the handler and is not used.
    """
    exc = SignalException(signum)
    raise exc
def _func_wrapper(func, *args, **kwds):
    """Call ``func(*args, **kwds)`` and turn failures into a status tuple.

    This is the callable DakProcessPool.apply_async hands to the pool in
    place of the user's function.

    We expect the callback function to return ``(status, messages)`` where
    ``status`` is one of ``PROC_STATUS_*`` and ``messages`` is a string used
    for logging.  That value is passed through unchanged.  Otherwise:

    - if one of the handled signals is delivered, returns
      ``(PROC_STATUS_SIGNALRAISED, signum)`` (the second element is the
      signal number, not a string);
    - if ``func`` raises any other ``Exception``, returns
      ``(PROC_STATUS_EXCEPTION, text)`` where ``text`` contains the
      exception and its formatted traceback.

    In every case all SQLAlchemy sessions are closed before returning.
    """
    # We need to handle signals to avoid hanging
    for signum in (SIGHUP, SIGTERM, SIGPIPE, SIGALRM):
        signal(signum, signal_handler)

    try:
        return func(*args, **kwds)
    except SignalException as e:
        return (PROC_STATUS_SIGNALRAISED, e.signum)
    except Exception as e:
        return (PROC_STATUS_EXCEPTION, "Exception: %s\n%s" % (e, traceback.format_exc()))
    finally:
        # Make sure connections are closed. We might die otherwise.
        # Session.close_all() is deprecated since SQLAlchemy 1.3 in favour of
        # the module-level close_all_sessions(); prefer the latter and only
        # fall back to the classmethod on releases that do not provide it.
        close_all = getattr(sqlalchemy.orm.session, "close_all_sessions", None)
        if close_all is None:
            close_all = sqlalchemy.orm.session.Session.close_all
        close_all()
class DakProcessPool(Pool):
    """Process pool that runs every task through ``_func_wrapper``.

    Attributes:
        results: return values of the submitted tasks, filled in by join().
        int_results: the AsyncResult objects of all submitted tasks, in
            submission order.
    """

    def __init__(self, *args, **kwds):
        """Create the pool; all arguments are passed on to ``Pool``."""
        Pool.__init__(self, *args, **kwds)
        self.results = []
        self.int_results = []

    def apply_async(self, func, args=(), kwds=None, callback=None, error_callback=None):
        """Submit ``func(*args, **kwds)`` to the pool, wrapped in ``_func_wrapper``.

        The AsyncResult is remembered in ``int_results`` so that join() can
        collect the return value later, and is also returned to the caller
        like ``Pool.apply_async`` does.

        ``callback`` and ``error_callback`` are forwarded to
        ``Pool.apply_async`` unchanged.
        """
        # The wrapper takes the real function as its first positional argument.
        wrapper_args = [func, *args]
        # Avoid a shared mutable default: create a fresh dict per call.
        wrapper_kwds = {} if kwds is None else kwds
        result = Pool.apply_async(self, _func_wrapper, wrapper_args,
                                  wrapper_kwds, callback, error_callback)
        self.int_results.append(result)
        return result

    def join(self):
        """Wait for the workers to exit, then collect all task results.

        The return value of every submitted task is appended to ``results``.
        """
        Pool.join(self)
        for r in self.int_results:
            # return values were already handled in the callbacks, but asking
            # for them might raise exceptions which would otherwise be lost
            self.results.append(r.get())

    def overall_status(self) -> int:
        """Return the highest status found in ``results`` (0 if there is none).

        This basically allows us to do sys.exit(overall_status()) and have us
        exit 0 if everything was good and non-zero if not.
        """
        # Seed with 0 so an empty result list (or only lower values) yields 0.
        return max([0, *(r[0] for r in self.results)])
# Export the pool class as part of the module's public names.
__all__ += ['DakProcessPool']