Coverage for daklib/policy.py: 88%
195 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""module to process policy queue uploads"""
19import errno
20import os
21import shutil
22from typing import TYPE_CHECKING, NotRequired, Optional, TypedDict
24import daklib.utils as utils
26from .config import Config
27from .dbconn import (
28 Component,
29 Override,
30 OverrideType,
31 PolicyQueueUpload,
32 Priority,
33 Section,
34 Suite,
35 get_mapped_component,
36 get_mapped_component_name,
37)
38from .fstransactions import FilesystemTransaction
39from .packagelist import PackageList
40from .regexes import re_file_changes, re_file_safe
42if TYPE_CHECKING:
43 from sqlalchemy.orm import Session
class UploadCopy:
    """export a policy queue upload

    This class can be used in a with-statement::

       with UploadCopy(...) as copy:
          ...

    Doing so will provide a temporary copy of the upload in the directory
    given by the :attr:`directory` attribute.  The copy will be removed
    on leaving the with-block.
    """

    def __init__(self, upload: PolicyQueueUpload, group: Optional[str] = None):
        """initializer

        :param upload: upload to handle
        :param group: group handed to :func:`daklib.utils.temp_dirname` for
                      the temporary directory; when given, the directory is
                      created with mode 0o2750 and files are copied instead
                      of symlinked
        """

        self._directory: Optional[str] = None
        self.upload = upload
        self.group = group

    @property
    def directory(self) -> str:
        """directory holding the temporary copy

        Only valid while the context manager is entered.
        """
        assert self._directory is not None
        return self._directory

    def export(
        self,
        directory: str,
        mode: Optional[int] = None,
        symlink: bool = True,
        ignore_existing: bool = False,
    ) -> None:
        """export a copy of the upload

        All copies are queued on a single :class:`FilesystemTransaction`.

        :param directory: directory to export to
        :param mode: permissions to use for the copied files
        :param symlink: use symlinks instead of copying the files
        :param ignore_existing: ignore already existing files
        """

        def wanted(dst: str) -> bool:
            # An existing destination is only skipped when the caller asked
            # for that; otherwise the copy is attempted regardless.
            return not (ignore_existing and os.path.exists(dst))

        with FilesystemTransaction() as fs:
            source = self.upload.source
            queue = self.upload.policy_queue

            # source files (absent for uploads without a source package)
            if source is not None:
                for dsc_file in source.srcfiles:
                    f = dsc_file.poolfile
                    dst = os.path.join(directory, os.path.basename(f.filename))
                    if wanted(dst):
                        fs.copy(f.fullpath, dst, mode=mode, symlink=symlink)

            # binary packages
            for binary in self.upload.binaries:
                f = binary.poolfile
                dst = os.path.join(directory, os.path.basename(f.filename))
                if wanted(dst):
                    fs.copy(f.fullpath, dst, mode=mode, symlink=symlink)

            # copy byhand files; those missing from the queue directory are
            # silently skipped
            for byhand in self.upload.byhand:
                src = os.path.join(queue.path, byhand.filename)
                dst = os.path.join(directory, byhand.filename)
                if os.path.exists(src) and wanted(dst):
                    fs.copy(src, dst, mode=mode, symlink=symlink)

            # copy .changes
            src = os.path.join(queue.path, self.upload.changes.changesname)
            dst = os.path.join(directory, self.upload.changes.changesname)
            if wanted(dst):
                fs.copy(src, dst, mode=mode, symlink=symlink)

    def __enter__(self) -> "UploadCopy":
        """create the temporary directory and export the upload into it

        If the export fails, the temporary directory is removed again before
        the exception propagates: ``__exit__`` is not called when
        ``__enter__`` raises, so it would otherwise be left behind.
        """
        assert self._directory is None

        mode = 0o0700
        symlink = True
        if self.group is not None:
            mode = 0o2750
            symlink = False

        cnf = Config()
        self._directory = utils.temp_dirname(
            parent=cnf.get("Dir::TempPath"), mode=mode, group=self.group
        )
        try:
            self.export(self.directory, symlink=symlink)
        except BaseException:
            # Best-effort cleanup; never mask the original error.
            shutil.rmtree(self._directory, ignore_errors=True)
            self._directory = None
            raise
        return self

    def __exit__(self, *args) -> None:
        """remove the temporary copy; exceptions are not suppressed"""
        if self._directory is not None:
            shutil.rmtree(self._directory)
            self._directory = None
        return None
143class MissingOverride(TypedDict):
144 package: str
145 priority: str
146 section: str
147 component: str
148 type: str
149 included: bool
150 valid: NotRequired[bool]
class PolicyQueueUploadHandler:
    """process uploads to policy queues

    This class allows to accept or reject uploads and to get a list of missing
    overrides (for NEW processing).
    """

    def __init__(self, upload: PolicyQueueUpload, session: "Session"):
        """initializer

        :param upload: upload to process
        :param session: database session
        """
        self.upload = upload
        self.session = session

    @property
    def _overridesuite(self) -> Suite:
        """suite whose overrides apply to the upload's target suite

        This is the target suite itself unless it names a separate
        ``overridesuite``, in which case that suite is looked up.
        """
        overridesuite = self.upload.target_suite
        if overridesuite.overridesuite is not None:
            overridesuite = (
                self.session.query(Suite)
                .filter_by(suite_name=overridesuite.overridesuite)
                .one()
            )
        return overridesuite

    def _find_override(
        self, package: str, overridetype: str, component_name: str
    ) -> Override | None:
        """look up one override in the override suite

        :param package: package name
        :param overridetype: override type name to match
        :param component_name: component name; mapped via
                               :func:`get_mapped_component` before matching
        :return: first matching override or :const:`None`
        """
        suite = self._overridesuite
        component = get_mapped_component(component_name, self.session)
        query = (
            self.session.query(Override)
            .filter_by(package=package, suite=suite)
            .join(OverrideType)
            .filter(OverrideType.overridetype == overridetype)
            .filter(Override.component == component)
        )
        return query.first()

    def _source_override(self, component_name: str) -> Override | None:
        """get the 'dsc' override for the upload's source package

        The upload must include a source package.
        """
        assert self.upload.source is not None
        return self._find_override(self.upload.source.source, "dsc", component_name)

    def _binary_override(
        self, name: str, binarytype: str, component_name: str
    ) -> Override | None:
        """get the override of type `binarytype` for the package `name`"""
        return self._find_override(name, binarytype, component_name)

    @property
    def _changes_prefix(self) -> str:
        """name of the .changes file without the ``.changes`` suffix"""
        changesname = self.upload.changes.changesname
        assert changesname.endswith(".changes")
        assert re_file_changes.match(changesname)
        return changesname[: -len(".changes")]

    def _comment_path(self, filename: str) -> str:
        """path of `filename` in the policy queue's COMMENTS directory"""
        return os.path.join(self.upload.policy_queue.path, "COMMENTS", filename)

    def accept(self) -> None:
        """mark upload as accepted

        Creates ``COMMENTS/ACCEPT.<changes prefix>`` in the policy queue
        directory.  An already existing file is left untouched.
        """
        assert not self.missing_overrides()

        fn1 = "ACCEPT.{0}".format(self._changes_prefix)
        fn = self._comment_path(fn1)
        try:
            # O_EXCL: never overwrite a decision that was already recorded
            fh = os.open(fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
            with os.fdopen(fh, "wt") as f:
                f.write("OK\n")
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def reject(self, reason: str) -> None:
        """mark upload as rejected

        Creates ``COMMENTS/REJECT.<changes prefix>`` in the policy queue
        directory.  An already existing file is left untouched.

        :param reason: reason for the rejection
        """
        cnf = Config()

        fn1 = "REJECT.{0}".format(self._changes_prefix)
        assert re_file_safe.match(fn1)

        fn = self._comment_path(fn1)
        try:
            # O_EXCL: never overwrite a decision that was already recorded
            fh = os.open(fn, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
            with os.fdopen(fh, "wt") as f:
                f.write("NOTOK\n")
                f.write(
                    "From: {0} <{1}>\n\n".format(
                        utils.whoami(), cnf["Dinstall::MyAdminAddress"]
                    )
                )
                f.write(reason)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def get_action(self) -> Optional[str]:
        """get current action

        :return: string giving the current action, one of 'ACCEPT',
                 'ACCEPTED', 'REJECT', or :const:`None` if no matching file
                 exists in the COMMENTS directory
        """
        changes_prefix = self._changes_prefix

        for action in ("ACCEPT", "ACCEPTED", "REJECT"):
            fn1 = "{0}.{1}".format(action, changes_prefix)
            if os.path.exists(self._comment_path(fn1)):
                return action

        return None

    def missing_overrides(
        self, hints: Optional[list[MissingOverride]] = None
    ) -> list[MissingOverride]:
        """get missing override entries for the upload

        :param hints: suggested hints for new overrides in the same format as
                      the return value
        :return: list of dicts with the following keys:

                 - package: package name
                 - priority: default priority (from upload)
                 - section: default section (from upload)
                 - component: default component (from upload)
                 - type: type of required override ('dsc', 'deb' or 'udeb')
                 - included: whether the package is part of the upload

                 All values are strings, except `included` which is a bool.
                 A matching entry from `hints` is returned as-is in place of
                 the defaults.
        """
        # TODO: use Package-List field
        missing: list[MissingOverride] = []
        components: set[str] = set()

        source = self.upload.source

        if hints is None:
            hints = []
        hints_map = {(o["type"] or "", o["package"]): o for o in hints}

        def check_override(
            name: str,
            override_type: str | None,
            priority: str | None,
            section: str | None,
            included: bool,
        ) -> None:
            override_type = override_type or ""
            section = section or ""
            # "component/section" names the component; a plain section
            # means "main"
            component = "main"
            if "/" in section:
                component = section.split("/", 1)[0]
            override = self._binary_override(name, override_type, component)
            if override is None and not any(
                o["package"] == name and o["type"] == override_type for o in missing
            ):
                hint = hints_map.get((override_type, name))
                if hint is not None:
                    missing.append(hint)
                    component = hint["component"]
                else:
                    missing.append(
                        {
                            "package": name,
                            "priority": priority or "",
                            "section": section,
                            "component": component,
                            "type": override_type,
                            "included": included,
                        }
                    )
            # recorded for every package: the source component is derived
            # from this set below
            components.add(component)

        for binary in self.upload.binaries:
            binary_proxy = binary.proxy
            priority = binary_proxy.get("Priority", "optional")
            section = binary_proxy["Section"]
            check_override(
                binary.package, binary.binarytype, priority, section, included=True
            )

        if source is not None:
            source_proxy = source.proxy
            package_list = PackageList(source_proxy)
            if not package_list.fallback:
                packages = package_list.packages_for_suite(self.upload.target_suite)
                for p in packages:
                    check_override(
                        p.name, p.type, p.priority, p.section, included=False
                    )

            # see daklib.archive.source_component_from_package_list
            # which we cannot use here as we might not have a Package-List
            # field for old packages
            mapped_components = [get_mapped_component_name(c) for c in components]
            source_component_db = (
                self.session.query(Component)
                .order_by(Component.ordering)
                .filter(Component.component_name.in_(mapped_components))
                .first()
            )
            assert source_component_db is not None
            source_component = source_component_db.component_name

            override = self._source_override(source_component)
            if override is None:
                hint = hints_map.get(("dsc", source.source))
                if hint is not None:
                    missing.append(hint)
                else:
                    section = "misc"
                    if source_component != "main":
                        section = "{0}/{1}".format(source_component, section)
                    missing.append(
                        {
                            "package": source.source,
                            "priority": "optional",
                            "section": section,
                            "component": source_component,
                            "type": "dsc",
                            "included": True,
                        }
                    )

        return missing

    def add_overrides(self, new_overrides, suite: Suite) -> None:
        """add override entries to the database and commit

        :param new_overrides: iterable of mappings with the keys `package`,
                              `priority`, `section`, `component` and `type`
        :param suite: suite to add the overrides to; if it names a separate
                      ``overridesuite``, that suite is used instead
        :raises Exception: if a priority, section or component is not known
        """
        if suite.overridesuite is not None:
            suite = (
                self.session.query(Suite)
                .filter_by(suite_name=suite.overridesuite)
                .one()
            )

        for override in new_overrides:
            package = override["package"]
            priority = (
                self.session.query(Priority)
                .filter_by(priority=override["priority"])
                .first()
            )
            section = (
                self.session.query(Section)
                .filter_by(section=override["section"])
                .first()
            )
            component = get_mapped_component(override["component"], self.session)
            overridetype = (
                self.session.query(OverrideType)
                .filter_by(overridetype=override["type"])
                .one()
            )

            # Report the requested name: the looked-up object is None here.
            if priority is None:
                raise Exception(
                    "Invalid priority {0} for package {1}".format(
                        override["priority"], package
                    )
                )
            if section is None:
                raise Exception(
                    "Invalid section {0} for package {1}".format(
                        override["section"], package
                    )
                )
            if component is None:
                raise Exception(
                    "Invalid component {0} for package {1}".format(
                        override["component"], package
                    )
                )

            o = Override(
                package=package,
                suite=suite,
                component=component,
                priority=priority,
                section=section,
                overridetype=overridetype,
            )
            self.session.add(o)

        self.session.commit()