1# vim:set et sw=4: 

2 

3""" 

4multiprocessing for DAK 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2011 Ansgar Burchardt <ansgar@debian.org> 

8@license: GNU General Public License version 2 or later 

9""" 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25############################################################################### 

26 

27from multiprocessing.pool import Pool 

28from signal import signal, SIGHUP, SIGTERM, SIGPIPE, SIGALRM 

29import traceback 

30 

31import sqlalchemy.orm.session 

32 

33__all__ = [] 

34 

35PROC_STATUS_SUCCESS = 0 # Everything ok 

36PROC_STATUS_EXCEPTION = 1 # An exception was caught 

37PROC_STATUS_SIGNALRAISED = 2 # A signal was generated 

38PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message 

39 

40__all__.extend(['PROC_STATUS_SUCCESS', 'PROC_STATUS_EXCEPTION', 

41 'PROC_STATUS_SIGNALRAISED', 'PROC_STATUS_MISCFAILURE']) 

42 

43 

44class SignalException(Exception): 

45 def __init__(self, signum): 

46 self.signum = signum 

47 

48 def __str__(self): 

49 return "<SignalException: %d>" % self.signum 

50 

51 

52__all__.append('SignalException') 

53 

54 

55def signal_handler(signum, info): 

56 raise SignalException(signum) 

57 

58 

59def _func_wrapper(func, *args, **kwds): 

60 # We need to handle signals to avoid hanging 

61 signal(SIGHUP, signal_handler) 

62 signal(SIGTERM, signal_handler) 

63 signal(SIGPIPE, signal_handler) 

64 signal(SIGALRM, signal_handler) 

65 

66 # We expect our callback function to return: 

67 # (status, messages) 

68 # Where: 

69 # status is one of PROC_STATUS_* 

70 # messages is a string used for logging 

71 try: 

72 return (func(*args, **kwds)) 

73 except SignalException as e: 

74 return (PROC_STATUS_SIGNALRAISED, e.signum) 

75 except Exception as e: 

76 return (PROC_STATUS_EXCEPTION, "Exception: %s\n%s" % (e, traceback.format_exc())) 

77 finally: 

78 # Make sure connections are closed. We might die otherwise. 

79 sqlalchemy.orm.session.Session.close_all() 

80 

81 

82class DakProcessPool(Pool): 

83 def __init__(self, *args, **kwds): 

84 Pool.__init__(self, *args, **kwds) 

85 self.results = [] 

86 self.int_results = [] 

87 

88 def apply_async(self, func, args=(), kwds={}, callback=None): 

89 wrapper_args = list(args) 

90 wrapper_args.insert(0, func) 

91 self.int_results.append(Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback)) 

92 

93 def join(self): 

94 Pool.join(self) 

95 for r in self.int_results: 

96 # return values were already handled in the callbacks, but asking 

97 # for them might raise exceptions which would otherwise be lost 

98 self.results.append(r.get()) 

99 

100 def overall_status(self) -> int: 

101 # Return the highest of our status results 

102 # This basically allows us to do sys.exit(overall_status()) and have us 

103 # exit 0 if everything was good and non-zero if not 

104 status = 0 

105 for r in self.results: 

106 if r[0] > status: 106 ↛ 107line 106 didn't jump to line 107, because the condition on line 106 was never true

107 status = r[0] 

108 return status 

109 

110 

111__all__.append('DakProcessPool')