1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""Transactions for filesystem actions 

18""" 

19 

20import errno 

21import os 

22import shutil 

23from typing import IO, Optional 

24 

25 

26class _FilesystemAction: 

27 @property 

28 def temporary_name(self) -> str: 

29 raise NotImplementedError() 

30 

31 def check_for_temporary(self) -> None: 

32 try: 

33 if os.path.exists(self.temporary_name): 33 ↛ 34line 33 didn't jump to line 34, because the condition on line 33 was never true

34 raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), self.temporary_name) 

35 except NotImplementedError: 

36 pass 

37 

38 

class _FilesystemCopyAction(_FilesystemAction):
    """Create `destination` from `source` by symlink, hardlink or copy.

    The work is done in the constructor.  :meth:`commit` has nothing left to
    do; :meth:`rollback` removes the created destination again.
    """

    def __init__(self, source: str, destination: str, link: bool = True,
                 symlink: bool = False, mode: Optional[int] = None) -> None:
        """Create `destination`.

        :param source: source file
        :param destination: destination file; must not exist yet
        :param link: try hardlinking, falling back to copying
        :param symlink: create a symlink instead of copying
        :param mode: permissions to change `destination` to
        :raises OSError: if `destination` already exists or any of the
            filesystem operations fails
        """
        self.destination = destination
        self.need_cleanup = False

        # Mode for directories that have to be created on the way.
        dirmode = 0o2755
        if mode is not None:
            dirmode = 0o2700 | mode
            # Allow +x for group and others if they have +r.
            if dirmode & 0o0040:
                dirmode |= 0o0010
            if dirmode & 0o0004:
                dirmode |= 0o0001

        self.check_for_temporary()
        destdir = os.path.dirname(self.destination)
        # A bare filename has an empty dirname; os.makedirs('') would raise,
        # and the current directory exists anyway.
        if destdir and not os.path.exists(destdir):
            os.makedirs(destdir, dirmode)
        if symlink:
            os.symlink(source, self.destination)
        elif link:
            try:
                os.link(source, self.destination)
            except OSError:
                # Hardlinking failed; fall back to a real copy.
                shutil.copy2(source, self.destination)
        else:
            shutil.copy2(source, self.destination)

        self.need_cleanup = True
        if mode is not None:
            # NOTE: os.chmod follows symlinks by default, so with
            # symlink=True this changes the mode of the link target.
            try:
                os.chmod(self.destination, mode)
            except BaseException:
                # The constructor is about to fail, so the caller never gets
                # an action object to roll back: clean up here.
                self.rollback()
                raise

    @property
    def temporary_name(self) -> str:
        """The destination path; it must be free before the action runs."""
        return self.destination

    def commit(self) -> None:
        """Nothing to do: the destination is already in place."""
        pass

    def rollback(self) -> None:
        """Remove the destination if this action created it."""
        if self.need_cleanup:
            os.unlink(self.destination)
            self.need_cleanup = False

82 

83 

class _FilesystemUnlinkAction(_FilesystemAction):
    """Remove a file in a way that can still be undone.

    The constructor renames `path` to :attr:`temporary_name`.
    :meth:`commit` deletes the renamed file; :meth:`rollback` renames it
    back to `path`.
    """

    def __init__(self, path: str):
        """Move `path` out of the way.

        :param path: file to unlink
        """
        self.path: str = path
        self.need_cleanup: bool = False

        self.check_for_temporary()
        os.rename(self.path, self.temporary_name)
        # From here on there is a parked file to delete or restore.
        self.need_cleanup = True

    @property
    def temporary_name(self) -> str:
        """Name under which the file is parked until commit or rollback."""
        return f"{self.path}.dak-rm"

    def commit(self) -> None:
        """Delete the parked file."""
        if not self.need_cleanup:
            return
        os.unlink(self.temporary_name)
        self.need_cleanup = False

    def rollback(self) -> None:
        """Rename the parked file back to its original path."""
        if not self.need_cleanup:
            return
        os.rename(self.temporary_name, self.path)
        self.need_cleanup = False

106 

107 

class _FilesystemCreateAction(_FilesystemAction):
    """Record a newly created file so that it can be removed on rollback.

    This class does not create the file itself; it only remembers `path`.
    """

    def __init__(self, path: str):
        """Remember `path` as a file to remove on rollback.

        :param path: the created file
        """
        self.path: str = path
        self.need_cleanup: bool = True

    @property
    def temporary_name(self) -> str:
        """The path of the created file."""
        return self.path

    def commit(self) -> None:
        """Nothing to do: the file simply stays where it is."""
        pass

    def rollback(self) -> None:
        """Delete the file unless that has already been done."""
        if not self.need_cleanup:
            return
        os.unlink(self.path)
        self.need_cleanup = False

124 

125 

class FilesystemTransaction:
    """transactions for filesystem actions

    Every operation is carried out immediately and recorded in
    :attr:`actions`.  :meth:`commit` finalises the recorded actions and
    :meth:`rollback` undoes them.  When used as a context manager, the
    transaction is committed on normal exit and rolled back if the block
    raised.
    """

    def __init__(self) -> None:
        # Recorded actions, in the order they were performed.
        self.actions: list = []

    def copy(self, source: str, destination: str, link: bool = False, symlink: bool = False, mode: Optional[int] = None) -> None:
        """copy `source` to `destination`

        :param source: source file
        :param destination: destination file
        :param link: try hardlinking, falling back to copying
        :param symlink: create a symlink instead of copying
        :param mode: permissions to change `destination` to; a string is
            interpreted as an octal number
        """
        if isinstance(mode, str):
            mode = int(mode, 8)

        self.actions.append(_FilesystemCopyAction(source, destination, link=link, symlink=symlink, mode=mode))

    def move(self, source: str, destination: str, mode: Optional[int] = None) -> None:
        """move `source` to `destination`

        Implemented as a copy (hardlinking where possible) followed by an
        unlink of `source`.

        :param source: source file
        :param destination: destination file
        :param mode: permissions to change `destination` to
        """
        self.copy(source, destination, link=True, mode=mode)
        self.unlink(source)

    def unlink(self, path: str) -> None:
        """unlink `path`

        :param path: file to unlink
        """
        self.actions.append(_FilesystemUnlinkAction(path))

    def create(self, path: str, mode: Optional[int] = None, text: bool = True) -> IO:
        """create `path` and return file handle

        :param path: file to create
        :param mode: permissions for the new file; a string is interpreted
            as an octal number
        :param text: open file in text mode
        :return: file handle of the new file
        :raises OSError: with ``errno.EEXIST`` if `path` already exists
        """
        if isinstance(mode, str):
            mode = int(mode, 8)

        destdir = os.path.dirname(path)
        # A bare filename has an empty dirname; os.makedirs('') would raise.
        if destdir and not os.path.exists(destdir):
            os.makedirs(destdir, 0o2775)
        # Exclusive creation fails with EEXIST if anything, including a
        # dangling symlink, is already at `path`, without a window between
        # checking and opening.
        fh = open(path, 'x' if text else 'xb')
        self.actions.append(_FilesystemCreateAction(path))
        if mode is not None:
            try:
                os.chmod(path, mode)
            except BaseException:
                # The caller never receives the handle; do not leak it.  The
                # recorded action still removes the file on rollback.
                fh.close()
                raise
        return fh

    def commit(self) -> None:
        """Commit all recorded actions.

        If committing an action fails, all recorded actions are rolled back
        and the error is re-raised.  The list of actions is empty afterwards
        in either case.
        """
        try:
            for action in self.actions:
                action.commit()
        except BaseException:
            self.rollback()
            raise
        finally:
            self.actions = []

    def rollback(self) -> None:
        """Undo all recorded actions.

        Actions are undone in reverse order, so an action that depends on an
        earlier one (for example unlinking a file created in the same
        transaction) is reverted first.  A failure to undo one action does
        not stop the remaining ones from being undone; the first such error
        is re-raised at the end.  The list of actions is empty afterwards.
        """
        first_error = None
        try:
            for action in reversed(self.actions):
                try:
                    action.rollback()
                except Exception as exc:
                    if first_error is None:
                        first_error = exc
        finally:
            self.actions = []
        if first_error is not None:
            raise first_error

    def __enter__(self) -> "FilesystemTransaction":
        return self

    def __exit__(self, type, value, traceback):
        # Commit on a clean exit, roll back if the block raised.  Returning
        # None lets an exception from the block propagate.
        if type is None:
            self.commit()
        else:
            self.rollback()
        return None