# SPDX-License-Identifier: GPL-2.0-or-later
# © 2026, Ansgar 🙀 <ansgar@debian.org>
"""
Authentication and authorization for the DAK RPC server.
"""
import contextvars
import datetime
import hashlib
import hmac
import tomllib
from collections.abc import Callable, Collection
from dataclasses import dataclass
from typing import Any, BinaryIO, NoReturn, override
import grpc
current_auth: contextvars.ContextVar["AuthResult | None"] = contextvars.ContextVar(
"current_auth", default=None
)
[docs]
@dataclass(frozen=True)
class AuthResult:
    """Outcome of a successful authentication: who the caller is and what
    the caller is allowed to do."""

    # Subject the caller authenticated as.
    sub: str
    # Scopes granted to the caller.
    scopes: frozenset[str]

    def has_scope(self, scope: str) -> bool:
        """Return True when ``scope`` is one of the granted scopes."""
        granted = self.scopes
        return scope in granted

    def has_any_scope(self, scopes: Collection[str]) -> bool:
        """Return True when at least one entry of ``scopes`` is granted."""
        overlap = self.scopes.intersection(scopes)
        return bool(overlap)
[docs]
def require_auth(context: grpc.ServicerContext) -> AuthResult:
    """Return the authentication stored in ``current_auth``.

    When nothing has been stored, the RPC is aborted through ``context``
    with status UNAUTHENTICATED.
    """
    result = current_auth.get()
    if result is not None:
        return result
    context.abort(grpc.StatusCode.UNAUTHENTICATED, "authentication required")
    return result
[docs]
def require_scope(context: grpc.ServicerContext, scope: str) -> AuthResult:
    """Return the current authentication if it carries ``scope``.

    Authentication itself is checked first via ``require_auth``; a caller
    that is authenticated but lacks ``scope`` is aborted with
    PERMISSION_DENIED.
    """
    result = require_auth(context)
    if result.has_scope(scope):
        return result
    context.abort(grpc.StatusCode.PERMISSION_DENIED, f"missing scope: {scope}")
    return result
[docs]
def require_any_scope(
    context: grpc.ServicerContext, scopes: Collection[str]
) -> AuthResult:
    """Return the current authentication if it carries any of ``scopes``.

    Authentication itself is checked first via ``require_auth``; a caller
    that is authenticated but holds none of ``scopes`` is aborted with
    PERMISSION_DENIED.
    """
    result = require_auth(context)
    if result.has_any_scope(scopes):
        return result
    context.abort(grpc.StatusCode.PERMISSION_DENIED, "missing scope")
    return result
[docs]
@dataclass(frozen=True)
class Token:
hash: str
sub: str
scopes: frozenset[str]
active: bool
exp: datetime.date | None
# TODO[py3.14]: use io.Reader[bytes]
# def load_tokens(stream: io.Reader[bytes]) -> dict[str, Token]:
[docs]
def load_tokens(stream: BinaryIO) -> dict[str, Token]:
config = tomllib.load(stream)
tokens: dict[str, Token] = {}
for selector, token in config.get("tokens", {}).items():
tokens[selector] = Token(
hash=token["hash"],
sub=token["sub"],
scopes=frozenset(token.get("scopes", [])),
active=token.get("active", True),
exp=token.get("exp"),
)
return tokens
[docs]
def load_tokens_from_file(token_file: str) -> dict[str, Token]:
    """Open ``token_file`` in binary mode and return the tokens that
    ``load_tokens`` parses from it."""
    with open(token_file, mode="rb") as stream:
        tokens = load_tokens(stream)
    return tokens
[docs]
class TokenAuth:
    """Authenticate tokens of the form ``dak-rpc_<selector>_<secret>``.

    The selector picks the stored record; the secret is checked against the
    record's hash using SHA-256.
    """

    def __init__(self, tokens: dict[str, Token]) -> None:
        """Use ``tokens`` (selector -> record) as the credential store."""
        self._tokens = tokens

    def authenticate(self, token: str) -> AuthResult | None:
        """Check ``token`` and return the resulting authentication.

        Args:
            token: The presented token, without any ``Bearer `` prefix.

        Returns:
            An AuthResult carrying the record's subject and scopes, or None
            when the token is malformed, its selector is unknown, the record
            is inactive or expired, or the secret does not match.
        """
        parts = token.split("_", 2)
        if len(parts) != 3 or parts[0] != "dak-rpc":
            return None
        _, selector, secret = parts

        record = self._tokens.get(selector)
        if record is None or not record.active:
            return None

        # exp is the last valid day (token is rejected the day after exp)
        if record.exp is not None and record.exp < datetime.date.today():
            return None

        # Compare as bytes: hmac.compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which would turn a typo
        # in the stored hash into a crash instead of a failed authentication.
        try:
            presented = hashlib.sha256(secret.encode()).hexdigest().encode("ascii")
            expected = record.hash.encode()
        except UnicodeEncodeError:
            # Text that cannot be encoded (e.g. lone surrogates) can never match.
            return None

        if hmac.compare_digest(presented, expected):
            return AuthResult(sub=record.sub, scopes=record.scopes)
        return None
[docs]
def _abort_unauthenticated(request: object, context: grpc.ServicerContext) -> NoReturn:
    """Abort the call with status UNAUTHENTICATED; ``request`` is not used."""
    status = grpc.StatusCode.UNAUTHENTICATED
    context.abort(status, "unauthenticated")
# Method handler wrapping _abort_unauthenticated; the authentication
# interceptor returns it in place of the real handler for rejected calls.
_abort_unauthenticated_handler: "grpc.RpcMethodHandler[Any, Any]"
_abort_unauthenticated_handler = grpc.unary_unary_rpc_method_handler(
    _abort_unauthenticated
)
[docs]
class AuthenticationInterceptor(grpc.ServerInterceptor):
"""gRPC interceptor for authentication"""
def __init__(self, token_auth: TokenAuth) -> None:
self._token_auth = token_auth
[docs]
@override
def intercept_service[_Request, _Response](
self,
continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[_Request, _Response] | None]",
handler_call_details: grpc.HandlerCallDetails,
) -> "grpc.RpcMethodHandler[_Request, _Response] | None":
# Require "Bearer <token>"
metadata = dict(handler_call_details.invocation_metadata)
auth_header = metadata.get("authorization", "")
if not isinstance(auth_header, str) or not auth_header.startswith("Bearer "):
auth = None
else:
auth = self._token_auth.authenticate(auth_header.removeprefix("Bearer "))
if auth is None:
return _abort_unauthenticated_handler
current_auth.set(auth)
return continuation(handler_call_details)