Coverage for daklib/rpc_log.py: 97%

49 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# © 2026, Ansgar 🙀 <ansgar@debian.org> 

3 

4""" 

5Logging middleware for the DAK RPC server. 

6""" 

7 

8import contextvars 

9import logging 

10import time 

11import uuid 

12from collections.abc import Callable 

13from typing import Any, override 

14 

15import grpc 

16 

17from daklib.rpc_auth import current_auth 

18 

19logger = logging.getLogger("dak.rpc") 

20 

21current_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( 

22 "current_request_id", default=None 

23) 

24 

25 

class RequestContextFilter(logging.Filter):
    """Attach the current request id and authenticated subject to log records.

    Both values are read from context variables at the moment a record passes
    through the filter. The placeholder ``"-"`` is stored when a value is not
    available, so both attributes are always set on the record.
    """

    @override
    def filter(self, record: logging.LogRecord) -> bool:
        """Set ``request_id`` and ``auth_sub`` on *record*.

        Args:
            record: The log record to annotate in place.

        Returns:
            Always ``True``; this filter never rejects a record.
        """
        request_id = current_request_id.get()
        # An unset (None) or empty id is rendered as "-".
        record.request_id = request_id if request_id else "-"  # type: ignore[attr-defined]

        auth = current_auth.get()
        if auth is None:
            record.auth_sub = "-"  # type: ignore[attr-defined]
        else:
            record.auth_sub = auth.sub  # type: ignore[attr-defined]

        return True

35 

36 

class LoggingInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs every RPC and sanitizes unexpected errors.

    For each intercepted call a fresh request id is generated and stored in
    ``current_request_id``. Unary-unary handlers are wrapped so that their
    outcome and duration are logged; any other handler is returned unchanged.
    """

    @override
    def intercept_service(
        self,
        continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[Any, Any] | None]",
        handler_call_details: grpc.HandlerCallDetails,
    ) -> "grpc.RpcMethodHandler[Any, Any] | None":
        """Log the start of a call and wrap its unary-unary handler.

        Args:
            continuation: Callable that resolves the handler for the call.
            handler_call_details: Details of the call; its ``method`` is
                included in every log line.

        Returns:
            ``None`` when ``continuation`` finds no handler, the handler
            unchanged when it has no ``unary_unary`` behaviour, and otherwise
            a new unary-unary handler that reuses the original
            (de)serializers and adds logging around the original behaviour.
        """
        request_id = uuid.uuid4().hex
        current_request_id.set(request_id)

        rpc_name = handler_call_details.method
        logger.info("request started: method=%s", rpc_name)

        resolved = continuation(handler_call_details)
        if resolved is None:
            logger.warning(
                "request finished: method=%s status=UNIMPLEMENTED (no handler)", rpc_name
            )
            return None

        inner = resolved.unary_unary
        if inner is None:
            # Only unary-unary handlers are wrapped; everything else passes through.
            return resolved

        def logging_wrapper(request: Any, context: grpc.ServicerContext) -> Any:
            """Run the original behaviour, logging its status and duration."""
            began = time.monotonic()
            try:
                reply = inner(request, context)
                took = time.monotonic() - began
                final_code = context.code()  # type: ignore[attr-defined]
                # No explicit status code on the context is reported as "OK".
                status = "OK" if final_code is None else final_code.name
                logger.info(
                    "request finished: method=%s status=%s duration=%.3fs",
                    rpc_name,
                    status,
                    took,
                )
                return reply
            except Exception:
                took = time.monotonic() - began
                abort_code = context.code()  # type: ignore[attr-defined]
                if abort_code is None:
                    # No status was set, so this is an unhandled exception (bug).
                    # Keep the full traceback in the server log and send the
                    # client only a sanitized INTERNAL error with the request id.
                    logger.exception(
                        "unhandled exception: method=%s duration=%.3fs",
                        rpc_name,
                        took,
                    )
                    context.abort(
                        grpc.StatusCode.INTERNAL,
                        f"internal error (request {request_id})",
                    )
                else:
                    # A status is already set: treated as an intentional abort
                    # (NOT_FOUND, PERMISSION_DENIED, etc.). context.abort() sets
                    # the code and then raises a bare Exception(), so re-raise
                    # and let the framework send the intended error.
                    logger.info(
                        "request finished: method=%s status=%s duration=%.3fs",
                        rpc_name,
                        abort_code.name,
                        took,
                    )
                    raise

        return grpc.unary_unary_rpc_method_handler(
            logging_wrapper,
            request_deserializer=resolved.request_deserializer,
            response_serializer=resolved.response_serializer,
        )