1#! /usr/bin/env python3 

2# 

3# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

4# Copyright (C) 2023 Emilio Pozuelo Monfort <pochu@debian.org> 

5# 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation; either version 2 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License along 

17# with this program; if not, write to the Free Software Foundation, Inc., 

18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

19 

20import os 

21import sys 

22from collections.abc import Iterable 

23from typing import NoReturn, Optional 

24 

25from daklib.dbconn import ACL, DBConn, Fingerprint, Keyring, Suite, Uid 

26 

27 

def usage(status: int = 0) -> NoReturn:
    """Print the command-line help for ``dak acl`` and exit.

    The help text goes to standard output regardless of *status*.

    :param status: exit status handed to :func:`sys.exit`
    """
    print(
        """Usage:
  dak acl set-fingerprints <acl-name>
  dak acl export-per-source <acl-name>
  dak acl allow <acl-name> <fingerprint> <source>...
  dak acl deny <acl-name> <fingerprint> <source>...
  dak acl export-per-suite <acl-name>
  dak acl allow-suite <acl-name> <fingerprint> <suite>...
  dak acl deny-suite <acl-name> <fingerprint> <suite>...

  set-fingerprints:
    Reads list of fingerprints from stdin and sets the ACL <acl-name> to these.
    Accepted input formats are "uid:<uid>", "name:<name>",
    "fpr:<fingerprint>" and "keyring:<keyring-name>".

  export-per-source:
    Export per source upload rights for ACL <acl-name>.

  allow, deny:
    Grant (revoke) per-source upload rights for ACL <acl-name>.

  export-per-suite:
    Export per suite upload rights for ACL <acl-name>.

  allow-suite, deny-suite:
    Grant (revoke) per-suite upload rights for ACL <acl-name>.
"""
    )
    sys.exit(status)

58 

59 

def get_fingerprint(entry: str, session) -> "list[Fingerprint]":
    """Get the fingerprints selected by the given ACL entry.

    The entry is a string in one of these formats::

        uid:<uid>
        name:<name>
        fpr:<fingerprint>
        keyring:<keyring-name>

    Only fingerprints whose keyring is marked active are returned.

    :param entry: ACL entry
    :param session: database session
    :return: all matching fingerprints; the list is empty when nothing matches
    :raises ValueError: if *entry* contains no ``:`` separator
    :raises Exception: if the selector before the ``:`` is not recognised
    """
    field, separator, value = entry.partition(":")
    if not separator:
        raise ValueError(
            'Malformed ACL entry "{0}": expected "<selector>:<value>".'.format(entry)
        )
    # Reject unknown selectors before touching the database layer at all.
    if field not in ("uid", "name", "fpr", "keyring"):
        raise Exception('Unknown selector "{0}".'.format(field))

    q = (
        session.query(Fingerprint)
        .join(Fingerprint.keyring)
        .filter(Keyring.active == True)  # noqa:E712
    )

    if field == "uid":
        q = q.join(Fingerprint.uid).filter(Uid.uid == value)
    elif field == "name":
        q = q.join(Fingerprint.uid).filter(Uid.name == value)
    elif field == "fpr":
        q = q.filter(Fingerprint.fingerprint == value)
    else:  # field == "keyring"
        q = q.filter(Keyring.keyring_name == value)

    return q.all()

93 

94 

def acl_set_fingerprints(acl_name: str, entries: Iterable[str]) -> None:
    """Replace the fingerprints of an ACL with those selected by *entries*.

    Each entry is stripped of surrounding whitespace; blank entries and
    entries starting with ``#`` are skipped.  Every remaining entry is
    resolved with :func:`get_fingerprint`; entries that match no key are
    reported on standard output and otherwise ignored.

    :param acl_name: name of the ACL to modify
    :param entries: selector strings, e.g. the lines of a file
    """
    session = DBConn().session()
    acl = session.query(ACL).filter_by(name=acl_name).one()

    # Start from scratch: only keys matched below end up in the ACL.
    acl.fingerprints.clear()

    for raw_entry in entries:
        selector = raw_entry.strip()
        if not selector or selector.startswith("#"):
            continue

        matches = get_fingerprint(selector, session)
        if matches:
            acl.fingerprints.update(matches)
        else:
            print("Unknown key for '{0}'".format(selector))

    session.commit()

112 

113 

def acl_export_per_source(acl_name: str) -> None:
    """Print the per-source upload rights recorded for an ACL.

    For every fingerprint with per-source entries in the ACL, a
    ``Fingerprint:``, ``Uid:`` and ``Allow:`` line is written to standard
    output, followed by an empty line.  The ``Allow:`` value is the
    aggregated list of sources, each optionally followed by the fingerprint
    that created the entry in parentheses.

    :param acl_name: name of the ACL to export
    """
    session = DBConn().session()
    try:
        acl = session.query(ACL).filter_by(name=acl_name).one()

        query = r"""
          SELECT
            f.fingerprint,
            (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
               FROM uid u
               JOIN fingerprint f2 ON u.id = f2.uid
              WHERE f2.id = f.id) AS name,
            STRING_AGG(
              a.source
              || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''),
              E',\n ' ORDER BY a.source)
          FROM acl_per_source a
          JOIN fingerprint f ON a.fingerprint_id = f.id
          LEFT JOIN uid u ON f.uid = u.id
          WHERE a.acl_id = :acl_id
          GROUP BY f.id, f.fingerprint
          ORDER BY name
          """

        for row in session.execute(query, {"acl_id": acl.id}):
            print("Fingerprint:", row[0])
            print("Uid:", row[1])
            print("Allow:", row[2])
            print()
    finally:
        # This is a read-only export: always discard the transaction and
        # release the session, even if the lookup or the query failed.
        session.rollback()
        session.close()

145 

146 

def acl_export_per_suite(acl_name: str) -> None:
    """Print the per-suite upload rights recorded for an ACL.

    For every (fingerprint, suite) pair with a per-suite entry in the ACL, a
    ``Fingerprint:``, ``Uid:`` and ``Allow:`` line is written to standard
    output, followed by an empty line.  The ``Allow:`` value is the suite
    name.

    :param acl_name: name of the ACL to export
    """
    session = DBConn().session()
    try:
        acl = session.query(ACL).filter_by(name=acl_name).one()

        query = r"""
          SELECT
            f.fingerprint,
            (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
               FROM uid u
               JOIN fingerprint f2 ON u.id = f2.uid
              WHERE f2.id = f.id) AS name,
            s.suite_name
          FROM acl_per_suite a
          JOIN fingerprint f ON a.fingerprint_id = f.id
          JOIN suite s ON a.suite_id = s.id
          LEFT JOIN uid u ON f.uid = u.id
          WHERE a.acl_id = :acl_id
          GROUP BY f.id, f.fingerprint, s.suite_name
          ORDER BY name
          """

        for row in session.execute(query, {"acl_id": acl.id}):
            print("Fingerprint:", row[0])
            print("Uid:", row[1])
            print("Allow:", row[2])
            print()
    finally:
        # This is a read-only export: always discard the transaction and
        # release the session, even if the lookup or the query failed.
        session.rollback()
        session.close()

176 

177 

def acl_allow(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
    """Grant per-source upload rights in an ACL to a fingerprint.

    One row per source is inserted into the per-source ACL table.  The
    recorded reason names the invoking user as given by the ``USER``
    environment variable (``(unknown)`` if it is not set).

    :param acl_name: name of the ACL to modify
    :param fingerprint: fingerprint that is granted the rights
    :param sources: source package names to allow; may be empty, in which
        case nothing is inserted
    """
    tbl = DBConn().tbl_acl_per_source

    session = DBConn().session()

    acl_id = session.query(ACL).filter_by(name=acl_name).one().id
    fingerprint_id = (
        session.query(Fingerprint)
        .filter_by(fingerprint=fingerprint)
        .one()
        .fingerprint_id
    )

    # TODO: check that fpr is in ACL

    # The reason is identical for every row, so build it only once.
    reason = "set by {} via CLI".format(os.environ.get("USER", "(unknown)"))
    data = [
        {
            "acl_id": acl_id,
            "fingerprint_id": fingerprint_id,
            "source": source,
            "reason": reason,
        }
        for source in sources
    ]

    # Executing the INSERT with an empty parameter list is not a no-op,
    # so skip it when there is nothing to add.
    if data:
        session.execute(tbl.insert(), data)

    session.commit()

206 

207 

def acl_allow_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
    """Grant per-suite upload rights in an ACL to a fingerprint.

    Each suite is looked up by suite name first and, if that lookup fails,
    by codename.  One row per suite is inserted into the per-suite ACL
    table.  The recorded reason names the invoking user as given by the
    ``USER`` environment variable (``(unknown)`` if it is not set).

    :param acl_name: name of the ACL to modify
    :param fingerprint: fingerprint that is granted the rights
    :param suites: suite names or codenames to allow; may be empty, in
        which case nothing is inserted
    """
    tbl = DBConn().tbl_acl_per_suite

    session = DBConn().session()

    acl_id = session.query(ACL).filter_by(name=acl_name).one().id
    fingerprint_id = (
        session.query(Fingerprint)
        .filter_by(fingerprint=fingerprint)
        .one()
        .fingerprint_id
    )

    # TODO: check that fpr is in ACL

    # The reason is identical for every row, so build it only once.
    reason = "set by {} via CLI".format(os.environ.get("USER", "(unknown)"))

    data = []

    for suite in suites:
        try:
            suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id
        except Exception:
            # Not a known suite name: retry as codename.  ``Exception``
            # rather than a bare ``except`` so that KeyboardInterrupt and
            # SystemExit are not swallowed by the fallback.
            suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id

        data.append(
            {
                "acl_id": acl_id,
                "fingerprint_id": fingerprint_id,
                "suite_id": suite_id,
                "reason": reason,
            }
        )

    # Executing the INSERT with an empty parameter list is not a no-op,
    # so skip it when there is nothing to add.
    if data:
        session.execute(tbl.insert(), data)

    session.commit()

245 

246 

def acl_deny(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
    """Revoke per-source upload rights in an ACL from a fingerprint.

    For every source the matching row is deleted from the per-source ACL
    table.  If no row was deleted, a warning is written to standard output.

    :param acl_name: name of the ACL to modify
    :param fingerprint: fingerprint whose rights are revoked
    :param sources: source package names to deny
    """
    tbl = DBConn().tbl_acl_per_source
    session = DBConn().session()

    acl = session.query(ACL).filter_by(name=acl_name).one()
    acl_id = acl.id
    key = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one()
    fingerprint_id = key.fingerprint_id

    # TODO: check that fpr is in ACL

    # Shared part of every DELETE; only the source differs per iteration.
    delete_for_key = (
        tbl.delete()
        .where(tbl.c.acl_id == acl_id)
        .where(tbl.c.fingerprint_id == fingerprint_id)
    )

    for source in sources:
        outcome = session.execute(delete_for_key.where(tbl.c.source == source))
        if outcome.rowcount >= 1:
            continue
        message = "W: Tried to deny uploads of '{}', but was not allowed before."
        print(message.format(source))

    session.commit()

277 

278 

def acl_deny_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
    """Revoke per-suite upload rights in an ACL from a fingerprint.

    Each suite is looked up by suite name first and, if that lookup fails,
    by codename.  The matching row is then deleted from the per-suite ACL
    table; if no row was deleted, a warning is written to standard output.

    :param acl_name: name of the ACL to modify
    :param fingerprint: fingerprint whose rights are revoked
    :param suites: suite names or codenames to deny
    """
    tbl = DBConn().tbl_acl_per_suite

    session = DBConn().session()

    acl_id = session.query(ACL).filter_by(name=acl_name).one().id
    fingerprint_id = (
        session.query(Fingerprint)
        .filter_by(fingerprint=fingerprint)
        .one()
        .fingerprint_id
    )

    # TODO: check that fpr is in ACL

    for suite in suites:
        try:
            suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id
        except Exception:
            # Not a known suite name: retry as codename.  ``Exception``
            # rather than a bare ``except`` so that KeyboardInterrupt and
            # SystemExit are not swallowed by the fallback.
            suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id

        result = session.execute(
            tbl.delete()
            .where(tbl.c.acl_id == acl_id)
            .where(tbl.c.fingerprint_id == fingerprint_id)
            .where(tbl.c.suite_id == suite_id)
        )
        if result.rowcount < 1:
            print(
                "W: Tried to deny uploads for suite '{}', but was not allowed before.".format(
                    suite
                )
            )

    session.commit()

314 

315 

def main(argv=None):
    """Entry point for ``dak acl``: parse *argv* and run the subcommand.

    ``-h``/``--help`` as first argument prints the usage and exits with
    status 0.  A missing or unknown subcommand, or too few arguments for the
    chosen subcommand, prints the usage and exits with status 1.

    :param argv: full argument vector including the program name;
        defaults to :data:`sys.argv`
    """
    if argv is None:
        argv = sys.argv

    if len(argv) > 1 and argv[1] in ("-h", "--help"):
        usage(0)

    # Every subcommand needs at least <acl-name>.
    if len(argv) < 3:
        usage(1)

    command, acl_name = argv[1], argv[2]

    # Subcommands taking only <acl-name>.
    exporters = {
        "export-per-source": acl_export_per_source,
        "export-per-suite": acl_export_per_suite,
    }
    # Subcommands taking <acl-name> <fingerprint> <item>...
    editors = {
        "allow": acl_allow,
        "deny": acl_deny,
        "allow-suite": acl_allow_suite,
        "deny-suite": acl_deny_suite,
    }

    if command == "set-fingerprints":
        acl_set_fingerprints(acl_name, sys.stdin)
    elif command in exporters:
        exporters[command](acl_name)
    elif command in editors:
        # Without this check a missing <fingerprint> would surface as an
        # IndexError instead of the usage message.
        if len(argv) < 4:
            usage(1)
        editors[command](acl_name, argv[3], argv[4:])
    else:
        usage(1)