Source code for daklib.rpc_log
# SPDX-License-Identifier: GPL-2.0-or-later
# © 2026, Ansgar 🙀 <ansgar@debian.org>
"""
Logging middleware for the DAK RPC server.
"""
import contextvars
import logging
import time
import uuid
from collections.abc import Callable
from typing import Any, override
import grpc
from daklib.rpc_auth import current_auth
# Logger shared by the RPC logging code in this module; the name "dak.rpc"
# places it under the "dak" logger hierarchy.
logger = logging.getLogger("dak.rpc")
# Context variable carrying the id of the request handled in the current
# context. It defaults to None until LoggingInterceptor.intercept_service
# stores a freshly generated id; RequestContextFilter.filter reads it to
# stamp log records.
current_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"current_request_id", default=None
)
[docs]
class RequestContextFilter(logging.Filter):
    """Attach the current request id and authenticated subject to log records.

    The values are read from the ``current_request_id`` and ``current_auth``
    context variables; ``"-"`` is used as a placeholder when either is unset.
    """

    @override
    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate *record* with ``request_id`` and ``auth_sub``.

        Always returns ``True``: the filter only decorates records and never
        suppresses them.
        """
        active_id = current_request_id.get()
        # A missing (None) or empty id is rendered as "-".
        record.request_id = active_id if active_id else "-"  # type: ignore[attr-defined]

        identity = current_auth.get()
        if identity is None:
            record.auth_sub = "-"  # type: ignore[attr-defined]
        else:
            record.auth_sub = identity.sub  # type: ignore[attr-defined]
        return True
[docs]
class LoggingInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs each request and sanitizes unexpected errors."""

    @override
    def intercept_service(
        self,
        continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[Any, Any] | None]",
        handler_call_details: grpc.HandlerCallDetails,
    ) -> "grpc.RpcMethodHandler[Any, Any] | None":
        """Assign a request id, log the request and wrap unary-unary handlers.

        Args:
            continuation: Callable that resolves the next handler in the chain.
            handler_call_details: Details of the incoming call; only ``method``
                is read here.

        Returns:
            ``None`` when no handler exists for the method, a logging wrapper
            around the handler when it is unary-unary, and the handler
            unchanged for every other kind.
        """
        method = handler_call_details.method
        request_id = uuid.uuid4().hex
        # Publish the id before the first log call so that the
        # "request started" line can already carry it.
        current_request_id.set(request_id)
        logger.info("request started: method=%s", method)

        handler = continuation(handler_call_details)
        if handler is None:
            logger.warning(
                "request finished: method=%s status=UNIMPLEMENTED (no handler)", method
            )
            return None

        wrapped = handler.unary_unary
        if wrapped is None:
            # Not a unary-unary handler: hand it back without wrapping.
            return handler

        def logging_wrapper(request: Any, context: grpc.ServicerContext) -> Any:
            began = time.monotonic()
            try:
                result = wrapped(request, context)
                took = time.monotonic() - began
                final_code = context.code()  # type: ignore[attr-defined]
                # No explicit status code on the context is reported as OK.
                status_name = "OK" if final_code is None else final_code.name
                logger.info(
                    "request finished: method=%s status=%s duration=%.3fs",
                    method,
                    status_name,
                    took,
                )
                return result
            except Exception:
                took = time.monotonic() - began
                final_code = context.code()  # type: ignore[attr-defined]
                if final_code is None:
                    # No status code was set, so this is an unexpected failure
                    # (a bug). Keep the full traceback in the server log and
                    # give the client only a generic INTERNAL error that
                    # carries the request id.
                    logger.exception(
                        "unhandled exception: method=%s duration=%.3fs",
                        method,
                        took,
                    )
                    context.abort(
                        grpc.StatusCode.INTERNAL,
                        f"internal error (request {request_id})",
                    )
                else:
                    # A status code is already set: the handler aborted on
                    # purpose (NOT_FOUND, PERMISSION_DENIED, ...), since
                    # context.abort() records the code and then raises a bare
                    # Exception(). Log the outcome and re-raise so the
                    # framework sends the intended error.
                    logger.info(
                        "request finished: method=%s status=%s duration=%.3fs",
                        method,
                        final_code.name,
                        took,
                    )
                    raise

        return grpc.unary_unary_rpc_method_handler(
            logging_wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )