"""
Helper code for file writing with optional compression.

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2011 Torsten Werner <twerner@debian.org>
@license: GNU General Public License version 2 or later
"""

8 

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

26 

27import errno 

28import os 

29import os.path 

30import subprocess 

31from dataclasses import dataclass 

32from typing import Optional, TextIO 

33 

34 

@dataclass
class CompressionMethod:
    '''
    Description of one way an output file can be written.
    '''

    # name used to select this method in a writer's 'compression' list
    keyword: str
    # suffix appended to the output path (empty for uncompressed output)
    extension: str
    # argument vector of the compressor, or None when no program is run
    command: Optional[list[str]]

40 

41 

# Table of all known compression methods, in the order in which
# BaseFileWriter.close() processes them.
_compression_methods = (
    CompressionMethod(
        keyword='bzip2',
        extension='.bz2',
        command=['bzip2', '-9'],
    ),
    CompressionMethod(
        keyword='gzip',
        extension='.gz',
        command=['gzip', '-9cn', '--rsyncable', '--no-name'],
    ),
    CompressionMethod(
        keyword='xz',
        extension='.xz',
        command=['xz', '-c', '-e', '-T0'],
    ),
    CompressionMethod(
        keyword='zstd',
        extension='.zst',
        command=['zstd', '--compress'],
    ),
    # 'none' has to stay the final entry: handling it renames the
    # temporary input file into place, so the other compression methods
    # would no longer find their input afterwards.
    CompressionMethod(
        keyword='none',
        extension='',
        command=None,
    ),
)

51 

52 

class BaseFileWriter:
    '''
    Base class for compressed and uncompressed file writing.

    Data is first written to "<path>.new". close() then feeds that
    temporary file to every requested compression method, renames each
    result into place and removes the outputs of all methods that were
    not requested.
    '''

    def __init__(self, template, **keywords):
        '''
        The template argument is a %-style string template like
        "dists/%(suite)s/%(component)s/Contents-%(architecture)s" that is
        filled in from the keywords to form the output path (without any
        compression extension).

        The optional keyword 'compression' is a list of compression method
        keywords as listed in _compression_methods; it defaults to
        ['none'], i.e. uncompressed output only.
        '''
        self.compression = keywords.get('compression', ['none'])
        self.path = template % keywords

    def open(self) -> TextIO:
        '''
        Returns a file object for writing.

        Missing parent directories of the output path are created first.
        The returned file is the temporary "<path>.new"; it is also kept
        in self.file so that close() can finish the work.
        '''
        directory = os.path.dirname(self.path)
        # os.makedirs('') raises, so skip it for a path without a
        # directory part; exist_ok tolerates an existing directory
        # without hiding unrelated errors.
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.file = open(self.path + '.new', 'w')
        return self.file

    # internal helper function
    def rename(self, filename: str) -> None:
        '''
        Makes "<filename>.new" world readable (mode 0644) and moves it to
        filename.
        '''
        tempfilename = filename + '.new'
        os.chmod(tempfilename, 0o644)
        os.rename(tempfilename, filename)

    # internal helper function
    @staticmethod
    def _remove_if_exists(filename: str) -> None:
        '''
        Removes filename; a file that does not exist is not an error.
        '''
        try:
            os.unlink(filename)
        except FileNotFoundError:
            pass

    # internal helper function to compress output
    def compress(self, cmd, suffix, path) -> None:
        '''
        Produces "<path><suffix>" from the temporary file "<path>.new".

        cmd is the argument vector of a program that reads stdin and
        writes stdout. If cmd is None no program is run and, given an
        empty suffix, "<path>.new" itself is renamed to path. Raises
        subprocess.CalledProcessError if the program fails.
        '''
        in_filename = f"{path}.new"
        out_filename = f"{path}{suffix}.new"
        if cmd is not None:
            # Only the file descriptors are handed to the child process,
            # so open in binary mode and avoid any text layer.
            with open(in_filename, 'rb') as in_fh, open(out_filename, 'wb') as out_fh:
                subprocess.check_call(cmd, stdin=in_fh, stdout=out_fh, close_fds=True)
        self.rename(f"{path}{suffix}")

    def close(self) -> None:
        '''
        Closes the file object and does the compression and rename work.
        '''
        self.file.close()
        for method in _compression_methods:
            if method.keyword in self.compression:
                self.compress(method.command, method.extension, self.path)
            else:
                # Remove the file this method would have generated.
                self._remove_if_exists(f"{self.path}{method.extension}")
        # Drop the temporary input file. If 'none' was requested it has
        # already been renamed into place, so it may be gone by now.
        self._remove_if_exists(self.path + '.new')

114 

115 

class BinaryContentsFileWriter(BaseFileWriter):
    def __init__(self, **keywords):
        '''
        Writer for the Contents file of binary packages.

        The keywords archive, suite, component and architecture are
        strings that are substituted into the output path. The keyword
        debtype is required: 'deb' selects "Contents-<architecture>",
        any other value "Contents-udeb-<architecture>". Output is gzip
        compressed unless a 'compression' keyword overrides that default.
        '''
        # caller supplied keywords take precedence over the defaults
        flags = {'compression': ['gzip'], **keywords}
        template = (
            "%(archive)s/dists/%(suite)s/%(component)s/Contents-%(architecture)s"
            if flags['debtype'] == 'deb'
            else "%(archive)s/dists/%(suite)s/%(component)s/Contents-udeb-%(architecture)s"
        )
        super().__init__(template, **flags)

132 

133 

class SourceContentsFileWriter(BaseFileWriter):
    def __init__(self, **keywords):
        '''
        Writer for the Contents file of source packages.

        The keywords archive, suite and component are strings that are
        substituted into the output path. Output is gzip compressed
        unless a 'compression' keyword overrides that default.
        '''
        # caller supplied keywords take precedence over the defaults
        flags = {'compression': ['gzip'], **keywords}
        super().__init__(
            "%(archive)s/dists/%(suite)s/%(component)s/Contents-source",
            **flags,
        )

146 

147 

class PackagesFileWriter(BaseFileWriter):
    def __init__(self, **keywords):
        '''
        Writer for Packages files.

        The keywords archive, suite, component and architecture are
        strings that are substituted into the output path. The keyword
        debtype is required: 'deb' selects the "binary-<architecture>"
        directory, any other value the one below "debian-installer".
        Output is gzip and xz compressed unless a 'compression' keyword
        overrides that default.
        '''
        # caller supplied keywords take precedence over the defaults
        flags = {'compression': ['gzip', 'xz'], **keywords}
        template = (
            "%(archive)s/dists/%(suite)s/%(component)s/binary-%(architecture)s/Packages"
            if flags['debtype'] == 'deb'
            else "%(archive)s/dists/%(suite)s/%(component)s/debian-installer/binary-%(architecture)s/Packages"
        )
        super().__init__(template, **flags)

163 

164 

class SourcesFileWriter(BaseFileWriter):
    def __init__(self, **keywords):
        '''
        Writer for Sources files.

        The keywords archive, suite and component are strings that are
        substituted into the output path. Output is gzip and xz
        compressed unless a 'compression' keyword overrides that default.
        '''
        # caller supplied keywords take precedence over the defaults
        flags = {'compression': ['gzip', 'xz'], **keywords}
        super().__init__(
            "%(archive)s/dists/%(suite)s/%(component)s/source/Sources",
            **flags,
        )

177 

178 

class TranslationFileWriter(BaseFileWriter):
    def __init__(self, **keywords):
        '''
        Writer for Translation files.

        The keywords archive, suite and component are strings that are
        substituted into the output path, as is language, which defaults
        to 'en'. Output is bzip2 compressed unless a 'compression'
        keyword overrides that default.
        '''
        # caller supplied keywords take precedence over the defaults
        flags = {'compression': ['bzip2'], 'language': 'en', **keywords}
        super().__init__(
            "%(archive)s/dists/%(suite)s/%(component)s/i18n/Translation-%(language)s",
            **flags,
        )