Package daklib :: Module dakmultiprocessing
[hide private]
[frames | no frames]

Source Code for Module daklib.dakmultiprocessing

  1  # vim:set et sw=4: 
  2   
  3  """ 
  4  multiprocessing for DAK 
  5   
  6  @contact: Debian FTP Master <ftpmaster@debian.org> 
  7  @copyright: 2011  Ansgar Burchardt <ansgar@debian.org> 
  8  @license: GNU General Public License version 2 or later 
  9  """ 
 10   
 11  # This program is free software; you can redistribute it and/or modify 
 12  # it under the terms of the GNU General Public License as published by 
 13  # the Free Software Foundation; either version 2 of the License, or 
 14  # (at your option) any later version. 
 15   
 16  # This program is distributed in the hope that it will be useful, 
 17  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 19  # GNU General Public License for more details. 
 20   
 21  # You should have received a copy of the GNU General Public License 
 22  # along with this program; if not, write to the Free Software 
 23  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 24   
 25  ############################################################################### 
 26   
 27  from multiprocessing.pool import Pool 
 28  from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM 
 29  import traceback 
 30   
 31  import sqlalchemy.orm.session 
 32   
 33  __all__ = [] 
 34   
 35  PROC_STATUS_SUCCESS = 0  # Everything ok 
 36  PROC_STATUS_EXCEPTION = 1  # An exception was caught 
 37  PROC_STATUS_SIGNALRAISED = 2  # A signal was generated 
 38  PROC_STATUS_MISCFAILURE = 3  # Process specific error; see message 
 39   
 40  __all__.extend(['PROC_STATUS_SUCCESS',      'PROC_STATUS_EXCEPTION', 
 41                  'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE']) 
 42   
 43   
class SignalException(Exception):
    """Exception that carries the number of a signal.

    Raised by signal_handler() so that a delivered signal can be dealt
    with through ordinary exception handling.

    @ivar signum: number of the signal that was received
    """

    def __init__(self, signum):
        # Hand signum to Exception so that self.args == (signum,).
        # Exceptions are pickled as cls(*self.args); with empty args the
        # copy could not be rebuilt on the receiving side, because this
        # constructor requires signum.
        Exception.__init__(self, signum)
        self.signum = signum

    def __str__(self):
        return "<SignalException: %d>" % self.signum
# Export the exception class as part of the module's public API.
__all__.append('SignalException')
def signal_handler(signum, info):
    """Turn a delivered signal into a SignalException.

    Meant to be installed with signal.signal().

    @param signum: number of the signal that was delivered
    @param info: second argument supplied to signal handlers; not used

    @raise SignalException: always, carrying signum
    """
    exc = SignalException(signum)
    raise exc
57 58
def _func_wrapper(func, *args, **kwds):
    """Call func(*args, **kwds) and always hand back a result tuple.

    func is expected to return (status, messages), where status is one
    of the PROC_STATUS_* values and messages is a string used for
    logging.  That return value is passed through unchanged.

    @return: whatever func returned, or
             (PROC_STATUS_SIGNALRAISED, signum) if a trapped signal was
             raised as SignalException, or
             (PROC_STATUS_EXCEPTION, text) for any other Exception,
             where text holds the exception and its traceback
    """
    # Install our handler for these signals; we need to handle them to
    # avoid hanging.
    for trapped in (SIGHUP, SIGTERM, SIGPIPE, SIGALRM):
        signal(trapped, signal_handler)

    try:
        return func(*args, **kwds)
    except SignalException as sig:
        return (PROC_STATUS_SIGNALRAISED, sig.signum)
    except Exception as err:
        trace = traceback.format_exc()
        return (PROC_STATUS_EXCEPTION, "Exception: %s\n%s" % (err, trace))
    finally:
        # Make sure connections are closed. We might die otherwise.
        sqlalchemy.orm.session.Session.close_all()
80 81
class DakProcessPool(Pool):
    """Pool that routes every task through _func_wrapper and remembers the
    outcome of each task so an overall status can be computed.

    @ivar results: values fetched from the result handles by join()
    @ivar int_results: result handles returned by Pool.apply_async(), one
                       per task submitted through apply_async()
    """

    def __init__(self, *args, **kwds):
        """Create the pool; all arguments are passed on to Pool.__init__."""
        Pool.__init__(self, *args, **kwds)
        self.results = []
        self.int_results = []

    def apply_async(self, func, args=(), kwds=None, callback=None):
        """Schedule func(*args, **kwds) to run in the pool via _func_wrapper.

        @param func: callable to run; passed to _func_wrapper as its
                     first argument
        @param args: positional arguments for func
        @param kwds: keyword arguments for func; None means no keywords
        @param callback: handed to Pool.apply_async unchanged

        @return: the result handle from Pool.apply_async, which is also
                 remembered in int_results for join()
        """
        if kwds is None:
            # Fresh dict per call instead of a shared mutable default.
            kwds = {}
        wrapper_args = [func]
        wrapper_args.extend(args)
        handle = Pool.apply_async(self, _func_wrapper, wrapper_args, kwds,
                                  callback)
        self.int_results.append(handle)
        return handle

    def join(self):
        """Wait for the workers, then collect every task's return value
        into results.

        Any exception raised while fetching a value propagates to the
        caller; in that case results is left as it was.
        """
        Pool.join(self)
        # Return values were already handled in the callbacks, but asking
        # for them might raise exceptions which would otherwise be lost.
        # The list is rebuilt in place (not appended to), so calling
        # join() again does not duplicate entries.
        self.results[:] = [handle.get() for handle in self.int_results]

    def overall_status(self):
        """Return the highest status among the collected results.

        This allows sys.exit(overall_status()): 0 if everything was good
        (or nothing was collected), non-zero if not.
        """
        return max([0] + [outcome[0] for outcome in self.results])
# Export the pool class as part of the module's public API.
__all__.append('DakProcessPool')