1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""Transactions for filesystem actions""" 

18 

19import errno 

20import os 

21import shutil 

22from typing import IO, Optional 

23 

24 

25class _FilesystemAction: 

26 @property 

27 def temporary_name(self) -> str: 

28 raise NotImplementedError() 

29 

30 def check_for_temporary(self) -> None: 

31 try: 

32 if os.path.exists(self.temporary_name): 32 ↛ 33line 32 didn't jump to line 33, because the condition on line 32 was never true

33 raise OSError( 

34 errno.EEXIST, os.strerror(errno.EEXIST), self.temporary_name 

35 ) 

36 except NotImplementedError: 

37 pass 

38 

39 

40class _FilesystemCopyAction(_FilesystemAction): 

41 def __init__(self, source, destination, link=True, symlink=False, mode=None): 

42 self.destination = destination 

43 self.need_cleanup = False 

44 

45 dirmode = 0o2755 

46 if mode is not None: 

47 dirmode = 0o2700 | mode 

48 # Allow +x for group and others if they have +r. 

49 if dirmode & 0o0040: 49 ↛ 51line 49 didn't jump to line 51, because the condition on line 49 was never false

50 dirmode = dirmode | 0o0010 

51 if dirmode & 0o0004: 

52 dirmode = dirmode | 0o0001 

53 

54 self.check_for_temporary() 

55 destdir = os.path.dirname(self.destination) 

56 if not os.path.exists(destdir): 

57 os.makedirs(destdir, dirmode) 

58 if symlink: 

59 os.symlink(source, self.destination) 

60 elif link: 

61 try: 

62 os.link(source, self.destination) 

63 except OSError: 

64 shutil.copy2(source, self.destination) 

65 else: 

66 shutil.copy2(source, self.destination) 

67 

68 self.need_cleanup = True 

69 if mode is not None: 

70 os.chmod(self.destination, mode) 

71 

72 @property 

73 def temporary_name(self): 

74 return self.destination 

75 

76 def commit(self): 

77 pass 

78 

79 def rollback(self): 

80 if self.need_cleanup: 80 ↛ exitline 80 didn't return from function 'rollback', because the condition on line 80 was never false

81 os.unlink(self.destination) 

82 self.need_cleanup = False 

83 

84 

85class _FilesystemUnlinkAction(_FilesystemAction): 

86 def __init__(self, path: str): 

87 self.path: str = path 

88 self.need_cleanup: bool = False 

89 

90 self.check_for_temporary() 

91 os.rename(self.path, self.temporary_name) 

92 self.need_cleanup: bool = True 

93 

94 @property 

95 def temporary_name(self) -> str: 

96 return "{0}.dak-rm".format(self.path) 

97 

98 def commit(self) -> None: 

99 if self.need_cleanup: 99 ↛ exitline 99 didn't return from function 'commit', because the condition on line 99 was never false

100 os.unlink(self.temporary_name) 

101 self.need_cleanup = False 

102 

103 def rollback(self) -> None: 

104 if self.need_cleanup: 104 ↛ exitline 104 didn't return from function 'rollback', because the condition on line 104 was never false

105 os.rename(self.temporary_name, self.path) 

106 self.need_cleanup = False 

107 

108 

109class _FilesystemCreateAction(_FilesystemAction): 

110 def __init__(self, path: str): 

111 self.path: str = path 

112 self.need_cleanup: bool = True 

113 

114 @property 

115 def temporary_name(self) -> str: 

116 return self.path 

117 

118 def commit(self) -> None: 

119 pass 

120 

121 def rollback(self) -> None: 

122 if self.need_cleanup: 

123 os.unlink(self.path) 

124 self.need_cleanup = False 

125 

126 

127class FilesystemTransaction: 

128 """transactions for filesystem actions""" 

129 

130 def __init__(self): 

131 self.actions = [] 

132 

133 def copy( 

134 self, 

135 source: str, 

136 destination: str, 

137 link: bool = False, 

138 symlink: bool = False, 

139 mode: Optional[int] = None, 

140 ) -> None: 

141 """copy `source` to `destination` 

142 

143 :param source: source file 

144 :param destination: destination file 

145 :param link: try hardlinking, falling back to copying 

146 :param symlink: create a symlink instead of copying 

147 :param mode: permissions to change `destination` to 

148 """ 

149 if isinstance(mode, str): 

150 mode = int(mode, 8) 

151 

152 self.actions.append( 

153 _FilesystemCopyAction( 

154 source, destination, link=link, symlink=symlink, mode=mode 

155 ) 

156 ) 

157 

158 def move(self, source: str, destination: str, mode: Optional[int] = None) -> None: 

159 """move `source` to `destination` 

160 

161 :param source: source file 

162 :param destination: destination file 

163 :param mode: permissions to change `destination` to 

164 """ 

165 self.copy(source, destination, link=True, mode=mode) 

166 self.unlink(source) 

167 

168 def unlink(self, path: str) -> None: 

169 """unlink `path` 

170 

171 :param path: file to unlink 

172 """ 

173 self.actions.append(_FilesystemUnlinkAction(path)) 

174 

175 def create(self, path: str, mode: Optional[int] = None, text: bool = True) -> IO: 

176 """create `filename` and return file handle 

177 

178 :param path: file to create 

179 :param mode: permissions for the new file 

180 :param text: open file in text mode 

181 :return: file handle of the new file 

182 """ 

183 if isinstance(mode, str): 183 ↛ 184line 183 didn't jump to line 184, because the condition on line 183 was never true

184 mode = int(mode, 8) 

185 

186 destdir = os.path.dirname(path) 

187 if not os.path.exists(destdir): 187 ↛ 188line 187 didn't jump to line 188, because the condition on line 187 was never true

188 os.makedirs(destdir, 0o2775) 

189 if os.path.exists(path): 189 ↛ 190line 189 didn't jump to line 190, because the condition on line 189 was never true

190 raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path) 

191 fh = open(path, "w" if text else "wb") 

192 self.actions.append(_FilesystemCreateAction(path)) 

193 if mode is not None: 

194 os.chmod(path, mode) 

195 return fh 

196 

197 def commit(self): 

198 """Commit all recorded actions.""" 

199 try: 

200 for action in self.actions: 

201 action.commit() 

202 except: 

203 self.rollback() 

204 raise 

205 finally: 

206 self.actions = [] 

207 

208 def rollback(self): 

209 """Undo all recorded actions.""" 

210 try: 

211 for action in self.actions: 

212 action.rollback() 

213 finally: 

214 self.actions = [] 

215 

216 def __enter__(self): 

217 return self 

218 

219 def __exit__(self, type, value, traceback): 

220 if type is None: 

221 self.commit() 

222 else: 

223 self.rollback() 

224 return None