Source code for daklib.rpc_log

# SPDX-License-Identifier: GPL-2.0-or-later
# © 2026, Ansgar 🙀 <ansgar@debian.org>

"""
Logging middleware for the DAK RPC server.
"""

import contextvars
import logging
import time
import uuid
from collections.abc import Callable
from typing import Any, override

import grpc

from daklib.rpc_auth import current_auth

# Logger used for all request lifecycle messages emitted by this module.
logger = logging.getLogger("dak.rpc")

# Identifier of the RPC being processed in the current context.
# LoggingInterceptor.intercept_service stores a fresh id here for every
# intercepted call; RequestContextFilter reads it when decorating log records.
# The value is None while no id has been set in the current context.
current_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "current_request_id", default=None
)


class RequestContextFilter(logging.Filter):
    """Attach per-request context to every log record.

    The filter never rejects a record; it only adds two attributes that a
    formatter can reference: ``request_id`` and ``auth_sub``.
    """

    @override
    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate *record* with the current request id and auth subject.

        ``record.request_id`` is taken from ``current_request_id`` and
        ``record.auth_sub`` from the ``sub`` attribute of the object held by
        ``current_auth``.  ``"-"`` is used as a placeholder when the request
        id is unset (or empty) and when there is no auth object.

        :param record: the log record about to be emitted.
        :return: always ``True``, so the record is kept.
        """
        active_id = current_request_id.get()
        if active_id:
            record.request_id = active_id  # type: ignore[attr-defined]
        else:
            record.request_id = "-"  # type: ignore[attr-defined]

        principal = current_auth.get()
        if principal is None:
            record.auth_sub = "-"  # type: ignore[attr-defined]
        else:
            record.auth_sub = principal.sub  # type: ignore[attr-defined]

        return True
class LoggingInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs each RPC and hides unexpected failures.

    Every intercepted call gets a fresh request id (stored in
    ``current_request_id``) and a "request started" log line.  Unary-unary
    handlers are wrapped so that the final status and duration are logged
    and so that an unhandled exception reaches the client only as a generic
    ``INTERNAL`` error.  Handlers of any other kind are returned unchanged.
    """

    @override
    def intercept_service(
        self,
        continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[Any, Any] | None]",
        handler_call_details: grpc.HandlerCallDetails,
    ) -> "grpc.RpcMethodHandler[Any, Any] | None":
        """Log the start of an RPC and wrap its unary-unary handler.

        :param continuation: callable that resolves the next handler in the
            chain for the given call details.
        :param handler_call_details: details of the incoming call; only
            ``method`` is read here.
        :return: ``None`` when no handler is registered, a new unary-unary
            handler wrapping the original one, or the original handler
            untouched when it has no ``unary_unary`` behaviour.
        """
        finished_fmt = "request finished: method=%s status=%s duration=%.3fs"

        # Generate the id first so that the "request started" line below can
        # already pick it up through the context variable.
        request_id = uuid.uuid4().hex
        current_request_id.set(request_id)

        rpc_name = handler_call_details.method
        logger.info("request started: method=%s", rpc_name)

        handler = continuation(handler_call_details)
        if handler is None:
            logger.warning(
                "request finished: method=%s status=UNIMPLEMENTED (no handler)",
                rpc_name,
            )
            return None

        wrapped_behaviour = handler.unary_unary
        if wrapped_behaviour is None:
            # Only unary-unary handlers are instrumented; pass others through.
            return handler

        def timed_call(request: Any, context: grpc.ServicerContext) -> Any:
            """Run the original behaviour, logging its outcome and duration."""
            began = time.monotonic()
            try:
                result = wrapped_behaviour(request, context)
                took = time.monotonic() - began
                outcome = context.code()  # type: ignore[attr-defined]
                # No explicit status code on the context means success.
                status_name = "OK" if outcome is None else outcome.name
                logger.info(finished_fmt, rpc_name, status_name, took)
                return result
            except Exception:
                took = time.monotonic() - began
                outcome = context.code()  # type: ignore[attr-defined]
                if outcome is None:
                    # No status was set, so this is an unexpected failure
                    # (a bug).  Keep the traceback in the server log and give
                    # the client a sanitized INTERNAL error that carries only
                    # the request id.
                    logger.exception(
                        "unhandled exception: method=%s duration=%.3fs",
                        rpc_name,
                        took,
                    )
                    context.abort(
                        grpc.StatusCode.INTERNAL,
                        f"internal error (request {request_id})",
                    )
                else:
                    # A status was already set: treat it as a deliberate
                    # abort (NOT_FOUND, PERMISSION_DENIED, ...).
                    # context.abort() records the code and then raises a
                    # bare Exception, so re-raise and let the framework
                    # deliver the intended error.
                    logger.info(finished_fmt, rpc_name, outcome.name, took)
                    raise

        return grpc.unary_unary_rpc_method_handler(
            timed_call,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )