Coverage for daklib/import_repository.py: 25%

126 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1# Copyright (C) 2015, Ansgar Burchardt <ansgar@debian.org> 

2# 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7# 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12# 

13# You should have received a copy of the GNU General Public License along 

14# with this program; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17# This is still work-in-progress and far too incomplete. 

18# ruff: noqa 

19# type: ignore 

20 

21import os 

22import shutil 

23import tempfile 

24import urllib.error 

25import urllib.parse 

26import urllib.request 

27from typing import Optional 

28 

29import apt_pkg 

30from sqlalchemy.orm import object_session 

31 

32import daklib.compress 

33import daklib.config 

34import daklib.dakapt 

35import daklib.dbconn 

36import daklib.gpg 

37import daklib.regexes 

38import daklib.upload 

39from daklib.dbconn import Archive, DBBinary, DBSource, PoolFile 

40 

41# Hmm, maybe use APT directly for all of this? 

42 

# Hash fields read from a Release file when parsing its file list.
_release_hashes_fields = ("MD5Sum", "SHA1", "SHA256")

44 

45 

class Release:
    """Parsed Release/InRelease information for one suite below a base URL."""

    def __init__(self, base, suite_name, data):
        """Parse release `data` for `suite_name` located below `base`.

        :param base: base URL or directory of the repository
        :param suite_name: name of the suite the data belongs to
        :param data: raw (already signature-stripped) Release contents
        """
        self._base = base
        self._suite_name = suite_name
        self._dict = apt_pkg.TagSection(data)
        # File list with per-file hashes, keyed by filename relative to
        # the dists/<suite>/ directory.
        self._hashes = daklib.upload.parse_file_list(
            self._dict, False, daklib.regexes.re_file_safe_slash, _release_hashes_fields
        )

    def architectures(self):
        """Return the architectures listed in the Release file."""
        return self._dict["Architectures"].split()

    def components(self):
        """Return the components listed in the Release file."""
        return self._dict["Components"].split()

    def packages(self, component, architecture):
        """Return an apt_pkg.TagFile over the Packages index for `component`/`architecture`."""
        index = obtain_release_file(self, f"{component}/binary-{architecture}/Packages")
        return apt_pkg.TagFile(index.fh())

    def sources(self, component):
        """Return an apt_pkg.TagFile over the Sources index for `component`."""
        index = obtain_release_file(self, f"{component}/source/Sources")
        return apt_pkg.TagFile(index.fh())

    def suite(self):
        """Return the Suite field of the Release file."""
        return self._dict["Suite"]

    def codename(self):
        """Return the Codename field of the Release file."""
        return self._dict["Codename"]

    # TODO: Handle Date/Valid-Until to make sure we import
    # a newer version than before

79 

80 

class File:
    """A temporary local copy of an obtained file."""

    def __init__(self):
        cfg = daklib.config.Config()
        # Keep the NamedTemporaryFile on the instance so the on-disk file
        # lives exactly as long as this object.
        self._tmp = tempfile.NamedTemporaryFile(dir=cfg["Dir::TempPath"])

    def fh(self):
        """Return the underlying file object, rewound to the start."""
        self._tmp.seek(0)
        return self._tmp

    def hashes(self):
        """Return DakHashes computed over the file contents."""
        return daklib.dakapt.DakHashes(self.fh())

92 

93 

def obtain_file(base, path) -> File:
    """Obtain a file 'path' located below 'base'

    .. note::

       return type can still change

    :param base: base URL (http or https) or local directory
    :param path: path of the file relative to `base`
    :return: a `File` holding a temporary local copy of the contents
    """
    fn = "{0}/{1}".format(base, path)
    tmp = File()
    # Also accept https: urllib handles it the same way as http.
    if fn.startswith(("http://", "https://")):
        # Context manager guarantees the connection is closed even when
        # copyfileobj raises half-way through the transfer.
        with urllib.request.urlopen(fn, timeout=300) as fh:
            shutil.copyfileobj(fh, tmp._tmp)
    else:
        with open(fn, "rb") as fh:
            shutil.copyfileobj(fh, tmp._tmp)
    return tmp

111 

112 

def obtain_release(base, suite_name, keyring, fingerprint=None) -> Release:
    """Obtain release information

    Downloads dists/<suite>/InRelease from `base`, verifies its signature
    against `keyring`, and checks that `suite_name` matches either the
    Suite or Codename field of the result.

    :raises Exception: if neither Suite nor Codename matches `suite_name`
    """
    inrelease = obtain_file(base, "dists/{0}/InRelease".format(suite_name))
    signed = daklib.gpg.SignedFile(inrelease.fh().read(), [keyring])
    release = Release(base, suite_name, signed.contents)
    if suite_name not in (release.suite(), release.codename()):
        raise Exception(
            "Suite {0} doesn't match suite or codename from Release file.".format(
                suite_name
            )
        )
    return release

126 

127 

# Compression extensions tried for Release-referenced indices; the first
# one that is listed in the Release file wins.
_compressions = (".zst", ".xz", ".gz", ".bz2")

129 

130 

def obtain_release_file(release, filename) -> File:
    """Obtain file referenced from Release

    A compressed version is automatically selected and decompressed if it exists.

    :param release: `Release` object the file is referenced from
    :param filename: path of the file relative to dists/<suite>/
    :raises ValueError: if `filename` is not referenced in the Release file
    """
    if filename not in release._hashes:
        raise ValueError("File {0} not referenced in Release".format(filename))

    # Prefer a compressed variant when the Release file lists one.
    compressed_name = next(
        (
            filename + ext
            for ext in _compressions
            if filename + ext in release._hashes
        ),
        None,
    )
    if compressed_name is not None:
        filename = compressed_name

    # Obtain file and check hashes
    tmp = obtain_file(
        release._base, "dists/{0}/{1}".format(release._suite_name, filename)
    )
    release._hashes[filename].check_fh(tmp.fh())

    if compressed_name is not None:
        decompressed = File()
        daklib.compress.decompress(tmp.fh(), decompressed.fh(), filename)
        tmp = decompressed

    return tmp

160 

161 

def import_source_to_archive(base, entry, transaction, archive, component) -> DBSource:
    """Import source package described by 'entry' into the given 'archive' and 'component'

    'entry' needs to be a dict-like object with at least the following
    keys as used in a Sources index: Directory, Files, Checksums-Sha1,
    Checksums-Sha256

    :param base: base URL or directory the files are obtained from
    :param entry: dict-like Sources index entry
    :param transaction: archive transaction used to install the source
    :param archive: target archive
    :param component: target component
    :return: the installed `DBSource`
    :raises Exception: if the Directory field contains an unsafe path
    """
    # Obtain and verify files
    if not daklib.regexes.re_file_safe_slash.match(entry["Directory"]):
        raise Exception("Unsafe path in Directory field")
    hashed_files = daklib.upload.parse_file_list(entry, False)
    # `files` keeps the temporary File objects alive (and thus their
    # on-disk tempfiles) until the transaction has consumed them.
    files = []
    for f in hashed_files.values():
        path = os.path.join(entry["Directory"], f.filename)
        tmp = obtain_file(base, path)
        f.check_fh(tmp.fh())
        files.append(tmp)
        # Record the tempfile's name on the hashed file; `directory` is
        # rebound each iteration but is the same temp dir for all files,
        # and is reused below.  NOTE(review): assumes `entry` lists at
        # least one file, otherwise `directory` is unbound.
        directory, f.input_filename = os.path.split(tmp.fh().name)

    # Inject files into archive
    source = daklib.upload.Source(
        directory, list(hashed_files.values()), [], require_signature=False
    )
    # TODO: ugly hack!
    # Copy the temp filenames onto the Source's own file objects so it
    # uses the copies downloaded above (presumably Source rebuilt its
    # file list from the .dsc — TODO confirm).
    for f in hashed_files.keys():
        if f.endswith(".dsc"):
            continue
        source.files[f].input_filename = hashed_files[f].input_filename

    # TODO: allow changed_by to be NULL
    changed_by = source.dsc["Maintainer"]
    db_changed_by = daklib.dbconn.get_or_set_maintainer(changed_by, transaction.session)
    db_source = transaction.install_source_to_archive(
        directory, source, archive, component, db_changed_by
    )

    return db_source

199 

200 

def import_package_to_suite(base, entry, transaction, suite, component) -> DBBinary:
    """Import binary package described by 'entry' into the given 'suite' and 'component'

    'entry' needs to be a dict-like object with at least the following
    keys as used in a Packages index: Filename, Size, MD5sum, SHA1,
    SHA256

    :return: the installed `DBBinary`
    """
    # Obtain the package and verify it against the hashes from the index.
    pool_filename = entry["Filename"]
    local_copy = obtain_file(base, pool_filename)
    tmp_dir, tmp_name = os.path.split(local_copy.fh().name)
    hashed = daklib.upload.HashedFile(
        os.path.basename(pool_filename),
        int(entry["Size"]),
        entry["MD5sum"],
        entry["SHA1"],
        entry["SHA256"],
        input_filename=tmp_name,
    )
    hashed.check_fh(local_copy.fh())

    # Inject file into archive
    binary = daklib.upload.Binary(tmp_dir, hashed)
    db_binary = transaction.install_binary(tmp_dir, binary, suite, component)
    transaction.flush()

    return db_binary

228 

229 

def import_source_to_suite(base, entry, transaction, suite, component):
    """Import source package described by 'entry' into the given 'suite' and 'component'

    'entry' needs to be a dict-like object with at least the following
    keys as used in a Sources index: Directory, Files, Checksums-Sha1,
    Checksums-Sha256
    """
    # Install into the suite's archive, then associate with the suite.
    db_source = import_source_to_archive(
        base, entry, transaction, suite.archive, component
    )
    db_source.suites.append(suite)
    transaction.flush()

242 

243 

def source_in_archive(
    source: str,
    version: str,
    archive: Archive,
    component: Optional[daklib.dbconn.Component] = None,
) -> bool:
    """Check that source package 'source' with version 'version' exists in 'archive',
    with an optional check for the given component 'component'.

    .. note::

       This should probably be moved somewhere else

    :param source: source package name
    :param version: version to look for
    :param archive: archive to search in
    :param component: if given, additionally restrict to this component
    :return: whether a matching source package exists
    """
    session = object_session(archive)
    query = (
        session.query(DBSource)
        .filter_by(source=source, version=version)
        .join(DBSource.poolfile)
        .join(PoolFile.archives)
        # filter_by applies to the most recently joined entity (the
        # pool-file/archive association), restricting to `archive`.
        .filter_by(archive=archive)
    )
    if component is not None:
        # Also applies to the association joined above.
        query = query.filter_by(component=component)
    return session.query(query.exists()).scalar()