1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""Transactions for filesystem actions"""
19import errno
20import os
21import shutil
22from typing import IO, Optional
class _FilesystemAction:
    """Base class for a single filesystem action inside a transaction.

    Subclasses expose the path they occupy while the action is pending via
    `temporary_name`; `check_for_temporary` refuses to proceed when that
    path is already taken.
    """

    @property
    def temporary_name(self) -> str:
        """Path occupied by this action until it is committed or rolled back.

        :raises NotImplementedError: if the subclass does not use such a path
        """
        raise NotImplementedError()

    def check_for_temporary(self) -> None:
        """Ensure nothing exists at `temporary_name` yet.

        :raises OSError: with ``errno.EEXIST`` if the path is already in use
        """
        try:
            # lexists() does not follow symlinks, so a dangling symlink at
            # the temporary path is detected as well instead of being
            # silently replaced or written through later on.
            if os.path.lexists(self.temporary_name):
                raise OSError(
                    errno.EEXIST, os.strerror(errno.EEXIST), self.temporary_name
                )
        except NotImplementedError:
            # This action has no temporary path; there is nothing to check.
            pass
class _FilesystemCopyAction(_FilesystemAction):
    """Copy, hardlink or symlink a file; rollback removes the new file.

    The filesystem change is made by the constructor. `commit` has nothing
    left to do, and `rollback` unlinks `destination` again.
    """

    def __init__(self, source, destination, link=True, symlink=False, mode=None):
        """Create `destination` from `source`.

        :param source: source file
        :param destination: destination file; must not exist yet
        :param link: try hardlinking, falling back to copying
        :param symlink: create a symlink instead of copying
        :param mode: permissions to change `destination` to (also used to
                     derive the mode of a newly created parent directory)
        :raises OSError: if `destination` already exists or the operation fails
        """
        self.destination = destination
        self.need_cleanup = False

        dirmode = 0o2755
        if mode is not None:
            dirmode = 0o2700 | mode
            # Allow +x for group and others if they have +r.
            if dirmode & 0o0040:
                dirmode = dirmode | 0o0010
            if dirmode & 0o0004:
                dirmode = dirmode | 0o0001

        self.check_for_temporary()
        destdir = os.path.dirname(self.destination)
        # dirname() is "" for a bare filename (current directory);
        # os.makedirs("") would raise FileNotFoundError.
        if destdir and not os.path.exists(destdir):
            os.makedirs(destdir, dirmode)
        if symlink:
            os.symlink(source, self.destination)
        elif link:
            try:
                os.link(source, self.destination)
            except OSError:
                # Hardlinking failed; fall back to a real copy.
                shutil.copy2(source, self.destination)
        else:
            shutil.copy2(source, self.destination)

        self.need_cleanup = True
        if mode is not None:
            try:
                os.chmod(self.destination, mode)
            except BaseException:
                # The constructor is about to fail, so no caller will ever
                # hold this action to roll it back: remove the file here.
                self.rollback()
                raise

    @property
    def temporary_name(self):
        """The destination path, which must be unused before the copy."""
        return self.destination

    def commit(self):
        """Nothing to do: the file is already in place."""
        pass

    def rollback(self):
        """Remove the file created by this action (at most once)."""
        if self.need_cleanup:
            os.unlink(self.destination)
            self.need_cleanup = False
class _FilesystemUnlinkAction(_FilesystemAction):
    """Remove a file by first renaming it out of the way.

    The constructor renames `path` to `temporary_name`. `commit` deletes
    the renamed file; `rollback` renames it back to `path`.
    """

    def __init__(self, path: str):
        """Rename `path` to its temporary name.

        :param path: file to unlink
        """
        self.path: str = path
        self.need_cleanup: bool = False

        self.check_for_temporary()
        os.rename(self.path, self.temporary_name)
        self.need_cleanup = True

    @property
    def temporary_name(self) -> str:
        """Name the file carries while the removal is pending."""
        return f"{self.path}.dak-rm"

    def commit(self) -> None:
        """Delete the renamed file; does nothing if already finished."""
        if not self.need_cleanup:
            return
        os.unlink(self.temporary_name)
        self.need_cleanup = False

    def rollback(self) -> None:
        """Rename the file back to `path`; does nothing if already finished."""
        if not self.need_cleanup:
            return
        os.rename(self.temporary_name, self.path)
        self.need_cleanup = False
class _FilesystemCreateAction(_FilesystemAction):
    """Record a file that was created; rollback unlinks it.

    The constructor only stores the path and does not touch the filesystem.
    """

    def __init__(self, path: str):
        """Remember `path` so it can be unlinked on rollback.

        :param path: the created file
        """
        self.path: str = path
        self.need_cleanup: bool = True

    @property
    def temporary_name(self) -> str:
        """The created file's own path."""
        return self.path

    def commit(self) -> None:
        """Nothing to do: the file simply stays."""
        return None

    def rollback(self) -> None:
        """Unlink the created file (at most once)."""
        if not self.need_cleanup:
            return
        os.unlink(self.path)
        self.need_cleanup = False
class FilesystemTransaction:
    """transactions for filesystem actions

    Actions take effect on the filesystem as soon as they are requested and
    are recorded in `actions`. `commit` finalizes them and `rollback` undoes
    them. Used as a context manager, the transaction commits when the block
    exits normally and rolls back when it exits with an exception.
    """

    def __init__(self):
        # Recorded actions, oldest first.
        self.actions: list = []

    def copy(
        self,
        source: str,
        destination: str,
        link: bool = False,
        symlink: bool = False,
        mode: Optional[int] = None,
    ) -> None:
        """copy `source` to `destination`

        :param source: source file
        :param destination: destination file
        :param link: try hardlinking, falling back to copying
        :param symlink: create a symlink instead of copying
        :param mode: permissions to change `destination` to; a string is
                     parsed as an octal number
        """
        if isinstance(mode, str):
            mode = int(mode, 8)

        self.actions.append(
            _FilesystemCopyAction(
                source, destination, link=link, symlink=symlink, mode=mode
            )
        )

    def move(self, source: str, destination: str, mode: Optional[int] = None) -> None:
        """move `source` to `destination`

        Implemented as a copy (hardlink where possible) followed by an
        unlink of `source`.

        :param source: source file
        :param destination: destination file
        :param mode: permissions to change `destination` to
        """
        self.copy(source, destination, link=True, mode=mode)
        self.unlink(source)

    def unlink(self, path: str) -> None:
        """unlink `path`

        :param path: file to unlink
        """
        self.actions.append(_FilesystemUnlinkAction(path))

    def create(self, path: str, mode: Optional[int] = None, text: bool = True) -> IO:
        """create `filename` and return file handle

        :param path: file to create
        :param mode: permissions for the new file; a string is parsed as an
                     octal number
        :param text: open file in text mode
        :return: file handle of the new file
        :raises OSError: with ``errno.EEXIST`` if `path` already exists
        """
        if isinstance(mode, str):
            mode = int(mode, 8)

        destdir = os.path.dirname(path)
        # dirname() is "" for a bare filename (current directory);
        # os.makedirs("") would raise FileNotFoundError.
        if destdir and not os.path.exists(destdir):
            os.makedirs(destdir, 0o2775)
        if os.path.exists(path):
            raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
        fh = open(path, "w" if text else "wb")
        # Record the action before chmod so a rollback removes the file
        # even when changing the permissions fails.
        self.actions.append(_FilesystemCreateAction(path))
        if mode is not None:
            try:
                os.chmod(path, mode)
            except BaseException:
                # The caller never receives the handle; do not leak it.
                fh.close()
                raise
        return fh

    def commit(self):
        """Commit all recorded actions.

        If committing an action fails, all recorded actions are rolled back
        and the exception is re-raised. The action list is emptied either way.
        """
        try:
            for action in self.actions:
                action.commit()
        except BaseException:
            self.rollback()
            raise
        finally:
            self.actions = []

    def rollback(self):
        """Undo all recorded actions, most recent first.

        Later actions may depend on the result of earlier ones (for example
        a copy onto a path that an earlier unlink vacated), so they must be
        undone in reverse order to restore the original state. The action
        list is emptied afterwards, even if undoing an action fails.
        """
        try:
            for action in reversed(self.actions):
                action.rollback()
        finally:
            self.actions = []

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        """Commit on a clean exit, roll back if the block raised."""
        if type is None:
            self.commit()
        else:
            self.rollback()
        return None