Coverage for dak/rpc_server.py: 40%
37 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# SPDX-License-Identifier: GPL-2.0-or-later
2# © 2026, Ansgar 🙀 <ansgar@debian.org>
4"""
5RPC server for DAK
6"""
8import logging
9import sys
10from concurrent import futures
12import apt_pkg
13import grpc
15import daklib.daklog as daklog
16import daklib.policy_rpc
17from dak.policyqueue.v1 import policyqueue_pb2_grpc
18from daklib.config import Config
19from daklib.daklog import DakLogHandler
20from daklib.dbconn import DBConn
21from daklib.rpc_auth import AuthenticationInterceptor, TokenAuth, load_tokens_from_file
22from daklib.rpc_log import LoggingInterceptor, RequestContextFilter
25def init_server(
26 *, interceptors: list[grpc.ServerInterceptor] | None = None
27) -> grpc.Server:
28 server = grpc.server(
29 thread_pool=futures.ThreadPoolExecutor(max_workers=10),
30 interceptors=interceptors or [],
31 )
32 policyqueue_pb2_grpc.add_PolicyQueueServiceServicer_to_server(
33 daklib.policy_rpc.PolicyQueueServiceServicer(conn=DBConn()), server
34 )
35 return server
38def main() -> None:
39 cnf = Config()
41 apt_pkg.parse_commandline( # type: ignore[attr-defined]
42 cnf.Cnf,
43 [
44 ("o", "option", "", "ArbItem"),
45 ],
46 sys.argv,
47 )
49 daklog.Logger("rpc-server")
51 context_filter = RequestContextFilter()
52 daklog_formatter = logging.Formatter(
53 "[%(request_id)s] [%(auth_sub)s] %(name)s: %(message)s"
54 )
55 for h in logging.getLogger().handlers:
56 h.addFilter(context_filter)
57 if isinstance(h, DakLogHandler):
58 h.setFormatter(daklog_formatter)
60 if "RPC::Authorization::TokenFile" not in cnf:
61 raise Exception("RPC::Authorization::TokenFile is required.")
63 interceptors: list[grpc.ServerInterceptor] = [
64 LoggingInterceptor(),
65 AuthenticationInterceptor(
66 TokenAuth(load_tokens_from_file(cnf["RPC::Authorization::TokenFile"]))
67 ),
68 ]
70 server = init_server(interceptors=interceptors)
71 listen_addr = cnf.get("RPC::ListenAddress")
72 if not listen_addr:
73 raise Exception("RPC::ListenAddress is required.")
74 server.add_insecure_port(listen_addr)
75 server.start()
76 server.wait_for_termination()