# SPDX-License-Identifier: GPL-2.0-or-later
# © 2026, Ansgar 🙀 <ansgar@debian.org>
import logging
from typing import override
import grpc
from google.protobuf import empty_pb2
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload
from dak.policyqueue.v1 import policyqueue_pb2, policyqueue_pb2_grpc
from daklib.dbconn import DBChange, DBConn, PolicyQueue, PolicyQueueUpload
from daklib.policy import PolicyQueueUploadHandler
from daklib.rpc import to_timestamp
from daklib.rpc_auth import require_any_scope, require_scope
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class PolicyQueueServiceServicer(policyqueue_pb2_grpc.PolicyQueueServiceServicer):
    """gRPC servicer for listing and processing policy queue uploads.

    Read RPCs call ``require_scope`` with ``policyqueue:read``.  Write RPCs
    call ``require_any_scope`` with ``policyqueue:write`` and the
    queue-specific ``policyqueue:write:<policy_queue>``.

    Every RPC opens its own session from the connection passed to the
    constructor.
    """

    def __init__(self, conn: DBConn) -> None:
        """Remember the database connection used to open per-request sessions.

        :param conn: object providing ``session()`` as a context manager
        """
        self._conn = conn

    # Base query shared by all lookups.  The three relationships are eagerly
    # loaded because every response reads fields from them.
    _query = select(PolicyQueueUpload).options(
        joinedload(PolicyQueueUpload.changes),
        joinedload(PolicyQueueUpload.policy_queue),
        joinedload(PolicyQueueUpload.target_suite),
    )

    def _upload(
        self,
        session: Session,
        policy_queue: str,
        name: str,
        *,
        for_update: bool = False,
    ) -> PolicyQueueUpload | None:
        """Look up a single upload by policy queue name and changes name.

        :param session: session the query is executed in
        :param policy_queue: matched against ``PolicyQueue.queue_name``
        :param name: matched against ``DBChange.changesname``
        :param for_update: if true, select the upload row ``FOR UPDATE``
        :return: the matching upload, or ``None`` if there is none
        """
        query = (
            self._query.join(PolicyQueueUpload.changes)
            .join(PolicyQueueUpload.policy_queue)
            .where(PolicyQueue.queue_name == policy_queue, DBChange.changesname == name)
        )
        if for_update:
            # Restrict the lock to the upload table itself; the eagerly
            # loaded relationships are joined into the same statement.
            query = query.with_for_update(of=PolicyQueueUpload)
        return session.execute(query).scalar_one_or_none()

    def _require_upload(
        self,
        session: Session,
        context: grpc.ServicerContext,
        policy_queue: str,
        name: str,
        *,
        for_update: bool = False,
    ) -> PolicyQueueUpload:
        """Look up an upload and abort the RPC with ``NOT_FOUND`` if missing.

        Takes the same lookup arguments as ``_upload``; ``context`` is the
        RPC context used to abort the call.

        :return: the matching upload
        """
        upload = self._upload(session, policy_queue, name, for_update=for_update)
        if upload is None:
            # ``abort`` raises, so nothing below runs for a missing upload.
            context.abort(grpc.StatusCode.NOT_FOUND, "not found")
        return upload

    @override
    def ListUploads(
        self, request: policyqueue_pb2.ListUploadsRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.ListUploadsResponse:
        """Return all uploads in the policy queue named by the request.

        The listed uploads carry name, policy queue, target suite and
        creation time; missing overrides are not included here.
        """
        require_scope(context, "policyqueue:read")
        logger.debug("list uploads: policy_queue=%s", request.policy_queue)

        with self._conn.session() as session:
            query = self._query.join(PolicyQueueUpload.policy_queue).where(
                PolicyQueue.queue_name == request.policy_queue
            )
            uploads = session.execute(query).scalars()
            # Build the response while the session is still open: the
            # result is consumed lazily by the comprehension below.
            return policyqueue_pb2.ListUploadsResponse(
                uploads=[
                    policyqueue_pb2.Upload(
                        name=u.changes.changesname,
                        policy_queue=u.policy_queue.queue_name,
                        target_suite=u.target_suite.suite_name,
                        create_time=to_timestamp(u.changes.created),
                    )
                    for u in uploads
                ],
            )

    @override
    def GetUpload(
        self, request: policyqueue_pb2.GetUploadRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.Upload:
        """Return one upload, including the overrides it is still missing.

        Aborts with ``NOT_FOUND`` if no upload matches the requested policy
        queue and name.
        """
        require_scope(context, "policyqueue:read")
        logger.debug(
            "get upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )

        with self._conn.session() as session:
            upload = self._require_upload(
                session, context, request.policy_queue, request.name
            )
            handler = PolicyQueueUploadHandler(upload, session)
            missing_overrides = handler.missing_overrides()
            return policyqueue_pb2.Upload(
                name=upload.changes.changesname,
                policy_queue=upload.policy_queue.queue_name,
                target_suite=upload.target_suite.suite_name,
                missing_overrides=[
                    policyqueue_pb2.Override(
                        package=o["package"],
                        component=o["component"],
                        priority=o["priority"],
                        section=o["section"],
                        type=o["type"],
                    )
                    for o in missing_overrides
                ],
                create_time=to_timestamp(upload.changes.created),
            )

    @override
    def AddOverrides(
        self,
        request: policyqueue_pb2.AddOverridesRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Add the requested overrides for an upload's target suite.

        Aborts with ``NOT_FOUND`` if the upload does not exist, and with
        ``FAILED_PRECONDITION`` if any requested override is not among the
        upload's currently missing overrides; in that case nothing is added.
        """
        require_any_scope(
            context, ["policyqueue:write", f"policyqueue:write:{request.policy_queue}"]
        )
        logger.info(
            "add overrides: policy_queue=%s name=%s count=%d",
            request.policy_queue,
            request.name,
            len(request.overrides),
        )

        with self._conn.session() as session:
            upload = self._require_upload(
                session,
                context,
                request.policy_queue,
                request.name,
                for_update=True,
            )
            handler = PolicyQueueUploadHandler(upload, session)

            # Overrides are identified by (component, type, package).
            missing_overrides = {
                (o["component"], o["type"], o["package"])
                for o in handler.missing_overrides()
            }
            unneeded_overrides = [
                o
                for o in request.overrides
                if (o.component, o.type, o.package) not in missing_overrides
            ]
            if unneeded_overrides:
                packages = ", ".join(
                    f"{o.type}:{o.package}" for o in unneeded_overrides
                )
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"no new override needed for {packages}",
                )

            handler.add_overrides(
                [
                    {
                        "component": o.component,
                        "type": o.type,
                        "package": o.package,
                        "priority": o.priority,
                        "section": o.section,
                    }
                    for o in request.overrides
                ],
                upload.target_suite,
            )
            return empty_pb2.Empty()

    @override
    def AcceptUpload(
        self,
        request: policyqueue_pb2.AcceptUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Accept an upload.

        Aborts with ``NOT_FOUND`` if the upload does not exist, and with
        ``FAILED_PRECONDITION`` if overrides are still missing or the upload
        already has an action other than ``ACCEPT``.
        """
        require_any_scope(
            context, ["policyqueue:write", f"policyqueue:write:{request.policy_queue}"]
        )
        logger.info(
            "accept upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )

        with self._conn.session() as session:
            upload = self._require_upload(
                session,
                context,
                request.policy_queue,
                request.name,
                for_update=True,
            )
            handler = PolicyQueueUploadHandler(upload, session)

            if missing_overrides := handler.missing_overrides():
                packages = ", ".join(
                    f"{o['type']}:{o['package']}" for o in missing_overrides
                )
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"missing overrides for {packages}",
                )

            # An existing ACCEPT action is let through, so accepting again
            # is not reported as a conflict.
            if (action := handler.get_action()) and action != "ACCEPT":
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"upload is already processed with action {action}",
                )

            handler.accept()
            return empty_pb2.Empty()

    @override
    def RejectUpload(
        self,
        request: policyqueue_pb2.RejectUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Reject an upload with the reason given in the request.

        Aborts with ``NOT_FOUND`` if the upload does not exist, and with
        ``FAILED_PRECONDITION`` if the upload already has an action other
        than ``REJECT``.
        """
        require_any_scope(
            context, ["policyqueue:write", f"policyqueue:write:{request.policy_queue}"]
        )
        logger.info(
            "reject upload: policy_queue=%s name=%s rejected_by=%s",
            request.policy_queue,
            request.name,
            request.rejected_by,
        )

        with self._conn.session() as session:
            upload = self._require_upload(
                session,
                context,
                request.policy_queue,
                request.name,
                for_update=True,
            )
            handler = PolicyQueueUploadHandler(upload, session)

            # An existing REJECT action is let through, so rejecting again
            # is not reported as a conflict.
            if (action := handler.get_action()) and action != "REJECT":
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"upload is already processed with action {action}",
                )

            handler.reject(request.reason, rejected_by=request.rejected_by)
            return empty_pb2.Empty()