Coverage for daklib/policy_rpc.py: 29%

85 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-05-10 21:38 +0000

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# © 2026, Ansgar 🙀 <ansgar@debian.org> 

3 

4import logging 

5from typing import override 

6 

7import grpc 

8from google.protobuf import empty_pb2 

9from sqlalchemy import select 

10from sqlalchemy.orm import Session, joinedload 

11 

12from dak.policyqueue.v1 import policyqueue_pb2, policyqueue_pb2_grpc 

13from daklib.dbconn import DBChange, DBConn, PolicyQueue, PolicyQueueUpload 

14from daklib.policy import PolicyQueueUploadHandler 

15from daklib.rpc import to_timestamp 

16from daklib.rpc_auth import require_any_scope, require_scope 

17 

# Module-level logger; getLogger(__name__) names it after this module so the
# RPC handlers below log under the module's own logger name.
logger = logging.getLogger(__name__)

19 

20 

class PolicyQueueServiceServicer(policyqueue_pb2_grpc.PolicyQueueServiceServicer):
    """gRPC servicer for inspecting and processing policy queue uploads.

    ``ListUploads`` and ``GetUpload`` check the ``policyqueue:read`` scope.
    ``AddOverrides``, ``AcceptUpload`` and ``RejectUpload`` accept either the
    ``policyqueue:write`` scope or the queue-specific
    ``policyqueue:write:<policy_queue>`` scope.

    Every RPC opens its own session from the ``DBConn`` given at construction.
    """

    # Base SELECT shared by all lookups: uploads with their changes, policy
    # queue and target suite eagerly loaded through joinedload().
    _query = select(PolicyQueueUpload)
    _query = _query.options(
        joinedload(PolicyQueueUpload.changes),
        joinedload(PolicyQueueUpload.policy_queue),
        joinedload(PolicyQueueUpload.target_suite),
    )

    def __init__(self, conn: DBConn) -> None:
        """Remember *conn*; sessions are created from it per request."""
        self._conn = conn

    @staticmethod
    def _write_scopes(policy_queue: str) -> list[str]:
        """Return the scopes that permit modifying uploads in *policy_queue*."""
        return ["policyqueue:write", f"policyqueue:write:{policy_queue}"]

    def _upload(
        self, session: Session, policy_queue: str, name: str, *, for_update=False
    ) -> PolicyQueueUpload | None:
        """Look up one upload by policy queue name and changes file name.

        :param session: session used to run the query
        :param policy_queue: value compared against ``PolicyQueue.queue_name``
        :param name: value compared against ``DBChange.changesname``
        :param for_update: when true, add ``FOR UPDATE`` to the query
        :return: the matching upload, or ``None`` when nothing matches
        """
        stmt = self._query.join(PolicyQueueUpload.changes)
        stmt = stmt.join(PolicyQueueUpload.policy_queue)
        stmt = stmt.where(
            PolicyQueue.queue_name == policy_queue, DBChange.changesname == name
        )
        if for_update:
            # of= names PolicyQueueUpload as the entity the lock applies to.
            stmt = stmt.with_for_update(of=PolicyQueueUpload)
        result = session.execute(stmt)
        return result.scalar_one_or_none()

    @override
    def ListUploads(
        self, request: policyqueue_pb2.ListUploadsRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.ListUploadsResponse:
        """Return all uploads in ``request.policy_queue``.

        Each entry carries the name, policy queue, target suite and creation
        time; missing overrides are not computed here.
        """
        require_scope(context, "policyqueue:read")
        logger.debug("list uploads: policy_queue=%s", request.policy_queue)

        def as_message(entry: PolicyQueueUpload) -> policyqueue_pb2.Upload:
            # Summary form of an upload (no missing_overrides).
            return policyqueue_pb2.Upload(
                name=entry.changes.changesname,
                policy_queue=entry.policy_queue.queue_name,
                target_suite=entry.target_suite.suite_name,
                create_time=to_timestamp(entry.changes.created),
            )

        with self._conn.session() as session:
            in_queue = PolicyQueue.queue_name == request.policy_queue
            stmt = self._query.join(PolicyQueueUpload.policy_queue).where(in_queue)
            rows = session.execute(stmt).scalars()
            # Convert while the session is still open.
            return policyqueue_pb2.ListUploadsResponse(
                uploads=list(map(as_message, rows)),
            )

    @override
    def GetUpload(
        self, request: policyqueue_pb2.GetUploadRequest, context: grpc.ServicerContext
    ) -> policyqueue_pb2.Upload:
        """Return a single upload including its missing overrides.

        Aborts the RPC with ``NOT_FOUND`` when no upload matches
        ``request.policy_queue`` / ``request.name``.
        """
        require_scope(context, "policyqueue:read")
        logger.debug(
            "get upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )
        override_keys = ("package", "component", "priority", "section", "type")

        with self._conn.session() as session:
            upload = self._upload(session, request.policy_queue, request.name)
            if upload is None:
                context.abort(grpc.StatusCode.NOT_FOUND, "not found")

            pending = PolicyQueueUploadHandler(upload, session).missing_overrides()

            return policyqueue_pb2.Upload(
                name=upload.changes.changesname,
                policy_queue=upload.policy_queue.queue_name,
                target_suite=upload.target_suite.suite_name,
                missing_overrides=[
                    # Copy exactly the five override fields into the message.
                    policyqueue_pb2.Override(**{key: entry[key] for key in override_keys})
                    for entry in pending
                ],
                create_time=to_timestamp(upload.changes.created),
            )

    @override
    def AddOverrides(
        self,
        request: policyqueue_pb2.AddOverridesRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Add the requested overrides for an upload.

        Aborts with ``NOT_FOUND`` when the upload does not exist, and with
        ``FAILED_PRECONDITION`` when any requested override is not among the
        upload's currently missing overrides (matched on component, type and
        package).
        """
        require_any_scope(context, self._write_scopes(request.policy_queue))
        logger.info(
            "add overrides: policy_queue=%s name=%s count=%d",
            request.policy_queue,
            request.name,
            len(request.overrides),
        )

        with self._conn.session() as session:
            upload = self._upload(
                session, request.policy_queue, request.name, for_update=True
            )
            if not upload:
                context.abort(grpc.StatusCode.NOT_FOUND, "not found")

            handler = PolicyQueueUploadHandler(upload, session)

            # Identity of every override the upload still lacks.
            still_missing = set()
            for entry in handler.missing_overrides():
                still_missing.add(
                    (entry["component"], entry["type"], entry["package"])
                )

            # Requested overrides that do not correspond to a missing one.
            surplus = [
                f"{item.type}:{item.package}"
                for item in request.overrides
                if (item.component, item.type, item.package) not in still_missing
            ]
            if surplus:
                packages = ", ".join(surplus)
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"no new override needed for {packages}",
                )

            new_overrides = [
                dict(
                    component=item.component,
                    type=item.type,
                    package=item.package,
                    priority=item.priority,
                    section=item.section,
                )
                for item in request.overrides
            ]
            handler.add_overrides(new_overrides, upload.target_suite)

        return empty_pb2.Empty()

    @override
    def AcceptUpload(
        self,
        request: policyqueue_pb2.AcceptUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Accept an upload.

        Aborts with ``NOT_FOUND`` when the upload does not exist, and with
        ``FAILED_PRECONDITION`` when overrides are still missing or the
        handler reports an action other than ``ACCEPT``.
        """
        require_any_scope(context, self._write_scopes(request.policy_queue))
        logger.info(
            "accept upload: policy_queue=%s name=%s",
            request.policy_queue,
            request.name,
        )

        with self._conn.session() as session:
            upload = self._upload(
                session, request.policy_queue, request.name, for_update=True
            )
            if not upload:
                context.abort(grpc.StatusCode.NOT_FOUND, "not found")

            handler = PolicyQueueUploadHandler(upload, session)

            pending = handler.missing_overrides()
            if pending:
                packages = ", ".join(
                    f"{entry['type']}:{entry['package']}" for entry in pending
                )
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"missing overrides for {packages}",
                )

            # A recorded ACCEPT is tolerated, so repeating the call is allowed.
            action = handler.get_action()
            if action and action != "ACCEPT":
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"upload is already processed with action {action}",
                )

            handler.accept()

        return empty_pb2.Empty()

    @override
    def RejectUpload(
        self,
        request: policyqueue_pb2.RejectUploadRequest,
        context: grpc.ServicerContext,
    ) -> empty_pb2.Empty:
        """Reject an upload with ``request.reason``.

        Aborts with ``NOT_FOUND`` when the upload does not exist, and with
        ``FAILED_PRECONDITION`` when the handler reports an action other than
        ``REJECT``.
        """
        require_any_scope(context, self._write_scopes(request.policy_queue))
        logger.info(
            "reject upload: policy_queue=%s name=%s rejected_by=%s",
            request.policy_queue,
            request.name,
            request.rejected_by,
        )

        with self._conn.session() as session:
            upload = self._upload(
                session, request.policy_queue, request.name, for_update=True
            )
            if not upload:
                context.abort(grpc.StatusCode.NOT_FOUND, "not found")

            handler = PolicyQueueUploadHandler(upload, session)

            # A recorded REJECT is tolerated, so repeating the call is allowed.
            action = handler.get_action()
            if action and action != "REJECT":
                context.abort(
                    grpc.StatusCode.FAILED_PRECONDITION,
                    f"upload is already processed with action {action}",
                )

            handler.reject(request.reason, rejected_by=request.rejected_by)

        return empty_pb2.Empty()