Coverage for dak/acl.py: 52%

121 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1#! /usr/bin/env python3 

2# 

3# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

4# Copyright (C) 2023 Emilio Pozuelo Monfort <pochu@debian.org> 

5# 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation; either version 2 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License along 

17# with this program; if not, write to the Free Software Foundation, Inc., 

18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

19 

20import os 

21import sys 

22from collections.abc import Iterable, Sequence 

23from typing import NoReturn 

24 

25from sqlalchemy import delete, insert, sql 

26 

27from daklib.dbconn import ( 

28 ACL, 

29 ACLPerSource, 

30 ACLPerSuite, 

31 DBConn, 

32 Fingerprint, 

33 Keyring, 

34 Suite, 

35 Uid, 

36) 

37 

38 

39def usage(status: int = 0) -> NoReturn: 

40 print( 

41 """Usage: 

42 dak acl set-fingerprints <acl-name> 

43 dak acl export-per-source <acl-name> 

44 dak acl allow <acl-name> <fingerprint> <source>... 

45 dak acl deny <acl-name> <fingerprint> <source>... 

46 dak acl export-per-suite <acl-name> 

47 dak acl allow-suite <acl-name> <fingerprint> <suite>... 

48 dak acl deny-suite <acl-name> <fingerprint> <suite>... 

49 

50 set-fingerprints: 

51 Reads list of fingerprints from stdin and sets the ACL <acl-name> to these. 

52 Accepted input formats are "uid:<uid>", "name:<name>" and 

53 "fpr:<fingerprint>". 

54 

55 export-per-source: 

56 Export per source upload rights for ACL <acl-name>. 

57 

58 allow, deny: 

59 Grant (revoke) per-source upload rights for ACL <acl-name>. 

60 

61 export-per-suite: 

62 Export per suite upload rights for ACL <acl-name>. 

63 

64 allow-suite, deny-suite: 

65 Grant (revoke) per-suite upload rights for ACL <acl-name>. 

66""" 

67 ) 

68 sys.exit(status) 

69 

70 

71def get_fingerprint(entry: str, session) -> Sequence[Fingerprint]: 

72 """get fingerprint for given ACL entry 

73 

74 The entry is a string in one of these formats:: 

75 

76 uid:<uid> 

77 name:<name> 

78 fpr:<fingerprint> 

79 keyring:<keyring-name> 

80 

81 :param entry: ACL entry 

82 :param session: database session 

83 :return: fingerprint for the entry 

84 """ 

85 field, value = entry.split(":", 1) 

86 q = ( 

87 session.query(Fingerprint) 

88 .join(Fingerprint.keyring) 

89 .filter(Keyring.active == True) # noqa:E712 

90 ) 

91 

92 if field == "uid": 

93 q = q.join(Fingerprint.uid).filter(Uid.uid == value) 

94 elif field == "name": 

95 q = q.join(Fingerprint.uid).filter(Uid.name == value) 

96 elif field == "fpr": 

97 q = q.filter(Fingerprint.fingerprint == value) 

98 elif field == "keyring": 

99 q = q.filter(Keyring.keyring_name == value) 

100 else: 

101 raise Exception('Unknown selector "{0}".'.format(field)) 

102 

103 return q.all() 

104 

105 

106def acl_set_fingerprints(acl_name: str, entries: Iterable[str]) -> None: 

107 session = DBConn().session() 

108 acl = session.query(ACL).filter_by(name=acl_name).one() 

109 

110 acl.fingerprints.clear() 

111 for entry in entries: 

112 entry = entry.strip() 

113 if entry.startswith("#") or len(entry) == 0: 

114 continue 

115 

116 fps = get_fingerprint(entry, session) 

117 if len(fps) == 0: 

118 print("Unknown key for '{0}'".format(entry)) 

119 else: 

120 acl.fingerprints.update(fps) 

121 

122 session.commit() 

123 

124 

125def acl_export_per_source(acl_name: str) -> None: 

126 session = DBConn().session() 

127 acl = session.query(ACL).filter_by(name=acl_name).one() 

128 

129 query = r""" 

130 SELECT 

131 f.fingerprint, 

132 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>' 

133 FROM uid u 

134 JOIN fingerprint f2 ON u.id = f2.uid 

135 WHERE f2.id = f.id) AS name, 

136 STRING_AGG( 

137 a.source 

138 || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''), 

139 E',\n ' ORDER BY a.source) 

140 FROM acl_per_source a 

141 JOIN fingerprint f ON a.fingerprint_id = f.id 

142 LEFT JOIN uid u ON f.uid = u.id 

143 WHERE a.acl_id = :acl_id 

144 GROUP BY f.id, f.fingerprint 

145 ORDER BY name 

146 """ 

147 

148 for row in session.execute(sql.text(query), {"acl_id": acl.id}): 

149 print("Fingerprint:", row[0]) 

150 print("Uid:", row[1]) 

151 print("Allow:", row[2]) 

152 print() 

153 

154 session.rollback() 

155 session.close() 

156 

157 

158def acl_export_per_suite(acl_name: str) -> None: 

159 session = DBConn().session() 

160 acl = session.query(ACL).filter_by(name=acl_name).one() 

161 

162 query = r""" 

163 SELECT 

164 f.fingerprint, 

165 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>' 

166 FROM uid u 

167 JOIN fingerprint f2 ON u.id = f2.uid 

168 WHERE f2.id = f.id) AS name, 

169 s.suite_name 

170 FROM acl_per_suite a 

171 JOIN fingerprint f ON a.fingerprint_id = f.id 

172 JOIN suite s ON a.suite_id = s.id 

173 LEFT JOIN uid u ON f.uid = u.id 

174 WHERE a.acl_id = :acl_id 

175 GROUP BY f.id, f.fingerprint, s.suite_name 

176 ORDER BY name 

177 """ 

178 

179 for row in session.execute(sql.text(query), {"acl_id": acl.id}): 

180 print("Fingerprint:", row[0]) 

181 print("Uid:", row[1]) 

182 print("Allow:", row[2]) 

183 print() 

184 

185 session.rollback() 

186 session.close() 

187 

188 

189def acl_allow(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None: 

190 session = DBConn().session() 

191 

192 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

193 fingerprint_id = ( 

194 session.query(Fingerprint) 

195 .filter_by(fingerprint=fingerprint) 

196 .one() 

197 .fingerprint_id 

198 ) 

199 

200 # TODO: check that fpr is in ACL 

201 

202 data = [ 

203 { 

204 "acl_id": acl_id, 

205 "fingerprint_id": fingerprint_id, 

206 "source": source, 

207 "reason": "set by {} via CLI".format(os.environ.get("USER", "(unknown)")), 

208 } 

209 for source in sources 

210 ] 

211 

212 session.execute(insert(ACLPerSource), data) 

213 

214 session.commit() 

215 

216 

217def acl_allow_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None: 

218 session = DBConn().session() 

219 

220 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

221 fingerprint_id = ( 

222 session.query(Fingerprint) 

223 .filter_by(fingerprint=fingerprint) 

224 .one() 

225 .fingerprint_id 

226 ) 

227 

228 # TODO: check that fpr is in ACL 

229 

230 data = [] 

231 

232 for suite in suites: 

233 try: 

234 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id 

235 except: 

236 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id 

237 

238 data.append( 

239 { 

240 "acl_id": acl_id, 

241 "fingerprint_id": fingerprint_id, 

242 "suite_id": suite_id, 

243 "reason": "set by {} via CLI".format( 

244 os.environ.get("USER", "(unknown)") 

245 ), 

246 } 

247 ) 

248 

249 session.execute(insert(ACLPerSuite), data) 

250 

251 session.commit() 

252 

253 

254def acl_deny(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None: 

255 session = DBConn().session() 

256 

257 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

258 fingerprint_id = ( 

259 session.query(Fingerprint) 

260 .filter_by(fingerprint=fingerprint) 

261 .one() 

262 .fingerprint_id 

263 ) 

264 

265 # TODO: check that fpr is in ACL 

266 

267 for source in sources: 

268 result = session.execute( 

269 delete(ACLPerSource) 

270 .where(ACLPerSource.acl_id == acl_id) 

271 .where(ACLPerSource.fingerprint_id == fingerprint_id) 

272 .where(ACLPerSource.source == source) 

273 ) 

274 if result.rowcount < 1: 

275 print( 

276 "W: Tried to deny uploads of '{}', but was not allowed before.".format( 

277 source 

278 ) 

279 ) 

280 

281 session.commit() 

282 

283 

284def acl_deny_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None: 

285 session = DBConn().session() 

286 

287 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

288 fingerprint_id = ( 

289 session.query(Fingerprint) 

290 .filter_by(fingerprint=fingerprint) 

291 .one() 

292 .fingerprint_id 

293 ) 

294 

295 # TODO: check that fpr is in ACL 

296 

297 for suite in suites: 

298 try: 

299 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id 

300 except: 

301 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id 

302 

303 result = session.execute( 

304 delete(ACLPerSuite) 

305 .where(ACLPerSuite.acl_id == acl_id) 

306 .where(ACLPerSuite.fingerprint_id == fingerprint_id) 

307 .where(ACLPerSuite.suite_id == suite_id) 

308 ) 

309 if result.rowcount < 1: 

310 print( 

311 "W: Tried to deny uploads for suite '{}', but was not allowed before.".format( 

312 suite 

313 ) 

314 ) 

315 

316 session.commit() 

317 

318 

319def main(argv=None): 

320 if argv is None: 320 ↛ 323line 320 didn't jump to line 323 because the condition on line 320 was always true

321 argv = sys.argv 

322 

323 if len(argv) > 1 and argv[1] in ("-h", "--help"): 

324 usage(0) 

325 

326 if len(argv) < 3: 326 ↛ 327line 326 didn't jump to line 327 because the condition on line 326 was never true

327 usage(1) 

328 

329 if argv[1] == "set-fingerprints": 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 acl_set_fingerprints(argv[2], sys.stdin) 

331 elif argv[1] == "export-per-source": 

332 acl_export_per_source(argv[2]) 

333 elif argv[1] == "export-per-suite": 

334 acl_export_per_suite(argv[2]) 

335 elif argv[1] == "allow": 

336 acl_allow(argv[2], argv[3], argv[4:]) 

337 elif argv[1] == "deny": 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true

338 acl_deny(argv[2], argv[3], argv[4:]) 

339 elif argv[1] == "allow-suite": 339 ↛ 341line 339 didn't jump to line 341 because the condition on line 339 was always true

340 acl_allow_suite(argv[2], argv[3], argv[4:]) 

341 elif argv[1] == "deny-suite": 

342 acl_deny_suite(argv[2], argv[3], argv[4:]) 

343 else: 

344 usage(1)