# Source code for daklib.policy_rpc

# SPDX-License-Identifier: GPL-2.0-or-later
# © 2026, Ansgar 🙀 <ansgar@debian.org>

import logging
from typing import override

import grpc
from google.protobuf import empty_pb2
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload

from dak.policyqueue.v1 import policyqueue_pb2, policyqueue_pb2_grpc
from daklib.dbconn import DBChange, DBConn, PolicyQueue, PolicyQueueUpload
from daklib.policy import PolicyQueueUploadHandler
from daklib.rpc import to_timestamp
from daklib.rpc_auth import require_any_scope, require_scope

logger = logging.getLogger(__name__)


class PolicyQueueServiceServicer(policyqueue_pb2_grpc.PolicyQueueServiceServicer):
    """gRPC servicer for listing and processing uploads held in policy queues.

    ``ListUploads`` and ``GetUpload`` require the ``policyqueue:read`` scope.
    ``AddOverrides``, ``AcceptUpload`` and ``RejectUpload`` require either
    ``policyqueue:write`` or the queue-specific
    ``policyqueue:write:<policy_queue>`` scope.

    Every RPC opens its own database session from the connection given to
    the constructor.
    """

    def __init__(self, conn: DBConn) -> None:
        """Store *conn*, from which each RPC obtains its session."""
        self._conn = conn

    # Base SELECT shared by all lookups.  The three relationships read when
    # building responses are loaded eagerly through joins.
    _query = select(PolicyQueueUpload).options(
        joinedload(PolicyQueueUpload.changes),
        joinedload(PolicyQueueUpload.policy_queue),
        joinedload(PolicyQueueUpload.target_suite),
    )

    def _upload(
        self,
        session: Session,
        policy_queue: str,
        name: str,
        *,
        for_update: bool = False,
    ) -> PolicyQueueUpload | None:
        """Look up one upload by policy queue name and changes file name.

        :param session: session used to execute the query
        :param policy_queue: value matched against ``PolicyQueue.queue_name``
        :param name: value matched against ``DBChange.changesname``
        :param for_update: when true, add a ``FOR UPDATE`` clause limited to
            the ``PolicyQueueUpload`` entity
        :return: the matching upload, or ``None`` when nothing matches
        """
        query = (
            self._query.join(PolicyQueueUpload.changes)
            .join(PolicyQueueUpload.policy_queue)
            .where(PolicyQueue.queue_name == policy_queue, DBChange.changesname == name)
        )
        if for_update:
            query = query.with_for_update(of=PolicyQueueUpload)
        return session.execute(query).scalar_one_or_none()

    def _require_upload(
        self,
        session: Session,
        context: grpc.ServicerContext,
        policy_queue: str,
        name: str,
        *,
        for_update: bool = False,
    ) -> PolicyQueueUpload:
        """Like :meth:`_upload`, but abort the RPC with ``NOT_FOUND`` on a miss."""
        upload = self._upload(session, policy_queue, name, for_update=for_update)
        if upload is None:
            # abort() raises, so execution never continues with ``None``.
            context.abort(grpc.StatusCode.NOT_FOUND, "not found")
        return upload

    @staticmethod
    def _require_write_scope(context: grpc.ServicerContext, policy_queue: str) -> None:
        """Require the global write scope or the one specific to *policy_queue*."""
        require_any_scope(
            context, ["policyqueue:write", f"policyqueue:write:{policy_queue}"]
        )

    @staticmethod
    def _ensure_action_allowed(
        handler: PolicyQueueUploadHandler,
        context: grpc.ServicerContext,
        expected: str,
    ) -> None:
        """Abort with ``FAILED_PRECONDITION`` if a different action is recorded.

        An upload with no recorded action, or one whose recorded action
        already equals *expected*, passes this check.
        """
        if (action := handler.get_action()) and action != expected:
            context.abort(
                grpc.StatusCode.FAILED_PRECONDITION,
                f"upload is already processed with action {action}",
            )

    @override
    def ListUploads(
        self, request: policyqueue_pb2.ListUploadsRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.ListUploadsResponse:
        """Return all uploads in the policy queue named by the request.

        The returned ``Upload`` messages carry name, policy queue, target
        suite and creation time; ``missing_overrides`` is not populated here.
        """
        require_scope(context, "policyqueue:read")
        logger.debug("list uploads: policy_queue=%s", request.policy_queue)
        with self._conn.session() as session:
            query = self._query.join(PolicyQueueUpload.policy_queue).where(
                PolicyQueue.queue_name == request.policy_queue
            )
            uploads = session.execute(query).scalars()
            # Build the response while the session is still open: the result
            # is consumed lazily by the comprehension below.
            return policyqueue_pb2.ListUploadsResponse(
                uploads=[
                    policyqueue_pb2.Upload(
                        name=u.changes.changesname,
                        policy_queue=u.policy_queue.queue_name,
                        target_suite=u.target_suite.suite_name,
                        create_time=to_timestamp(u.changes.created),
                    )
                    for u in uploads
                ],
            )

    @override
    def GetUpload(
        self, request: policyqueue_pb2.GetUploadRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.Upload:
        """Return a single upload, including the overrides it is still missing.

        Aborts with ``NOT_FOUND`` when no upload matches the requested policy
        queue and name.
        """
        require_scope(context, "policyqueue:read")
        logger.debug(
            "get upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )
        with self._conn.session() as session:
            upload = self._require_upload(
                session, context, request.policy_queue, request.name
            )
            handler = PolicyQueueUploadHandler(upload, session)
            missing_overrides = handler.missing_overrides()
            return policyqueue_pb2.Upload(
                name=upload.changes.changesname,
                policy_queue=upload.policy_queue.queue_name,
                target_suite=upload.target_suite.suite_name,
                missing_overrides=[
                    policyqueue_pb2.Override(
                        package=o["package"],
                        component=o["component"],
                        priority=o["priority"],
                        section=o["section"],
                        type=o["type"],
                    )
                    for o in missing_overrides
                ],
                create_time=to_timestamp(upload.changes.created),
            )

    @override
    def AddOverrides(
        self,
        request: policyqueue_pb2.AddOverridesRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Add the requested overrides for an upload's target suite.

        The upload is fetched with ``for_update=True``.  The RPC aborts with

        * ``NOT_FOUND`` when the upload does not exist,
        * ``FAILED_PRECONDITION`` when any requested override is not among
          the overrides the upload is currently missing,
        * ``INVALID_ARGUMENT`` when the request names the same
          (component, type, package) more than once.
        """
        self._require_write_scope(context, request.policy_queue)
        logger.info(
            "add overrides: policy_queue=%s name=%s count=%d",
            request.policy_queue,
            request.name,
            len(request.overrides),
        )
        with self._conn.session() as session:
            upload = self._require_upload(
                session,
                context,
                request.policy_queue,
                request.name,
                for_update=True,
            )
            handler = PolicyQueueUploadHandler(upload, session)

            missing_overrides = {
                (o["component"], o["type"], o["package"])
                for o in handler.missing_overrides()
            }
            unneeded_overrides = [
                o
                for o in request.overrides
                if (o.component, o.type, o.package) not in missing_overrides
            ]
            if unneeded_overrides:
                packages = ", ".join(
                    f"{o.type}:{o.package}" for o in unneeded_overrides
                )
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"no new override needed for {packages}",
                )

            # Each entry was checked against the missing set on its own, so a
            # request repeating one (component, type, package) would pass the
            # check above and hand the same override to add_overrides() twice.
            # NOTE(review): presumably only one override can be stored per
            # key -- confirm against PolicyQueueUploadHandler.add_overrides.
            seen: set[tuple[str, str, str]] = set()
            duplicates: list[str] = []
            for o in request.overrides:
                key = (o.component, o.type, o.package)
                if key in seen:
                    duplicates.append(f"{o.type}:{o.package}")
                seen.add(key)
            if duplicates:
                packages = ", ".join(duplicates)
                context.abort(
                    grpc.StatusCode.INVALID_ARGUMENT,
                    f"duplicate override for {packages}",
                )

            handler.add_overrides(
                [
                    {
                        "component": o.component,
                        "type": o.type,
                        "package": o.package,
                        "priority": o.priority,
                        "section": o.section,
                    }
                    for o in request.overrides
                ],
                upload.target_suite,
            )
            return empty_pb2.Empty()

    @override
    def AcceptUpload(
        self,
        request: policyqueue_pb2.AcceptUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Accept an upload through its ``PolicyQueueUploadHandler``.

        The upload is fetched with ``for_update=True``.  The RPC aborts with

        * ``NOT_FOUND`` when the upload does not exist,
        * ``FAILED_PRECONDITION`` when overrides are still missing,
        * ``FAILED_PRECONDITION`` when an action other than ``ACCEPT`` is
          already recorded for the upload.
        """
        self._require_write_scope(context, request.policy_queue)
        logger.info(
            "accept upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )
        with self._conn.session() as session:
            upload = self._require_upload(
                session,
                context,
                request.policy_queue,
                request.name,
                for_update=True,
            )
            handler = PolicyQueueUploadHandler(upload, session)

            if missing_overrides := handler.missing_overrides():
                packages = ", ".join(
                    f"{o['type']}:{o['package']}" for o in missing_overrides
                )
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"missing overrides for {packages}",
                )
            self._ensure_action_allowed(handler, context, "ACCEPT")

            handler.accept()
            return empty_pb2.Empty()

    @override
    def RejectUpload(
        self,
        request: policyqueue_pb2.RejectUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Reject an upload, passing on the request's reason and ``rejected_by``.

        The upload is fetched with ``for_update=True``.  The RPC aborts with

        * ``NOT_FOUND`` when the upload does not exist,
        * ``FAILED_PRECONDITION`` when an action other than ``REJECT`` is
          already recorded for the upload.
        """
        self._require_write_scope(context, request.policy_queue)
        logger.info(
            "reject upload: policy_queue=%s name=%s rejected_by=%s",
            request.policy_queue,
            request.name,
            request.rejected_by,
        )
        with self._conn.session() as session:
            upload = self._require_upload(
                session,
                context,
                request.policy_queue,
                request.name,
                for_update=True,
            )
            handler = PolicyQueueUploadHandler(upload, session)
            self._ensure_action_allowed(handler, context, "REJECT")

            handler.reject(request.reason, rejected_by=request.rejected_by)
            return empty_pb2.Empty()