Coverage for daklib/rpc_log.py: 97%
49 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# SPDX-License-Identifier: GPL-2.0-or-later
2# © 2026, Ansgar 🙀 <ansgar@debian.org>
4"""
5Logging middleware for the DAK RPC server.
6"""
8import contextvars
9import logging
10import time
11import uuid
12from collections.abc import Callable
13from typing import Any, override
15import grpc
17from daklib.rpc_auth import current_auth
19logger = logging.getLogger("dak.rpc")
21current_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
22 "current_request_id", default=None
23)
26class RequestContextFilter(logging.Filter):
27 """Logging filter that injects request_id and auth_sub from context vars."""
29 @override
30 def filter(self, record: logging.LogRecord) -> bool:
31 record.request_id = current_request_id.get() or "-" # type: ignore[attr-defined]
32 auth = current_auth.get()
33 record.auth_sub = auth.sub if auth is not None else "-" # type: ignore[attr-defined]
34 return True
37class LoggingInterceptor(grpc.ServerInterceptor):
38 """gRPC interceptor for request logging and error sanitization."""
40 @override
41 def intercept_service(
42 self,
43 continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[Any, Any] | None]",
44 handler_call_details: grpc.HandlerCallDetails,
45 ) -> "grpc.RpcMethodHandler[Any, Any] | None":
46 request_id = uuid.uuid4().hex
47 current_request_id.set(request_id)
49 method = handler_call_details.method
50 logger.info("request started: method=%s", method)
52 handler = continuation(handler_call_details)
54 if handler is None:
55 logger.warning(
56 "request finished: method=%s status=UNIMPLEMENTED (no handler)", method
57 )
58 return None
60 if handler.unary_unary is not None: 60 ↛ 110line 60 didn't jump to line 110 because the condition on line 60 was always true
61 original_fn = handler.unary_unary
63 def logging_wrapper(request: Any, context: grpc.ServicerContext) -> Any:
64 start = time.monotonic()
65 try:
66 response = original_fn(request, context)
67 elapsed = time.monotonic() - start
68 code = context.code() # type: ignore[attr-defined]
69 status = code.name if code is not None else "OK"
70 logger.info(
71 "request finished: method=%s status=%s duration=%.3fs",
72 method,
73 status,
74 elapsed,
75 )
76 return response
77 except Exception:
78 elapsed = time.monotonic() - start
79 code = context.code() # type: ignore[attr-defined]
80 if code is not None:
81 # Intentional abort (NOT_FOUND, PERMISSION_DENIED, etc.)
82 # context.abort() sets state.code then raises bare Exception().
83 # Re-raise so the framework sends the intended error.
84 logger.info(
85 "request finished: method=%s status=%s duration=%.3fs",
86 method,
87 code.name,
88 elapsed,
89 )
90 raise
91 else:
92 # Unhandled exception (bug). Log full traceback server-side,
93 # then replace with sanitized INTERNAL error.
94 logger.exception(
95 "unhandled exception: method=%s duration=%.3fs",
96 method,
97 elapsed,
98 )
99 context.abort(
100 grpc.StatusCode.INTERNAL,
101 f"internal error (request {request_id})",
102 )
104 return grpc.unary_unary_rpc_method_handler(
105 logging_wrapper,
106 request_deserializer=handler.request_deserializer,
107 response_serializer=handler.response_serializer,
108 )
110 return handler