Coverage for daklib/policy.py: 88%
195 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License along
14# with this program; if not, write to the Free Software Foundation, Inc.,
15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17"""module to process policy queue uploads"""
19import errno
20import os
21import shutil
22from typing import TYPE_CHECKING, NotRequired, Optional, TypedDict
24import daklib.utils as utils
26from .config import Config
27from .dbconn import (
28 Component,
29 Override,
30 OverrideType,
31 PolicyQueueUpload,
32 Priority,
33 Section,
34 Suite,
35 get_mapped_component,
36 get_mapped_component_name,
37)
38from .fstransactions import FilesystemTransaction
39from .packagelist import PackageList
40from .regexes import re_file_changes, re_file_safe
42if TYPE_CHECKING:
43 from collections.abc import Iterable
45 from sqlalchemy.orm import Session
class UploadCopy:
    """export a policy queue upload

    This class can be used in a with-statement::

       with UploadCopy(...) as copy:
          ...

    Doing so will provide a temporary copy of the upload in the directory
    given by the :attr:`directory` attribute.  The copy will be removed
    on leaving the with-block.
    """

    def __init__(self, upload: PolicyQueueUpload, group: Optional[str] = None):
        """initializer

        :param upload: upload to handle
        :param group: passed as ``group`` when the temporary directory is
                      created; if set, the directory is created with mode
                      02750 and files are copied instead of symlinked
        """
        # Set by __enter__, reset to None by __exit__.
        self._directory: Optional[str] = None
        self.upload = upload
        self.group = group

    @property
    def directory(self) -> str:
        """directory holding the temporary copy

        Only available between :meth:`__enter__` and :meth:`__exit__`;
        accessing it at any other time fails an assertion.
        """
        assert self._directory is not None
        return self._directory

    def export(
        self,
        directory: str,
        mode: Optional[int] = None,
        symlink: bool = True,
        ignore_existing: bool = False,
    ) -> None:
        """export a copy of the upload

        Source files, binaries, byhand files and the .changes file are
        copied through a single :class:`FilesystemTransaction`.

        :param directory: directory to export to
        :param mode: permissions to use for the copied files
        :param symlink: use symlinks instead of copying the files
        :param ignore_existing: ignore already existing files
        """
        with FilesystemTransaction() as fs:

            def copy(src: str, dst: str) -> None:
                # An existing destination is only skipped on request;
                # otherwise the copy is attempted regardless.
                if not ignore_existing or not os.path.exists(dst):
                    fs.copy(src, dst, mode=mode, symlink=symlink)

            source = self.upload.source
            queue = self.upload.policy_queue

            # Files from the pool: source files (if the upload has a
            # source) and binaries, stored under their base name.
            if source is not None:
                for dsc_file in source.srcfiles:
                    poolfile = dsc_file.poolfile
                    copy(
                        poolfile.fullpath,
                        os.path.join(directory, os.path.basename(poolfile.filename)),
                    )

            for binary in self.upload.binaries:
                poolfile = binary.poolfile
                copy(
                    poolfile.fullpath,
                    os.path.join(directory, os.path.basename(poolfile.filename)),
                )

            # copy byhand files
            for byhand in self.upload.byhand:
                src = os.path.join(queue.path, byhand.filename)
                # byhand files missing from the queue directory are skipped
                if os.path.exists(src):
                    copy(src, os.path.join(directory, byhand.filename))

            # copy .changes
            changesname = self.upload.changes.changesname
            copy(
                os.path.join(queue.path, changesname),
                os.path.join(directory, changesname),
            )

    def __enter__(self):
        """create the temporary directory and export the upload into it

        :return: the :class:`UploadCopy` instance itself
        """
        assert self._directory is None

        # Without a group: private directory, files symlinked.
        # With a group: setgid, group-readable directory and real copies.
        # NOTE(review): presumably so members of the group can read the
        # files themselves -- confirm.
        mode = 0o0700
        symlink = True
        if self.group is not None:
            mode = 0o2750
            symlink = False

        cnf = Config()
        self._directory = utils.temp_dirname(
            parent=cnf.get("Dir::TempPath"), mode=mode, group=self.group
        )
        try:
            self.export(self.directory, symlink=symlink)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the
            # temporary directory has to be removed here to avoid a leak.
            shutil.rmtree(self._directory, ignore_errors=True)
            self._directory = None
            raise
        return self

    def __exit__(self, *args):
        """remove the temporary copy; exceptions are not suppressed"""
        if self._directory is not None:
            shutil.rmtree(self._directory)
            self._directory = None
        return None
145class MissingOverride(TypedDict):
146 package: str
147 priority: str
148 section: str
149 component: str
150 type: str
151 included: NotRequired[bool]
152 valid: NotRequired[bool]
class PolicyQueueUploadHandler:
    """process uploads to policy queues

    This class allows to accept or reject uploads and to get a list of missing
    overrides (for NEW processing).
    """

    def __init__(self, upload: PolicyQueueUpload, session: "Session"):
        """initializer

        :param upload: upload to process
        :param session: database session
        """
        self.upload = upload
        self.session = session

    def _resolve_overridesuite(self, suite: Suite) -> Suite:
        """return the suite whose overrides apply to `suite`

        This is `suite` itself unless it names another suite in its
        ``overridesuite`` attribute, in which case that suite is looked up.
        """
        if suite.overridesuite is None:
            return suite
        return (
            self.session.query(Suite)
            .filter_by(suite_name=suite.overridesuite)
            .one()
        )

    @property
    def _overridesuite(self) -> Suite:
        """suite whose overrides apply to the upload's target suite"""
        return self._resolve_overridesuite(self.upload.target_suite)

    def _find_override(
        self, package: str, overridetype: str, component_name: str
    ) -> Override | None:
        """look up one override in the override suite

        :param package: package name
        :param overridetype: override type name, e.g. 'dsc' or 'deb'
        :param component_name: component name; mapped via
                               :func:`get_mapped_component` before use
        :return: the first matching override or `None`
        """
        suite = self._overridesuite
        component = get_mapped_component(component_name, self.session)
        return (
            self.session.query(Override)
            .filter_by(package=package, suite=suite)
            .join(OverrideType)
            .filter(OverrideType.overridetype == overridetype)
            .filter(Override.component == component)
            .first()
        )

    def _source_override(self, component_name: str) -> Override | None:
        """return the 'dsc' override for the upload's source, if any"""
        assert self.upload.source is not None
        return self._find_override(self.upload.source.source, "dsc", component_name)

    def _binary_override(
        self, name: str, binarytype: str, component_name: str
    ) -> Override | None:
        """return the override of type `binarytype` for package `name`, if any"""
        return self._find_override(name, binarytype, component_name)

    @property
    def _changes_prefix(self) -> str:
        """name of the .changes file without the '.changes' suffix"""
        changesname = self.upload.changes.changesname
        assert changesname.endswith(".changes")
        assert re_file_changes.match(changesname)
        return changesname.removesuffix(".changes")

    def _write_comment(self, filename: str, parts: tuple[str, ...]) -> None:
        """create a file in the queue's COMMENTS directory

        An already existing file is left untouched.

        :param filename: name of the file below COMMENTS
        :param parts: strings written to the file in order
        """
        path = os.path.join(self.upload.policy_queue.path, "COMMENTS", filename)
        try:
            # O_EXCL: never overwrite a file that is already there.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
        except FileExistsError:
            return
        with os.fdopen(fd, "wt") as f:
            for part in parts:
                f.write(part)

    def accept(self) -> None:
        """mark upload as accepted

        Writes ``ACCEPT.<changes prefix>`` to the COMMENTS directory of the
        policy queue unless that file already exists.  The upload must not
        have any missing overrides.
        """
        assert not self.missing_overrides()

        self._write_comment("ACCEPT.{0}".format(self._changes_prefix), ("OK\n",))

    def reject(self, reason: str, *, rejected_by: str | None) -> None:
        """mark upload as rejected

        Writes ``REJECT.<changes prefix>`` to the COMMENTS directory of the
        policy queue unless that file already exists.

        :param reason: reason for the rejection
        :param rejected_by: if not empty, written as a ``From:`` line before
                            the reason
        """
        filename = "REJECT.{0}".format(self._changes_prefix)
        assert re_file_safe.match(filename)

        parts = ["NOTOK\n"]
        if rejected_by:
            parts.append(f"From: {rejected_by}\n\n")
        parts.append(reason)
        self._write_comment(filename, tuple(parts))

    def get_action(self) -> Optional[str]:
        """get current action

        :return: string giving the current action, one of 'ACCEPT',
                 'ACCEPTED', 'REJECT', or `None` if no file for any of these
                 actions exists in the COMMENTS directory
        """
        changes_prefix = self._changes_prefix
        comments_dir = os.path.join(self.upload.policy_queue.path, "COMMENTS")

        for action in ("ACCEPT", "ACCEPTED", "REJECT"):
            filename = "{0}.{1}".format(action, changes_prefix)
            if os.path.exists(os.path.join(comments_dir, filename)):
                return action

        return None

    def missing_overrides(
        self, hints: Optional[list[MissingOverride]] = None
    ) -> list[MissingOverride]:
        """get missing override entries for the upload

        :param hints: suggested hints for new overrides in the same format as
                      the return value
        :return: list of dicts with the following keys:

                 - package: package name
                 - priority: default priority (from upload)
                 - section: default section (from upload)
                 - component: default component (from upload)
                 - type: type of required override ('dsc', 'deb' or 'udeb')
                 - included: `True` for the source and binaries of the
                   upload, `False` for packages only found in the source's
                   Package-List

                 `included` is a boolean, all other values are strings.
                 A matching entry from `hints` is returned as given instead
                 of a generated dict.
        """
        # TODO: use Package-List field
        missing: list[MissingOverride] = []
        # components of all packages checked; used to pick the component
        # of the source package below
        components: set[str] = set()

        source = self.upload.source

        hints_map = {(o["type"] or "", o["package"]): o for o in hints or ()}

        def check_override(
            name: str,
            overridetype: str | None,
            priority: str | None,
            section: str | None,
            included: bool,
        ) -> None:
            overridetype = overridetype or ""
            section = section or ""
            # "<component>/<section>" names the component explicitly,
            # a plain section belongs to "main"
            component = section.split("/", 1)[0] if "/" in section else "main"

            override = self._binary_override(name, overridetype, component)
            already_listed = any(
                o["package"] == name and o["type"] == overridetype for o in missing
            )
            if override is None and not already_listed:
                hint = hints_map.get((overridetype, name))
                if hint is not None:
                    missing.append(hint)
                    # the hint decides the component from here on
                    component = hint["component"]
                else:
                    missing.append(
                        {
                            "package": name,
                            "priority": priority or "",
                            "section": section,
                            "component": component,
                            "type": overridetype,
                            "included": included,
                        }
                    )
            # recorded for every package, with or without an override
            components.add(component)

        for binary in self.upload.binaries:
            binary_proxy = binary.proxy
            check_override(
                binary.package,
                binary.binarytype,
                binary_proxy.get("Priority", "optional"),
                binary_proxy["Section"],
                included=True,
            )

        if source is None:
            return missing

        package_list = PackageList(source.proxy)
        if not package_list.fallback:
            for p in package_list.packages_for_suite(self.upload.target_suite):
                check_override(p.name, p.type, p.priority, p.section, included=False)

        # see daklib.archive.source_component_from_package_list
        # which we cannot use here as we might not have a Package-List
        # field for old packages
        mapped_components = [get_mapped_component_name(c) for c in components]
        # first component, by Component.ordering, among those collected
        source_component_db = (
            self.session.query(Component)
            .order_by(Component.ordering)
            .filter(Component.component_name.in_(mapped_components))
            .first()
        )
        assert source_component_db is not None
        source_component = source_component_db.component_name

        if self._source_override(source_component) is None:
            hint = hints_map.get(("dsc", source.source))
            if hint is not None:
                missing.append(hint)
            else:
                section = "misc"
                if source_component != "main":
                    section = "{0}/{1}".format(source_component, section)
                missing.append(
                    {
                        "package": source.source,
                        "priority": "optional",
                        "section": section,
                        "component": source_component,
                        "type": "dsc",
                        "included": True,
                    }
                )

        return missing

    def add_overrides(
        self, new_overrides: "Iterable[MissingOverride]", suite: Suite
    ) -> None:
        """add overrides to the database and commit the session

        :param new_overrides: overrides to add
        :param suite: suite to add the overrides to; if it names an
                      ``overridesuite``, that suite is used instead
        :raises Exception: if the priority, section or component named by an
                           override cannot be found
        """
        suite = self._resolve_overridesuite(suite)

        for override in new_overrides:
            package = override["package"]
            # All lookups run before any validation, so an unknown override
            # type still fails in `.one()` first.
            priority = (
                self.session.query(Priority)
                .filter_by(priority=override["priority"])
                .first()
            )
            section = (
                self.session.query(Section)
                .filter_by(section=override["section"])
                .first()
            )
            component = get_mapped_component(override["component"], self.session)
            overridetype = (
                self.session.query(OverrideType)
                .filter_by(overridetype=override["type"])
                .one()
            )

            # Report the requested name: the looked-up object is always
            # None in these branches.
            if priority is None:
                raise Exception(
                    "Invalid priority {0} for package {1}".format(
                        override["priority"], package
                    )
                )
            if section is None:
                raise Exception(
                    "Invalid section {0} for package {1}".format(
                        override["section"], package
                    )
                )
            if component is None:
                raise Exception(
                    "Invalid component {0} for package {1}".format(
                        override["component"], package
                    )
                )

            self.session.add(
                Override(
                    package=package,
                    suite=suite,
                    component=component,
                    priority=priority,
                    section=section,
                    overridetype=overridetype,
                )
            )

        self.session.commit()