Coverage for daklib/rpc_auth.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# © 2026, Ansgar 🙀 <ansgar@debian.org> 

3 

4""" 

5Authentication and authorization for the DAK RPC server. 

6""" 

7 

8import contextvars 

9import datetime 

10import hashlib 

11import hmac 

12import tomllib 

13from collections.abc import Callable, Collection 

14from dataclasses import dataclass 

15from typing import Any, BinaryIO, NoReturn, override 

16 

17import grpc 

18 

19current_auth: contextvars.ContextVar["AuthResult | None"] = contextvars.ContextVar( 

20 "current_auth", default=None 

21) 

22 

23 

24@dataclass(frozen=True) 

25class AuthResult: 

26 sub: str 

27 scopes: frozenset[str] 

28 

29 def has_scope(self, scope: str) -> bool: 

30 return scope in self.scopes 

31 

32 def has_any_scope(self, scopes: Collection[str]) -> bool: 

33 return not self.scopes.isdisjoint(scopes) 

34 

35 

36def require_auth(context: grpc.ServicerContext) -> AuthResult: 

37 """Get the current authentication or abort with UNAUTHENTICATED.""" 

38 

39 auth = current_auth.get() 

40 if auth is None: 

41 context.abort(grpc.StatusCode.UNAUTHENTICATED, "authentication required") 

42 return auth 

43 

44 

45def require_scope(context: grpc.ServicerContext, scope: str) -> AuthResult: 

46 """Get the current authentication and verify it has the given scope, 

47 or abort with PERMISSION_DENIED.""" 

48 

49 auth = require_auth(context) 

50 if not auth.has_scope(scope): 

51 context.abort(grpc.StatusCode.PERMISSION_DENIED, f"missing scope: {scope}") 

52 return auth 

53 

54 

55def require_any_scope( 

56 context: grpc.ServicerContext, scopes: Collection[str] 

57) -> AuthResult: 

58 """Get the current authentication and verify it has at least one of the 

59 given scopes, or abort with PERMISSION_DENIED.""" 

60 

61 auth = require_auth(context) 

62 if not auth.has_any_scope(scopes): 

63 context.abort(grpc.StatusCode.PERMISSION_DENIED, "missing scope") 

64 return auth 

65 

66 

67@dataclass(frozen=True) 

68class Token: 

69 hash: str 

70 sub: str 

71 scopes: frozenset[str] 

72 active: bool 

73 exp: datetime.date | None 

74 

75 

76# TODO[py3.14]: use io.Reader[bytes] 

77# def load_tokens(stream: io.Reader[bytes]) -> dict[str, Token]: 

78def load_tokens(stream: BinaryIO) -> dict[str, Token]: 

79 config = tomllib.load(stream) 

80 

81 tokens: dict[str, Token] = {} 

82 for selector, token in config.get("tokens", {}).items(): 

83 tokens[selector] = Token( 

84 hash=token["hash"], 

85 sub=token["sub"], 

86 scopes=frozenset(token.get("scopes", [])), 

87 active=token.get("active", True), 

88 exp=token.get("exp"), 

89 ) 

90 return tokens 

91 

92 

93def load_tokens_from_file(token_file: str) -> dict[str, Token]: 

94 with open(token_file, "rb") as f: 

95 return load_tokens(f) 

96 

97 

98class TokenAuth: 

99 def __init__(self, tokens: dict[str, Token]) -> None: 

100 self._tokens = tokens 

101 

102 def authenticate(self, token: str) -> AuthResult | None: 

103 parts = token.split("_", 2) 

104 if len(parts) != 3 or parts[0] != "dak-rpc": 

105 return None 

106 selector, secret = parts[1], parts[2] 

107 record = self._tokens.get(selector) 

108 if record is None: 

109 return None 

110 if not record.active: 

111 return None 

112 # exp is the last valid day (token is rejected the day after exp) 

113 if record.exp is not None and record.exp < datetime.date.today(): 

114 return None 

115 secret_hash = hashlib.sha256(secret.encode()).hexdigest() 

116 if hmac.compare_digest(secret_hash, record.hash): 

117 return AuthResult(sub=record.sub, scopes=record.scopes) 

118 return None 

119 

120 

121def _abort_unauthenticated(request: object, context: grpc.ServicerContext) -> NoReturn: 

122 context.abort(grpc.StatusCode.UNAUTHENTICATED, "unauthenticated") 

123 

124 

125_abort_unauthenticated_handler: "grpc.RpcMethodHandler[Any, Any]" = ( 

126 grpc.unary_unary_rpc_method_handler(_abort_unauthenticated) 

127) 

128 

129 

130class AuthenticationInterceptor(grpc.ServerInterceptor): 

131 """gRPC interceptor for authentication""" 

132 

133 def __init__(self, token_auth: TokenAuth) -> None: 

134 self._token_auth = token_auth 

135 

136 @override 

137 def intercept_service[_Request, _Response]( 

138 self, 

139 continuation: "Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler[_Request, _Response] | None]", 

140 handler_call_details: grpc.HandlerCallDetails, 

141 ) -> "grpc.RpcMethodHandler[_Request, _Response] | None": 

142 # Require "Bearer <token>" 

143 metadata = dict(handler_call_details.invocation_metadata) 

144 auth_header = metadata.get("authorization", "") 

145 if not isinstance(auth_header, str) or not auth_header.startswith("Bearer "): 

146 auth = None 

147 else: 

148 auth = self._token_auth.authenticate(auth_header.removeprefix("Bearer ")) 

149 

150 if auth is None: 

151 return _abort_unauthenticated_handler 

152 

153 current_auth.set(auth) 

154 return continuation(handler_call_details)