Coverage for daklib/dakmultiprocessing.py: 96%

54 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# vim:set et sw=4: 

2 

3""" 

4multiprocessing for DAK 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2011 Ansgar Burchardt <ansgar@debian.org> 

8@license: GNU General Public License version 2 or later 

9""" 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25############################################################################### 

26 

27import traceback 

28from multiprocessing.pool import Pool 

29from signal import SIGALRM, SIGHUP, SIGPIPE, SIGTERM, signal 

30from typing import override 

31 

32import sqlalchemy.orm.session 

33 

# Status codes reported by pool workers as the first element of their
# ``(status, messages)`` result tuple.
PROC_STATUS_SUCCESS = 0  # Everything ok
PROC_STATUS_EXCEPTION = 1  # An exception was caught
PROC_STATUS_SIGNALRAISED = 2  # A signal was generated
PROC_STATUS_MISCFAILURE = 3  # Process specific error; see message

# Public names of this module; definitions further down append themselves.
__all__ = [
    "PROC_STATUS_SUCCESS",
    "PROC_STATUS_EXCEPTION",
    "PROC_STATUS_SIGNALRAISED",
    "PROC_STATUS_MISCFAILURE",
]

49 

50 

51class SignalException(Exception): 

52 def __init__(self, signum): 

53 self.signum = signum 

54 

55 @override 

56 def __str__(self): 

57 return "<SignalException: %d>" % self.signum 

58 

59 

60__all__.append("SignalException") 

61 

62 

def signal_handler(signum, info):
    """Convert a delivered signal into a ``SignalException``.

    ``signum`` is the number of the signal; it is stored on the raised
    exception.  ``info`` is the second argument passed to the handler and is
    not used.
    """
    exc = SignalException(signum)
    raise exc

65 

66 

def _func_wrapper(func, *args, **kwds):
    """Call ``func(*args, **kwds)`` and turn failures into a status tuple.

    The callback is expected to return ``(status, messages)`` where
    ``status`` is one of the ``PROC_STATUS_*`` constants and ``messages`` is
    a string used for logging.  That value is returned unchanged.

    If a trapped signal interrupts the call, the result is
    ``(PROC_STATUS_SIGNALRAISED, signum)``; note that the second element is
    then the signal number, not a string.  Any other ``Exception`` yields
    ``(PROC_STATUS_EXCEPTION, text)`` where ``text`` contains the exception
    and its formatted traceback.
    """
    # We need to handle signals to avoid hanging: each of these is turned
    # into a SignalException by signal_handler.
    for trapped in (SIGHUP, SIGTERM, SIGPIPE, SIGALRM):
        signal(trapped, signal_handler)

    try:
        return func(*args, **kwds)
    except SignalException as e:
        return (PROC_STATUS_SIGNALRAISED, e.signum)
    except Exception as e:
        return (
            PROC_STATUS_EXCEPTION,
            "Exception: %s\n%s" % (e, traceback.format_exc()),
        )
    finally:
        # Make sure connections are closed. We might die otherwise.
        # close_all_sessions() is the supported replacement for the
        # deprecated Session.close_all() classmethod.
        sqlalchemy.orm.session.close_all_sessions()

91 

92 

93class DakProcessPool(Pool): 

94 def __init__(self, *args, **kwds): 

95 Pool.__init__(self, *args, **kwds) 

96 self.results = [] 

97 self.int_results = [] 

98 

99 @override 

100 def apply_async(self, func, args=(), kwds={}, callback=None): 

101 wrapper_args = list(args) 

102 wrapper_args.insert(0, func) 

103 self.int_results.append( 

104 Pool.apply_async(self, _func_wrapper, wrapper_args, kwds, callback) 

105 ) 

106 

107 @override 

108 def join(self): 

109 Pool.join(self) 

110 for r in self.int_results: 

111 # return values were already handled in the callbacks, but asking 

112 # for them might raise exceptions which would otherwise be lost 

113 self.results.append(r.get()) 

114 

115 def overall_status(self) -> int: 

116 # Return the highest of our status results 

117 # This basically allows us to do sys.exit(overall_status()) and have us 

118 # exit 0 if everything was good and non-zero if not 

119 status = 0 

120 for r in self.results: 

121 if r[0] > status: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 status = r[0] 

123 return status 

124 

125 

126__all__.append("DakProcessPool")