Coverage for daklib/import_repository.py: 25%
126 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
# Copyright (C) 2015, Ansgar Burchardt <ansgar@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# This is still work-in-progress and far too incomplete.
# ruff: noqa
# type: ignore
21import os
22import shutil
23import tempfile
24import urllib.error
25import urllib.parse
26import urllib.request
27from typing import Optional
29import apt_pkg
30from sqlalchemy.orm import object_session
32import daklib.compress
33import daklib.config
34import daklib.dakapt
35import daklib.dbconn
36import daklib.gpg
37import daklib.regexes
38import daklib.upload
39from daklib.dbconn import Archive, DBBinary, DBSource, PoolFile
41# Hmm, maybe use APT directly for all of this?
43_release_hashes_fields = ("MD5Sum", "SHA1", "SHA256")
class Release:
    """Parsed release information for a remote (or local) repository suite."""

    def __init__(self, base, suite_name, data):
        """Parse release information.

        :param base: base URL or path of the repository
        :param suite_name: name of the suite being imported
        :param data: raw contents of the (In)Release file
        """
        self._base = base
        self._suite_name = suite_name
        self._dict = apt_pkg.TagSection(data)
        # Per-file checksums advertised by the Release file; used later to
        # verify downloaded index files.
        self._hashes = daklib.upload.parse_file_list(
            self._dict, False, daklib.regexes.re_file_safe_slash, _release_hashes_fields
        )

    def architectures(self):
        """Return the architectures listed in the Release file."""
        return self._dict["Architectures"].split()

    def components(self):
        """Return the components listed in the Release file."""
        return self._dict["Components"].split()

    def packages(self, component, architecture):
        """Return a parser over the Packages index for 'component'/'architecture'."""
        index_path = "{0}/binary-{1}/Packages".format(component, architecture)
        index_file = obtain_release_file(self, index_path)
        return apt_pkg.TagFile(index_file.fh())

    def sources(self, component):
        """Return a parser over the Sources index for 'component'."""
        index_path = "{0}/source/Sources".format(component)
        index_file = obtain_release_file(self, index_path)
        return apt_pkg.TagFile(index_file.fh())

    def suite(self):
        """Return the Suite field of the Release file."""
        return self._dict["Suite"]

    def codename(self):
        """Return the Codename field of the Release file."""
        return self._dict["Codename"]

    # TODO: Handle Date/Valid-Until to make sure we import
    # a newer version than before
class File:
    """A temporary file kept below the configured Dir::TempPath.

    The file is removed automatically when the object is garbage collected.
    """

    def __init__(self):
        config = daklib.config.Config()
        self._tmp = tempfile.NamedTemporaryFile(dir=config["Dir::TempPath"])

    def fh(self):
        """Return the underlying file object, rewound to the beginning."""
        self._tmp.seek(0)
        return self._tmp

    def hashes(self):
        """Return hashes (via DakHashes) computed over the file contents."""
        return daklib.dakapt.DakHashes(self.fh())
def obtain_file(base, path) -> File:
    """Obtain a file 'path' located below 'base'

    :param base: base URL (http/https) or local directory of the repository
    :param path: path of the file relative to 'base'
    :return: temporary :class:`File` with the downloaded/copied contents

    .. note::

        return type can still change
    """
    fn = "{0}/{1}".format(base, path)
    tmp = File()
    # Also accept https:// — previously only plain http:// was recognized
    # and an https URL would have been opened as a local path.
    if fn.startswith(("http://", "https://")):
        # Context manager closes the connection even if copyfileobj raises.
        with urllib.request.urlopen(fn, timeout=300) as fh:
            shutil.copyfileobj(fh, tmp._tmp)
    else:
        with open(fn, "rb") as fh:
            shutil.copyfileobj(fh, tmp._tmp)
    return tmp
def obtain_release(base, suite_name, keyring, fingerprint=None) -> Release:
    """Obtain release information

    Downloads dists/<suite>/InRelease, verifies its signature against
    'keyring' and checks that it really describes 'suite_name'.
    """
    inrelease = obtain_file(base, "dists/{0}/InRelease".format(suite_name))
    raw = inrelease.fh().read()
    signed = daklib.gpg.SignedFile(raw, [keyring])
    release = Release(base, suite_name, signed.contents)
    # Guard against a repository handing us the wrong suite.
    if release.suite() != suite_name and release.codename() != suite_name:
        raise Exception(
            "Suite {0} doesn't match suite or codename from Release file.".format(
                suite_name
            )
        )
    return release
128_compressions = (".zst", ".xz", ".gz", ".bz2")
def obtain_release_file(release, filename) -> File:
    """Obtain file referenced from Release

    A compressed version is automatically selected and decompressed if it exists.
    """
    if filename not in release._hashes:
        raise ValueError("File {0} not referenced in Release".format(filename))

    # Prefer a compressed variant if the Release file lists one.
    compressed_name = next(
        (
            filename + ext
            for ext in _compressions
            if filename + ext in release._hashes
        ),
        None,
    )
    fetch_name = filename if compressed_name is None else compressed_name

    # Obtain file and check hashes
    tmp = obtain_file(
        release._base, "dists/{0}/{1}".format(release._suite_name, fetch_name)
    )
    release._hashes[fetch_name].check_fh(tmp.fh())

    if compressed_name is not None:
        decompressed = File()
        daklib.compress.decompress(tmp.fh(), decompressed.fh(), fetch_name)
        tmp = decompressed

    return tmp
def import_source_to_archive(base, entry, transaction, archive, component) -> DBSource:
    """Import source package described by 'entry' into the given 'archive' and 'component'

    'entry' needs to be a dict-like object with at least the following
    keys as used in a Sources index: Directory, Files, Checksums-Sha1,
    Checksums-Sha256
    """
    # Obtain and verify files
    if not daklib.regexes.re_file_safe_slash.match(entry["Directory"]):
        raise Exception("Unsafe path in Directory field")
    hashed_files = daklib.upload.parse_file_list(entry, False)
    downloads = []
    for hashed_file in hashed_files.values():
        remote_path = os.path.join(entry["Directory"], hashed_file.filename)
        download = obtain_file(base, remote_path)
        hashed_file.check_fh(download.fh())
        # Keep a reference so the temporary files stay alive until injected.
        downloads.append(download)
        directory, hashed_file.input_filename = os.path.split(download.fh().name)

    # Inject files into archive
    source = daklib.upload.Source(
        directory, list(hashed_files.values()), [], require_signature=False
    )
    # TODO: ugly hack!
    for name in hashed_files.keys():
        if name.endswith(".dsc"):
            continue
        source.files[name].input_filename = hashed_files[name].input_filename

    # TODO: allow changed_by to be NULL
    changed_by = source.dsc["Maintainer"]
    db_changed_by = daklib.dbconn.get_or_set_maintainer(changed_by, transaction.session)
    db_source = transaction.install_source_to_archive(
        directory, source, archive, component, db_changed_by
    )

    return db_source
def import_package_to_suite(base, entry, transaction, suite, component) -> DBBinary:
    """Import binary package described by 'entry' into the given 'suite' and 'component'

    'entry' needs to be a dict-like object with at least the following
    keys as used in a Packages index: Filename, Size, MD5sum, SHA1,
    SHA256
    """
    # Obtain and verify file
    pool_path = entry["Filename"]
    download = obtain_file(base, pool_path)
    directory, local_name = os.path.split(download.fh().name)
    hashedfile = daklib.upload.HashedFile(
        os.path.basename(pool_path),
        int(entry["Size"]),
        entry["MD5sum"],
        entry["SHA1"],
        entry["SHA256"],
        input_filename=local_name,
    )
    hashedfile.check_fh(download.fh())

    # Inject file into archive
    binary = daklib.upload.Binary(directory, hashedfile)
    db_binary = transaction.install_binary(directory, binary, suite, component)
    transaction.flush()

    return db_binary
def import_source_to_suite(base, entry, transaction, suite, component):
    """Import source package described by 'entry' into the given 'suite' and 'component'

    'entry' needs to be a dict-like object with at least the following
    keys as used in a Sources index: Directory, Files, Checksums-Sha1,
    Checksums-Sha256
    """
    db_source = import_source_to_archive(
        base, entry, transaction, suite.archive, component
    )
    # Register the freshly imported source in the target suite as well.
    db_source.suites.append(suite)
    transaction.flush()
def source_in_archive(
    source: str,
    version: str,
    archive: Archive,
    component: Optional[daklib.dbconn.Component] = None,
) -> bool:
    """Check that source package 'source' with version 'version' exists in 'archive',
    with an optional check for the given component 'component'.

    .. note::

        This should probably be moved somewhere else
    """
    session = object_session(archive)
    candidates = (
        session.query(DBSource)
        .filter_by(source=source, version=version)
        .join(DBSource.poolfile)
        .join(PoolFile.archives)
        .filter_by(archive=archive)
    )
    if component is not None:
        candidates = candidates.filter_by(component=component)
    # Use an EXISTS subquery instead of fetching matching rows.
    return session.query(candidates.exists()).scalar()