Coverage for daklib/policy_rpc.py: 29%
85 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# SPDX-License-Identifier: GPL-2.0-or-later
2# © 2026, Ansgar 🙀 <ansgar@debian.org>
4import logging
5from typing import override
7import grpc
8from google.protobuf import empty_pb2
9from sqlalchemy import select
10from sqlalchemy.orm import Session, joinedload
12from dak.policyqueue.v1 import policyqueue_pb2, policyqueue_pb2_grpc
13from daklib.dbconn import DBChange, DBConn, PolicyQueue, PolicyQueueUpload
14from daklib.policy import PolicyQueueUploadHandler
15from daklib.rpc import to_timestamp
16from daklib.rpc_auth import require_any_scope, require_scope
# Module-level logger named after this module; the servicer below logs each
# incoming RPC through it (debug for reads, info for writes).
logger = logging.getLogger(__name__)
class PolicyQueueServiceServicer(policyqueue_pb2_grpc.PolicyQueueServiceServicer):
    """gRPC servicer for listing and processing uploads held in policy queues.

    Every RPC first checks the caller's scopes, then opens a session from the
    ``DBConn`` supplied at construction time and works on ``PolicyQueueUpload``
    rows. An upload is addressed by its policy queue name together with its
    changes name.

    Error paths call ``context.abort()`` and then simply fall through; this
    relies on ``abort()`` raising to terminate the RPC (see the grpc
    ``ServicerContext.abort`` documentation).
    """

    # Base SELECT for uploads. The three relationships are joined-loaded
    # because the response builders below read all of them.
    _query = select(PolicyQueueUpload).options(
        joinedload(PolicyQueueUpload.changes),
        joinedload(PolicyQueueUpload.policy_queue),
        joinedload(PolicyQueueUpload.target_suite),
    )

    def __init__(self, conn: DBConn) -> None:
        """Keep the database connection used to open one session per request."""
        self._conn = conn

    def _upload(
        self, session: Session, policy_queue: str, name: str, *, for_update=False
    ) -> PolicyQueueUpload | None:
        """Look up one upload by policy queue name and changes name.

        Args:
            session: Session the statement is executed on.
            policy_queue: Value matched against ``PolicyQueue.queue_name``.
            name: Value matched against ``DBChange.changesname``.
            for_update: When true, add ``FOR UPDATE`` restricted to the
                ``PolicyQueueUpload`` entity.

        Returns:
            The matching upload, or ``None`` when no row matches.
        """
        criteria = (
            PolicyQueue.queue_name == policy_queue,
            DBChange.changesname == name,
        )
        stmt = self._query.join(PolicyQueueUpload.changes)
        stmt = stmt.join(PolicyQueueUpload.policy_queue)
        stmt = stmt.where(*criteria)
        if for_update:
            stmt = stmt.with_for_update(of=PolicyQueueUpload)
        return session.execute(stmt).scalar_one_or_none()

    @staticmethod
    def _require_write_scope(context: grpc.ServicerContext, policy_queue: str) -> None:
        """Require the global write scope or the one specific to *policy_queue*."""
        accepted = ["policyqueue:write", f"policyqueue:write:{policy_queue}"]
        require_any_scope(context, accepted)

    def _locked_upload(
        self,
        session: Session,
        context: grpc.ServicerContext,
        policy_queue: str,
        name: str,
    ) -> PolicyQueueUpload:
        """Fetch an upload with ``for_update=True``; abort NOT_FOUND if absent."""
        upload = self._upload(session, policy_queue, name, for_update=True)
        if not upload:
            context.abort(grpc.StatusCode.NOT_FOUND, "not found")
        return upload

    @staticmethod
    def _abort_if_processed(
        context: grpc.ServicerContext,
        handler: PolicyQueueUploadHandler,
        wanted: str,
    ) -> None:
        """Abort when the upload already has an action different from *wanted*.

        A falsy ``get_action()`` result, or one equal to *wanted*, passes.
        """
        action = handler.get_action()
        if action and action != wanted:
            context.abort(
                grpc.StatusCode.FAILED_PRECONDITION,
                f"upload is already processed with action {action}",
            )

    @staticmethod
    def _upload_fields(upload: PolicyQueueUpload) -> dict:
        """Return the ``Upload`` message fields shared by list and get replies."""
        changes = upload.changes
        return {
            "name": changes.changesname,
            "policy_queue": upload.policy_queue.queue_name,
            "target_suite": upload.target_suite.suite_name,
            "create_time": to_timestamp(changes.created),
        }

    @override
    def ListUploads(
        self, request: policyqueue_pb2.ListUploadsRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.ListUploadsResponse:
        """Return all uploads whose policy queue is ``request.policy_queue``.

        Requires the ``policyqueue:read`` scope. Listed uploads carry name,
        policy queue, target suite and creation time only.
        """
        require_scope(context, "policyqueue:read")
        logger.debug("list uploads: policy_queue=%s", request.policy_queue)
        with self._conn.session() as session:
            stmt = self._query.join(PolicyQueueUpload.policy_queue).where(
                PolicyQueue.queue_name == request.policy_queue
            )
            rows = session.execute(stmt).scalars()
            # Messages are built while the session is still open.
            messages = [
                policyqueue_pb2.Upload(**self._upload_fields(row)) for row in rows
            ]
            return policyqueue_pb2.ListUploadsResponse(uploads=messages)

    @override
    def GetUpload(
        self, request: policyqueue_pb2.GetUploadRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.Upload:
        """Return a single upload including its currently missing overrides.

        Requires the ``policyqueue:read`` scope. Aborts with ``NOT_FOUND``
        when no upload matches the requested policy queue and name.
        """
        require_scope(context, "policyqueue:read")
        logger.debug(
            "get upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )
        with self._conn.session() as session:
            upload = self._upload(session, request.policy_queue, request.name)
            if upload is None:
                context.abort(grpc.StatusCode.NOT_FOUND, "not found")

            handler = PolicyQueueUploadHandler(upload, session)
            overrides = [
                policyqueue_pb2.Override(
                    package=entry["package"],
                    component=entry["component"],
                    priority=entry["priority"],
                    section=entry["section"],
                    type=entry["type"],
                )
                for entry in handler.missing_overrides()
            ]
            return policyqueue_pb2.Upload(
                missing_overrides=overrides,
                **self._upload_fields(upload),
            )

    @override
    def AddOverrides(
        self,
        request: policyqueue_pb2.AddOverridesRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Add the requested overrides for an upload's target suite.

        Requires ``policyqueue:write`` or the queue-specific write scope.
        Aborts with ``NOT_FOUND`` when the upload does not exist, and with
        ``FAILED_PRECONDITION`` when any requested override is not among the
        upload's missing overrides (matched on component, type and package).
        """
        self._require_write_scope(context, request.policy_queue)
        logger.info(
            "add overrides: policy_queue=%s name=%s count=%d",
            request.policy_queue,
            request.name,
            len(request.overrides),
        )

        with self._conn.session() as session:
            upload = self._locked_upload(
                session, context, request.policy_queue, request.name
            )
            handler = PolicyQueueUploadHandler(upload, session)

            # Keys of the overrides the upload still lacks.
            wanted_keys = {
                (entry["component"], entry["type"], entry["package"])
                for entry in handler.missing_overrides()
            }
            surplus = [
                item
                for item in request.overrides
                if (item.component, item.type, item.package) not in wanted_keys
            ]
            if surplus:
                packages = ", ".join(f"{item.type}:{item.package}" for item in surplus)
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"no new override needed for {packages}",
                )

            new_overrides = [
                {
                    "component": item.component,
                    "type": item.type,
                    "package": item.package,
                    "priority": item.priority,
                    "section": item.section,
                }
                for item in request.overrides
            ]
            handler.add_overrides(new_overrides, upload.target_suite)

        return empty_pb2.Empty()

    @override
    def AcceptUpload(
        self,
        request: policyqueue_pb2.AcceptUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Accept an upload.

        Requires ``policyqueue:write`` or the queue-specific write scope.
        Aborts with ``NOT_FOUND`` when the upload does not exist, and with
        ``FAILED_PRECONDITION`` when overrides are still missing or the upload
        already has an action other than ``ACCEPT``.
        """
        self._require_write_scope(context, request.policy_queue)
        logger.info(
            "accept upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )
        with self._conn.session() as session:
            upload = self._locked_upload(
                session, context, request.policy_queue, request.name
            )
            handler = PolicyQueueUploadHandler(upload, session)

            missing = handler.missing_overrides()
            if missing:
                packages = ", ".join(
                    f"{entry['type']}:{entry['package']}" for entry in missing
                )
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"missing overrides for {packages}",
                )
            self._abort_if_processed(context, handler, "ACCEPT")

            handler.accept()

        return empty_pb2.Empty()

    @override
    def RejectUpload(
        self,
        request: policyqueue_pb2.RejectUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Reject an upload with the reason given in the request.

        Requires ``policyqueue:write`` or the queue-specific write scope.
        Aborts with ``NOT_FOUND`` when the upload does not exist, and with
        ``FAILED_PRECONDITION`` when the upload already has an action other
        than ``REJECT``.
        """
        self._require_write_scope(context, request.policy_queue)
        logger.info(
            "reject upload: policy_queue=%s name=%s rejected_by=%s",
            request.policy_queue,
            request.name,
            request.rejected_by,
        )
        with self._conn.session() as session:
            upload = self._locked_upload(
                session, context, request.policy_queue, request.name
            )
            handler = PolicyQueueUploadHandler(upload, session)
            self._abort_if_processed(context, handler, "REJECT")

            handler.reject(request.reason, rejected_by=request.rejected_by)

        return empty_pb2.Empty()