Coverage for daklib/rpc_auth.py: 100%
85 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-05-10 21:38 +0000
1# SPDX-License-Identifier: GPL-2.0-or-later
2# © 2026, Ansgar 🙀 <ansgar@debian.org>
4"""
5Authentication and authorization for the DAK RPC server.
6"""
8import contextvars
9import datetime
10import hashlib
11import hmac
12import tomllib
13from collections.abc import Callable, Collection
14from dataclasses import dataclass
15from typing import Any, BinaryIO, NoReturn, override
17import grpc
# Holds the AuthResult stored by AuthenticationInterceptor.intercept_service;
# reads as None when nothing has been set in the current context.
current_auth: contextvars.ContextVar["AuthResult | None"]
current_auth = contextvars.ContextVar("current_auth", default=None)
@dataclass(frozen=True)
class AuthResult:
    """Outcome of a successful authentication: who the caller is and what
    they may do."""

    # Subject the presented token belongs to.
    sub: str
    # Scopes granted to that subject.
    scopes: frozenset[str]

    def has_scope(self, scope: str) -> bool:
        """Return whether *scope* is among the granted scopes."""
        return scope in self.scopes

    def has_any_scope(self, scopes: Collection[str]) -> bool:
        """Return whether at least one of *scopes* is granted."""
        return any(candidate in self.scopes for candidate in scopes)
def require_auth(context: grpc.ServicerContext) -> AuthResult:
    """Return the authentication stored in ``current_auth``.

    If none is stored, the call is aborted through *context* with status
    UNAUTHENTICATED.
    """
    if (auth := current_auth.get()) is None:
        context.abort(grpc.StatusCode.UNAUTHENTICATED, "authentication required")
    return auth
def require_scope(context: grpc.ServicerContext, scope: str) -> AuthResult:
    """Return the current authentication if it carries *scope*.

    A missing authentication is handled by ``require_auth``; an
    authentication lacking *scope* aborts the call through *context* with
    status PERMISSION_DENIED.
    """
    identity = require_auth(context)
    if not identity.has_scope(scope):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, f"missing scope: {scope}")
    return identity
def require_any_scope(
    context: grpc.ServicerContext, scopes: Collection[str]
) -> AuthResult:
    """Return the current authentication if it carries any of *scopes*.

    A missing authentication is handled by ``require_auth``; an
    authentication holding none of *scopes* aborts the call through
    *context* with status PERMISSION_DENIED.
    """
    identity = require_auth(context)
    if not identity.has_any_scope(scopes):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, "missing scope")
    return identity
67@dataclass(frozen=True)
68class Token:
69 hash: str
70 sub: str
71 scopes: frozenset[str]
72 active: bool
73 exp: datetime.date | None
# TODO[py3.14]: use io.Reader[bytes]
# def load_tokens(stream: io.Reader[bytes]) -> dict[str, Token]:
def load_tokens(stream: BinaryIO) -> dict[str, Token]:
    """Parse a TOML token configuration into ``Token`` records.

    Every entry of the top-level ``tokens`` table describes one token and is
    keyed by its selector.  ``hash`` and ``sub`` are required; ``scopes``
    defaults to no scopes, ``active`` to ``True`` and ``exp`` to no expiry.

    ``exp`` may be written as a TOML date or as a TOML date-time; a
    date-time is reduced to its calendar date (in the date-time's own
    offset, if it has one) so that it can be compared against a date when
    the token is checked.

    Returns a mapping from selector to ``Token``; it is empty when the
    document has no ``tokens`` table.

    Raises ``tomllib.TOMLDecodeError`` for invalid TOML and ``KeyError`` when
    an entry lacks ``hash`` or ``sub``.
    """
    config = tomllib.load(stream)

    tokens: dict[str, Token] = {}
    for selector, token in config.get("tokens", {}).items():
        exp = token.get("exp")
        if isinstance(exp, datetime.datetime):
            # tomllib yields datetime.datetime for date-time values, which
            # cannot be ordered against a plain datetime.date; keep only the
            # date so Token.exp honours its declared type.
            exp = exp.date()
        tokens[selector] = Token(
            hash=token["hash"],
            sub=token["sub"],
            scopes=frozenset(token.get("scopes", [])),
            active=token.get("active", True),
            exp=exp,
        )
    return tokens
def load_tokens_from_file(token_file: str) -> dict[str, Token]:
    """Open *token_file* in binary mode and parse it with ``load_tokens``."""
    with open(token_file, mode="rb") as stream:
        tokens = load_tokens(stream)
    return tokens
98class TokenAuth:
99 def __init__(self, tokens: dict[str, Token]) -> None:
100 self._tokens = tokens
102 def authenticate(self, token: str) -> AuthResult | None:
103 parts = token.split("_", 2)
104 if len(parts) != 3 or parts[0] != "dak-rpc":
105 return None
106 selector, secret = parts[1], parts[2]
107 record = self._tokens.get(selector)
108 if record is None:
109 return None
110 if not record.active:
111 return None
112 # exp is the last valid day (token is rejected the day after exp)
113 if record.exp is not None and record.exp < datetime.date.today():
114 return None
115 secret_hash = hashlib.sha256(secret.encode()).hexdigest()
116 if hmac.compare_digest(secret_hash, record.hash):
117 return AuthResult(sub=record.sub, scopes=record.scopes)
118 return None
def _abort_unauthenticated(request: object, context: grpc.ServicerContext) -> NoReturn:
    """Abort the call with status UNAUTHENTICATED; *request* is not looked at."""
    status = grpc.StatusCode.UNAUTHENTICATED
    context.abort(status, "unauthenticated")
# Handler returned by AuthenticationInterceptor in place of the real one when
# the caller could not be authenticated.
_abort_unauthenticated_handler: "grpc.RpcMethodHandler[Any, Any]"
_abort_unauthenticated_handler = grpc.unary_unary_rpc_method_handler(
    _abort_unauthenticated
)
130class AuthenticationInterceptor(grpc.ServerInterceptor):
131 """gRPC interceptor for authentication"""
133 def __init__(self, token_auth: TokenAuth) -> None:
134 self._token_auth = token_auth
136 @override
137 def intercept_service[_Request, _Response](
138 self,
139 continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[_Request, _Response] | None]",
140 handler_call_details: grpc.HandlerCallDetails,
141 ) -> "grpc.RpcMethodHandler[_Request, _Response] | None":
142 # Require "Bearer <token>"
143 metadata = dict(handler_call_details.invocation_metadata)
144 auth_header = metadata.get("authorization", "")
145 if not isinstance(auth_header, str) or not auth_header.startswith("Bearer "):
146 auth = None
147 else:
148 auth = self._token_auth.authenticate(auth_header.removeprefix("Bearer "))
150 if auth is None:
151 return _abort_unauthenticated_handler
153 current_auth.set(auth)
154 return continuation(handler_call_details)