"""
Helper code for file writing with optional compression.

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2011 Torsten Werner <twerner@debian.org>
@license: GNU General Public License version 2 or later
"""

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

import contextlib
import errno
import os
import os.path
import subprocess
from dataclasses import dataclass
from typing import Optional, TextIO

@dataclass
class CompressionMethod:
    """Description of one way to store an output file."""

    # Name by which callers select this method (e.g. "gzip").
    keyword: str
    # Suffix appended to the output path (e.g. ".gz"); may be empty.
    extension: str
    # Argument vector of a filter reading stdin and writing stdout, or
    # None when the data is stored as-is without running any program.
    command: Optional[list[str]]

# Known compression methods, in the order BaseFileWriter.close() applies them.
_compression_methods = (
    CompressionMethod("bzip2", ".bz2", ["bzip2", "-9"]),
    CompressionMethod("gzip", ".gz", ["gzip", "-9cn", "--rsyncable", "--no-name"]),
    CompressionMethod("xz", ".xz", ["xz", "-c", "-e", "-T0"]),
    CompressionMethod("zstd", ".zst", ["zstd", "--compress"]),
    # 'none' must stay the last entry: BaseFileWriter handles it by renaming
    # the temporary input file into place, after which the input is no
    # longer available to any other compression method.
    CompressionMethod("none", "", None),
)

class BaseFileWriter:
    """
    Base class for compressed and uncompressed file writing.

    Data is first written to the temporary file "<path>.new". On close()
    one output file per selected compression method is produced from it
    and moved into place.
    """

    def __init__(self, template: str, **keywords) -> None:
        """
        The template argument is a string template like
        "dists/%(suite)s/%(component)s/Contents-%(architecture)s" that
        should be relative to the archive's root directory. It is expanded
        with the keywords using %-formatting, so every name used in the
        template must be passed as a keyword (KeyError otherwise). The
        extension of each compression method is appended by close().

        The optional keyword "compression" is a collection of compression
        method keywords such as "gzip" or "none"; it defaults to ["none"],
        which keeps only the uncompressed file.
        """
        self.compression = keywords.get("compression", ["none"])
        self.path = template % keywords

    def open(self) -> TextIO:
        """
        Returns a file object for writing.

        Missing parent directories are created first. The returned file is
        the temporary "<path>.new"; call close() on the writer (not only on
        the file object) to produce the final output files.
        """
        directory = os.path.dirname(self.path)
        # A path without a directory part lives in the current directory;
        # os.makedirs("") would fail, so there is nothing to create then.
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.file = open(self.path + ".new", "w")
        return self.file

    # internal helper function
    def rename(self, filename: str) -> None:
        """Set mode 0644 on "<filename>.new" and rename it to filename."""
        tempfilename = filename + ".new"
        os.chmod(tempfilename, 0o644)
        os.rename(tempfilename, filename)

    # internal helper function to compress output
    def compress(self, cmd: Optional[list[str]], suffix: str, path: str) -> None:
        """
        Produce "<path><suffix>" from the temporary input "<path>.new".

        With a command, the input is piped through it into
        "<path><suffix>.new", which is then moved into place; the input
        file is left untouched. With cmd None no program is run and
        "<path><suffix>.new" is moved into place directly, which for an
        empty suffix consumes the input file itself.

        Raises subprocess.CalledProcessError if the command fails.
        """
        in_filename = f"{path}.new"
        out_filename = f"{path}{suffix}.new"
        if cmd is not None:
            # The child only uses the file descriptors, so open in binary
            # mode and avoid any text-layer involvement.
            with open(in_filename, "rb") as in_fh, open(out_filename, "wb") as out_fh:
                subprocess.check_call(cmd, stdin=in_fh, stdout=out_fh, close_fds=True)
        self.rename(f"{path}{suffix}")

    def close(self) -> None:
        """
        Closes the file object and does the compression and rename work.

        For every known compression method the corresponding output file
        is either generated (method selected) or removed if a stale copy
        exists (method not selected). Finally the temporary input file is
        removed unless a selected method already moved it into place.
        """
        self.file.close()
        input_consumed = False
        for method in _compression_methods:
            if method.keyword in self.compression:
                self.compress(method.command, method.extension, self.path)
                # A method without a command renames the input itself.
                if method.command is None:
                    input_consumed = True
            else:
                # Try removing the file that would be generated.
                # It's not an error if it does not exist.
                with contextlib.suppress(FileNotFoundError):
                    os.unlink(f"{self.path}{method.extension}")
        # Unlinking unconditionally would raise FileNotFoundError after the
        # input has been renamed away, so only clean up what is still there.
        if not input_consumed:
            os.unlink(self.path + ".new")

class BinaryContentsFileWriter(BaseFileWriter):
    """Writer for the Contents index of binary packages."""

    def __init__(self, **keywords):
        """
        The value of the keywords archive, suite, component, architecture
        and debtype are strings. All of them are required: debtype selects
        the template (KeyError if missing) and the others are substituted
        into it. A debtype of "deb" writes Contents-<architecture>; any
        other value is treated as udeb and writes
        Contents-udeb-<architecture>.

        Output files are gzip compressed by default; pass the keyword
        "compression" to choose other methods.
        """
        # Caller-supplied keywords override the default compression.
        flags = {"compression": ["gzip"], **keywords}
        if flags["debtype"] == "deb":
            template = (
                "%(archive)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s"
            )
        else:  # udeb
            template = "%(archive)s/dists/%(suite)s/%(component)s/Contents-udeb-%(architecture)s"
        super().__init__(template, **flags)

class SourceContentsFileWriter(BaseFileWriter):
    """Writer for the Contents-source index."""

    def __init__(self, **keywords):
        """
        The value of the keywords archive, suite and component are strings;
        all three are substituted into the output path and therefore
        required.

        Output files are gzip compressed by default; pass the keyword
        "compression" to choose other methods.
        """
        # Caller-supplied keywords override the default compression.
        flags = {"compression": ["gzip"], **keywords}
        template = "%(archive)s/dists/%(suite)s/%(component)s/Contents-source"
        super().__init__(template, **flags)

class PackagesFileWriter(BaseFileWriter):
    """Writer for Packages index files."""

    def __init__(self, **keywords):
        """
        The value of the keywords archive, suite, component, debtype and
        architecture are strings. All of them are required: debtype selects
        the template (KeyError if missing) and the others are substituted
        into it. A debtype of "deb" writes below binary-<architecture>; any
        other value is treated as udeb and writes below
        debian-installer/binary-<architecture>.

        Output files are gzip and xz compressed by default; pass the
        keyword "compression" to choose other methods.
        """
        # Caller-supplied keywords override the default compression.
        flags = {"compression": ["gzip", "xz"], **keywords}
        if flags["debtype"] == "deb":
            template = "%(archive)s/dists/%(suite)s/%(component)s/binary-%(architecture)s/Packages"
        else:  # udeb
            template = "%(archive)s/dists/%(suite)s/%(component)s/debian-installer/binary-%(architecture)s/Packages"
        super().__init__(template, **flags)

class SourcesFileWriter(BaseFileWriter):
    """Writer for Sources index files."""

    def __init__(self, **keywords):
        """
        The value of the keywords archive, suite and component are strings;
        all three are substituted into the output path and therefore
        required.

        Output files are gzip and xz compressed by default; pass the
        keyword "compression" to choose other methods.
        """
        # Caller-supplied keywords override the default compression.
        flags = {"compression": ["gzip", "xz"], **keywords}
        template = "%(archive)s/dists/%(suite)s/%(component)s/source/Sources"
        super().__init__(template, **flags)

class TranslationFileWriter(BaseFileWriter):
    """Writer for i18n Translation-<language> files."""

    def __init__(self, **keywords):
        """
        The value of the keywords archive, suite, component and language
        are strings. archive, suite and component are required by the
        output path; language defaults to "en".

        Output files are bzip2 compressed by default; pass the keyword
        "compression" to choose other methods.
        """
        # Caller-supplied keywords override both defaults.
        flags = {"compression": ["bzip2"], "language": "en", **keywords}
        template = (
            "%(archive)s/dists/%(suite)s/%(component)s/i18n/Translation-%(language)s"
        )
        super().__init__(template, **flags)