Coverage for daklib/fstransactions.py: 86%

128 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17"""Transactions for filesystem actions""" 

18 

19import errno 

20import os 

21import shutil 

22from typing import IO, Optional, override 

23 

24 

25class _FilesystemAction: 

26 @property 

27 def temporary_name(self) -> str: 

28 raise NotImplementedError() 

29 

30 def check_for_temporary(self) -> None: 

31 try: 

32 if os.path.exists(self.temporary_name): 32 ↛ 33line 32 didn't jump to line 33 because the condition on line 32 was never true

33 raise OSError( 

34 errno.EEXIST, os.strerror(errno.EEXIST), self.temporary_name 

35 ) 

36 except NotImplementedError: 

37 pass 

38 

39 

40class _FilesystemCopyAction(_FilesystemAction): 

41 def __init__( 

42 self, 

43 source: str, 

44 destination: str, 

45 link=True, 

46 symlink=False, 

47 mode: int | None = None, 

48 ): 

49 self.destination = destination 

50 self.need_cleanup = False 

51 

52 dirmode = 0o2755 

53 if mode is not None: 

54 dirmode = 0o2700 | mode 

55 # Allow +x for group and others if they have +r. 

56 if dirmode & 0o0040: 56 ↛ 58line 56 didn't jump to line 58 because the condition on line 56 was always true

57 dirmode = dirmode | 0o0010 

58 if dirmode & 0o0004: 

59 dirmode = dirmode | 0o0001 

60 

61 self.check_for_temporary() 

62 destdir = os.path.dirname(self.destination) 

63 if not os.path.exists(destdir): 

64 os.makedirs(destdir, dirmode) 

65 if symlink: 

66 os.symlink(source, self.destination) 

67 elif link: 

68 try: 

69 os.link(source, self.destination) 

70 except OSError: 

71 shutil.copy2(source, self.destination) 

72 else: 

73 shutil.copy2(source, self.destination) 

74 

75 self.need_cleanup = True 

76 if mode is not None: 

77 os.chmod(self.destination, mode) 

78 

79 @property 

80 @override 

81 def temporary_name(self): 

82 return self.destination 

83 

84 def commit(self): 

85 pass 

86 

87 def rollback(self): 

88 if self.need_cleanup: 88 ↛ exitline 88 didn't return from function 'rollback' because the condition on line 88 was always true

89 os.unlink(self.destination) 

90 self.need_cleanup = False 

91 

92 

93class _FilesystemUnlinkAction(_FilesystemAction): 

94 def __init__(self, path: str): 

95 self.path: str = path 

96 self.need_cleanup: bool = False 

97 

98 self.check_for_temporary() 

99 os.rename(self.path, self.temporary_name) 

100 self.need_cleanup = True 

101 

102 @property 

103 @override 

104 def temporary_name(self) -> str: 

105 return "{0}.dak-rm".format(self.path) 

106 

107 def commit(self) -> None: 

108 if self.need_cleanup: 108 ↛ exitline 108 didn't return from function 'commit' because the condition on line 108 was always true

109 os.unlink(self.temporary_name) 

110 self.need_cleanup = False 

111 

112 def rollback(self) -> None: 

113 if self.need_cleanup: 113 ↛ exitline 113 didn't return from function 'rollback' because the condition on line 113 was always true

114 os.rename(self.temporary_name, self.path) 

115 self.need_cleanup = False 

116 

117 

118class _FilesystemCreateAction(_FilesystemAction): 

119 def __init__(self, path: str): 

120 self.path: str = path 

121 self.need_cleanup: bool = True 

122 

123 @property 

124 @override 

125 def temporary_name(self) -> str: 

126 return self.path 

127 

128 def commit(self) -> None: 

129 pass 

130 

131 def rollback(self) -> None: 

132 if self.need_cleanup: 

133 os.unlink(self.path) 

134 self.need_cleanup = False 

135 

136 

137class FilesystemTransaction: 

138 """transactions for filesystem actions""" 

139 

140 def __init__(self): 

141 self.actions = [] 

142 

143 def copy( 

144 self, 

145 source: str, 

146 destination: str, 

147 link: bool = False, 

148 symlink: bool = False, 

149 mode: Optional[int | str] = None, 

150 ) -> None: 

151 """copy `source` to `destination` 

152 

153 :param source: source file 

154 :param destination: destination file 

155 :param link: try hardlinking, falling back to copying 

156 :param symlink: create a symlink instead of copying 

157 :param mode: permissions to change `destination` to 

158 """ 

159 if isinstance(mode, str): 

160 mode = int(mode, 8) 

161 

162 self.actions.append( 

163 _FilesystemCopyAction( 

164 source, destination, link=link, symlink=symlink, mode=mode 

165 ) 

166 ) 

167 

168 def move(self, source: str, destination: str, mode: Optional[int] = None) -> None: 

169 """move `source` to `destination` 

170 

171 :param source: source file 

172 :param destination: destination file 

173 :param mode: permissions to change `destination` to 

174 """ 

175 self.copy(source, destination, link=True, mode=mode) 

176 self.unlink(source) 

177 

178 def unlink(self, path: str) -> None: 

179 """unlink `path` 

180 

181 :param path: file to unlink 

182 """ 

183 self.actions.append(_FilesystemUnlinkAction(path)) 

184 

185 def create(self, path: str, mode: Optional[int] = None, text: bool = True) -> IO: 

186 """create `filename` and return file handle 

187 

188 :param path: file to create 

189 :param mode: permissions for the new file 

190 :param text: open file in text mode 

191 :return: file handle of the new file 

192 """ 

193 if isinstance(mode, str): 193 ↛ 194line 193 didn't jump to line 194 because the condition on line 193 was never true

194 mode = int(mode, 8) 

195 

196 destdir = os.path.dirname(path) 

197 if not os.path.exists(destdir): 197 ↛ 198line 197 didn't jump to line 198 because the condition on line 197 was never true

198 os.makedirs(destdir, 0o2775) 

199 if os.path.exists(path): 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path) 

201 fh = open(path, "w" if text else "wb") 

202 self.actions.append(_FilesystemCreateAction(path)) 

203 if mode is not None: 

204 os.chmod(path, mode) 

205 return fh 

206 

207 def commit(self): 

208 """Commit all recorded actions.""" 

209 try: 

210 for action in self.actions: 

211 action.commit() 

212 except: 

213 self.rollback() 

214 raise 

215 finally: 

216 self.actions = [] 

217 

218 def rollback(self): 

219 """Undo all recorded actions.""" 

220 try: 

221 for action in self.actions: 

222 action.rollback() 

223 finally: 

224 self.actions = [] 

225 

226 def __enter__(self): 

227 return self 

228 

229 def __exit__(self, type, value, traceback): 

230 if type is None: 

231 self.commit() 

232 else: 

233 self.rollback() 

234 return None