Coverage for dak/rpc_server.py: 40%

37 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# © 2026, Ansgar 🙀 <ansgar@debian.org> 

3 

4""" 

5RPC server for DAK 

6""" 

7 

8import logging 

9import sys 

10from concurrent import futures 

11 

12import apt_pkg 

13import grpc 

14 

15import daklib.daklog as daklog 

16import daklib.policy_rpc 

17from dak.policyqueue.v1 import policyqueue_pb2_grpc 

18from daklib.config import Config 

19from daklib.daklog import DakLogHandler 

20from daklib.dbconn import DBConn 

21from daklib.rpc_auth import AuthenticationInterceptor, TokenAuth, load_tokens_from_file 

22from daklib.rpc_log import LoggingInterceptor, RequestContextFilter 

23 

24 

def init_server(
    *, interceptors: list[grpc.ServerInterceptor] | None = None
) -> grpc.Server:
    """Build a gRPC server with the policy queue service registered.

    The returned server has not been started and has no port bound;
    both are left to the caller.

    :param interceptors: server interceptors to install. ``None`` (the
        default) or an empty list installs none.
    :return: the configured server.
    """
    active_interceptors = interceptors if interceptors else []
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    rpc_server = grpc.server(
        thread_pool=worker_pool,
        interceptors=active_interceptors,
    )

    # The servicer is handed the DBConn() object created here.
    servicer = daklib.policy_rpc.PolicyQueueServiceServicer(conn=DBConn())
    policyqueue_pb2_grpc.add_PolicyQueueServiceServicer_to_server(
        servicer, rpc_server
    )
    return rpc_server

36 

37 

def main() -> None:
    """Run the DAK RPC server until it terminates.

    Applies ``-o``/``--option`` command line overrides to the DAK
    configuration, attaches request context to the root logger's
    handlers, installs the logging and token authentication
    interceptors, then serves on ``RPC::ListenAddress`` and blocks.

    :raises Exception: if ``RPC::Authorization::TokenFile`` is not set or
        ``RPC::ListenAddress`` is missing or empty.
    """
    cnf = Config()

    apt_pkg.parse_commandline(  # type: ignore[attr-defined]
        cnf.Cnf,
        [
            ("o", "option", "", "ArbItem"),
        ],
        sys.argv,
    )

    daklog.Logger("rpc-server")

    # The formatter below references request_id and auth_sub; the filter is
    # added to every handler, presumably to supply those fields on each
    # record (see RequestContextFilter to confirm).
    context_filter = RequestContextFilter()
    daklog_formatter = logging.Formatter(
        "[%(request_id)s] [%(auth_sub)s] %(name)s: %(message)s"
    )
    for h in logging.getLogger().handlers:
        h.addFilter(context_filter)
        if isinstance(h, DakLogHandler):
            h.setFormatter(daklog_formatter)

    # Check every required setting before building anything: init_server()
    # creates a thread pool and a DBConn(), so a missing listen address
    # should be reported before that work is done.
    if "RPC::Authorization::TokenFile" not in cnf:
        raise Exception("RPC::Authorization::TokenFile is required.")
    listen_addr = cnf.get("RPC::ListenAddress")
    if not listen_addr:
        raise Exception("RPC::ListenAddress is required.")

    interceptors: list[grpc.ServerInterceptor] = [
        LoggingInterceptor(),
        AuthenticationInterceptor(
            TokenAuth(load_tokens_from_file(cnf["RPC::Authorization::TokenFile"]))
        ),
    ]

    server = init_server(interceptors=interceptors)
    # NOTE(review): the port is added without transport security, so the
    # authentication tokens are presumably sent unencrypted unless the
    # address is local-only or TLS is terminated elsewhere -- confirm.
    server.add_insecure_port(listen_addr)
    server.start()
    server.wait_for_termination()