1# vim:set et sw=4: 

2 

3""" 

4multiprocessing for DAK 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2011 Ansgar Burchardt <ansgar@debian.org> 

8@license: GNU General Public License version 2 or later 

9""" 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25############################################################################### 

26 

27import traceback 

28from multiprocessing.pool import Pool 

29from signal import SIGALRM, SIGHUP, SIGPIPE, SIGTERM, signal 

30 

31import sqlalchemy.orm.session 

32 

33__all__ = [] 

34 

35PROC_STATUS_SUCCESS = 0 # Everything ok 

36PROC_STATUS_EXCEPTION = 1 # An exception was caught 

37PROC_STATUS_SIGNALRAISED = 2 # A signal was generated 

38PROC_STATUS_MISCFAILURE = 3 # Process specific error; see message 

39 

40__all__.extend( 

41 [ 

42 "PROC_STATUS_SUCCESS", 

43 "PROC_STATUS_EXCEPTION", 

44 "PROC_STATUS_SIGNALRAISED", 

45 "PROC_STATUS_MISCFAILURE", 

46 ] 

47) 

48 

49 

50class SignalException(Exception): 

51 def __init__(self, signum): 

52 self.signum = signum 

53 

54 def __str__(self): 

55 return "<SignalException: %d>" % self.signum 

56 

57 

58__all__.append("SignalException") 

59 

60 

61def signal_handler(signum, info): 

62 raise SignalException(signum) 

63 

64 

65def _func_wrapper(func, *args, **kwds): 

66 # We need to handle signals to avoid hanging 

67 signal(SIGHUP, signal_handler) 

68 signal(SIGTERM, signal_handler) 

69 signal(SIGPIPE, signal_handler) 

70 signal(SIGALRM, signal_handler) 

71 

72 # We expect our callback function to return: 

73 # (status, messages) 

74 # Where: 

75 # status is one of PROC_STATUS_* 

76 # messages is a string used for logging 

77 try: 

78 return func(*args, **kwds) 

79 except SignalException as e: 

80 return (PROC_STATUS_SIGNALRAISED, e.signum) 

81 except Exception as e: 

82 return ( 

83 PROC_STATUS_EXCEPTION, 

84 "Exception: %s\n%s" % (e, traceback.format_exc()), 

85 ) 

86 finally: 

87 # Make sure connections are closed. We might die otherwise. 

88 sqlalchemy.orm.session.Session.close_all() 

89 

90 

91class DakProcessPool(Pool): 

92 def __init__(self, *args, **kwds): 

93 Pool.__init__(self, *args, **kwds) 

94 self.results = [] 

95 self.int_results = [] 

96 

97 def apply_async(self, func, args=(), kwds={}, callback=None): 

98 wrapper_args = list(args) 

99 wrapper_args.insert(0, func) 

100 self.int_results.append( 

101 Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback) 

102 ) 

103 

104 def join(self): 

105 Pool.join(self) 

106 for r in self.int_results: 

107 # return values were already handled in the callbacks, but asking 

108 # for them might raise exceptions which would otherwise be lost 

109 self.results.append(r.get()) 

110 

111 def overall_status(self) -> int: 

112 # Return the highest of our status results 

113 # This basically allows us to do sys.exit(overall_status()) and have us 

114 # exit 0 if everything was good and non-zero if not 

115 status = 0 

116 for r in self.results: 

117 if r[0] > status: 117 ↛ 118line 117 didn't jump to line 118, because the condition on line 117 was never true

118 status = r[0] 

119 return status 

120 

121 

122__all__.append("DakProcessPool")