1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""Transactions for filesystem actions
18"""
20import errno
21import os
22import shutil
23from typing import IO, Optional
26class _FilesystemAction:
27 @property
28 def temporary_name(self) -> str:
29 raise NotImplementedError()
31 def check_for_temporary(self) -> None:
32 try:
33 if os.path.exists(self.temporary_name): 33 ↛ 34line 33 didn't jump to line 34, because the condition on line 33 was never true
34 raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), self.temporary_name)
35 except NotImplementedError:
36 pass
39class _FilesystemCopyAction(_FilesystemAction):
40 def __init__(self, source, destination, link=True, symlink=False, mode=None):
41 self.destination = destination
42 self.need_cleanup = False
44 dirmode = 0o2755
45 if mode is not None:
46 dirmode = 0o2700 | mode
47 # Allow +x for group and others if they have +r.
48 if dirmode & 0o0040: 48 ↛ 50line 48 didn't jump to line 50, because the condition on line 48 was never false
49 dirmode = dirmode | 0o0010
50 if dirmode & 0o0004:
51 dirmode = dirmode | 0o0001
53 self.check_for_temporary()
54 destdir = os.path.dirname(self.destination)
55 if not os.path.exists(destdir):
56 os.makedirs(destdir, dirmode)
57 if symlink:
58 os.symlink(source, self.destination)
59 elif link:
60 try:
61 os.link(source, self.destination)
62 except OSError:
63 shutil.copy2(source, self.destination)
64 else:
65 shutil.copy2(source, self.destination)
67 self.need_cleanup = True
68 if mode is not None:
69 os.chmod(self.destination, mode)
71 @property
72 def temporary_name(self):
73 return self.destination
75 def commit(self):
76 pass
78 def rollback(self):
79 if self.need_cleanup: 79 ↛ exitline 79 didn't return from function 'rollback', because the condition on line 79 was never false
80 os.unlink(self.destination)
81 self.need_cleanup = False
84class _FilesystemUnlinkAction(_FilesystemAction):
85 def __init__(self, path: str):
86 self.path: str = path
87 self.need_cleanup: bool = False
89 self.check_for_temporary()
90 os.rename(self.path, self.temporary_name)
91 self.need_cleanup: bool = True
93 @property
94 def temporary_name(self) -> str:
95 return "{0}.dak-rm".format(self.path)
97 def commit(self) -> None:
98 if self.need_cleanup: 98 ↛ exitline 98 didn't return from function 'commit', because the condition on line 98 was never false
99 os.unlink(self.temporary_name)
100 self.need_cleanup = False
102 def rollback(self) -> None:
103 if self.need_cleanup: 103 ↛ exitline 103 didn't return from function 'rollback', because the condition on line 103 was never false
104 os.rename(self.temporary_name, self.path)
105 self.need_cleanup = False
108class _FilesystemCreateAction(_FilesystemAction):
109 def __init__(self, path: str):
110 self.path: str = path
111 self.need_cleanup: bool = True
113 @property
114 def temporary_name(self) -> str:
115 return self.path
117 def commit(self) -> None:
118 pass
120 def rollback(self) -> None:
121 if self.need_cleanup:
122 os.unlink(self.path)
123 self.need_cleanup = False
126class FilesystemTransaction:
127 """transactions for filesystem actions"""
129 def __init__(self):
130 self.actions = []
132 def copy(self, source: str, destination: str, link: bool = False, symlink: bool = False, mode: Optional[int] = None) -> None:
133 """copy `source` to `destination`
135 :param source: source file
136 :param destination: destination file
137 :param link: try hardlinking, falling back to copying
138 :param symlink: create a symlink instead of copying
139 :param mode: permissions to change `destination` to
140 """
141 if isinstance(mode, str):
142 mode = int(mode, 8)
144 self.actions.append(_FilesystemCopyAction(source, destination, link=link, symlink=symlink, mode=mode))
146 def move(self, source: str, destination: str, mode: Optional[int] = None) -> None:
147 """move `source` to `destination`
149 :param source: source file
150 :param destination: destination file
151 :param mode: permissions to change `destination` to
152 """
153 self.copy(source, destination, link=True, mode=mode)
154 self.unlink(source)
156 def unlink(self, path: str) -> None:
157 """unlink `path`
159 :param path: file to unlink
160 """
161 self.actions.append(_FilesystemUnlinkAction(path))
163 def create(self, path: str, mode: Optional[int] = None, text: bool = True) -> IO:
164 """create `filename` and return file handle
166 :param path: file to create
167 :param mode: permissions for the new file
168 :param text: open file in text mode
169 :return: file handle of the new file
170 """
171 if isinstance(mode, str): 171 ↛ 172line 171 didn't jump to line 172, because the condition on line 171 was never true
172 mode = int(mode, 8)
174 destdir = os.path.dirname(path)
175 if not os.path.exists(destdir): 175 ↛ 176line 175 didn't jump to line 176, because the condition on line 175 was never true
176 os.makedirs(destdir, 0o2775)
177 if os.path.exists(path): 177 ↛ 178line 177 didn't jump to line 178, because the condition on line 177 was never true
178 raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
179 fh = open(path, 'w' if text else 'wb')
180 self.actions.append(_FilesystemCreateAction(path))
181 if mode is not None:
182 os.chmod(path, mode)
183 return fh
185 def commit(self):
186 """Commit all recorded actions."""
187 try:
188 for action in self.actions:
189 action.commit()
190 except:
191 self.rollback()
192 raise
193 finally:
194 self.actions = []
196 def rollback(self):
197 """Undo all recorded actions."""
198 try:
199 for action in self.actions:
200 action.rollback()
201 finally:
202 self.actions = []
204 def __enter__(self):
205 return self
207 def __exit__(self, type, value, traceback):
208 if type is None:
209 self.commit()
210 else:
211 self.rollback()
212 return None