1#! /usr/bin/env python3 

2# 

3# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 

4# Copyright (C) 2023 Emilio Pozuelo Monfort <pochu@debian.org> 

5# 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU General Public License as published by 

8# the Free Software Foundation; either version 2 of the License, or 

9# (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, 

12# but WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14# GNU General Public License for more details. 

15# 

16# You should have received a copy of the GNU General Public License along 

17# with this program; if not, write to the Free Software Foundation, Inc., 

18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

19 

20import os 

21import sys 

22from collections.abc import Iterable 

23from typing import NoReturn, Optional 

24 

25from daklib.dbconn import DBConn, Fingerprint, Keyring, Suite, Uid, ACL 

26 

27 

28def usage(status: int = 0) -> NoReturn: 

29 print("""Usage: 

30 dak acl set-fingerprints <acl-name> 

31 dak acl export-per-source <acl-name> 

32 dak acl allow <acl-name> <fingerprint> <source>... 

33 dak acl deny <acl-name> <fingerprint> <source>... 

34 dak acl export-per-suite <acl-name> 

35 dak acl allow-suite <acl-name> <fingerprint> <suite>... 

36 dak acl deny-suite <acl-name> <fingerprint> <suite>... 

37 

38 set-fingerprints: 

39 Reads list of fingerprints from stdin and sets the ACL <acl-name> to these. 

40 Accepted input formats are "uid:<uid>", "name:<name>" and 

41 "fpr:<fingerprint>". 

42 

43 export-per-source: 

44 Export per source upload rights for ACL <acl-name>. 

45 

46 allow, deny: 

47 Grant (revoke) per-source upload rights for ACL <acl-name>. 

48 

49 export-per-suite: 

50 Export per suite upload rights for ACL <acl-name>. 

51 

52 allow-suite, deny-suite: 

53 Grant (revoke) per-suite upload rights for ACL <acl-name>. 

54""") 

55 sys.exit(status) 

56 

57 

58def get_fingerprint(entry: str, session) -> Optional[Fingerprint]: 

59 """get fingerprint for given ACL entry 

60 

61 The entry is a string in one of these formats:: 

62 

63 uid:<uid> 

64 name:<name> 

65 fpr:<fingerprint> 

66 keyring:<keyring-name> 

67 

68 :param entry: ACL entry 

69 :param session: database session 

70 :return: fingerprint for the entry 

71 """ 

72 field, value = entry.split(":", 1) 

73 q = session.query(Fingerprint).join(Fingerprint.keyring).filter(Keyring.active == True) # noqa:E712 

74 

75 if field == 'uid': 

76 q = q.join(Fingerprint.uid).filter(Uid.uid == value) 

77 elif field == 'name': 

78 q = q.join(Fingerprint.uid).filter(Uid.name == value) 

79 elif field == 'fpr': 

80 q = q.filter(Fingerprint.fingerprint == value) 

81 elif field == 'keyring': 

82 q = q.filter(Keyring.keyring_name == value) 

83 else: 

84 raise Exception('Unknown selector "{0}".'.format(field)) 

85 

86 return q.all() 

87 

88 

89def acl_set_fingerprints(acl_name: str, entries: Iterable[str]) -> None: 

90 session = DBConn().session() 

91 acl = session.query(ACL).filter_by(name=acl_name).one() 

92 

93 acl.fingerprints.clear() 

94 for entry in entries: 

95 entry = entry.strip() 

96 if entry.startswith('#') or len(entry) == 0: 

97 continue 

98 

99 fps = get_fingerprint(entry, session) 

100 if len(fps) == 0: 

101 print("Unknown key for '{0}'".format(entry)) 

102 else: 

103 acl.fingerprints.update(fps) 

104 

105 session.commit() 

106 

107 

108def acl_export_per_source(acl_name: str) -> None: 

109 session = DBConn().session() 

110 acl = session.query(ACL).filter_by(name=acl_name).one() 

111 

112 query = r""" 

113 SELECT 

114 f.fingerprint, 

115 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>' 

116 FROM uid u 

117 JOIN fingerprint f2 ON u.id = f2.uid 

118 WHERE f2.id = f.id) AS name, 

119 STRING_AGG( 

120 a.source 

121 || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''), 

122 E',\n ' ORDER BY a.source) 

123 FROM acl_per_source a 

124 JOIN fingerprint f ON a.fingerprint_id = f.id 

125 LEFT JOIN uid u ON f.uid = u.id 

126 WHERE a.acl_id = :acl_id 

127 GROUP BY f.id, f.fingerprint 

128 ORDER BY name 

129 """ 

130 

131 for row in session.execute(query, {'acl_id': acl.id}): 

132 print("Fingerprint:", row[0]) 

133 print("Uid:", row[1]) 

134 print("Allow:", row[2]) 

135 print() 

136 

137 session.rollback() 

138 session.close() 

139 

140 

141def acl_export_per_suite(acl_name: str) -> None: 

142 session = DBConn().session() 

143 acl = session.query(ACL).filter_by(name=acl_name).one() 

144 

145 query = r""" 

146 SELECT 

147 f.fingerprint, 

148 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>' 

149 FROM uid u 

150 JOIN fingerprint f2 ON u.id = f2.uid 

151 WHERE f2.id = f.id) AS name, 

152 s.suite_name 

153 FROM acl_per_suite a 

154 JOIN fingerprint f ON a.fingerprint_id = f.id 

155 JOIN suite s ON a.suite_id = s.id 

156 LEFT JOIN uid u ON f.uid = u.id 

157 WHERE a.acl_id = :acl_id 

158 GROUP BY f.id, f.fingerprint, s.suite_name 

159 ORDER BY name 

160 """ 

161 

162 for row in session.execute(query, {'acl_id': acl.id}): 

163 print("Fingerprint:", row[0]) 

164 print("Uid:", row[1]) 

165 print("Allow:", row[2]) 

166 print() 

167 

168 session.rollback() 

169 session.close() 

170 

171 

172def acl_allow(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None: 

173 tbl = DBConn().tbl_acl_per_source 

174 

175 session = DBConn().session() 

176 

177 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

178 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id 

179 

180 # TODO: check that fpr is in ACL 

181 

182 data = [ 

183 { 

184 'acl_id': acl_id, 

185 'fingerprint_id': fingerprint_id, 

186 'source': source, 

187 'reason': 'set by {} via CLI'.format(os.environ.get('USER', '(unknown)')), 

188 } 

189 for source in sources 

190 ] 

191 

192 session.execute(tbl.insert(), data) 

193 

194 session.commit() 

195 

196 

197def acl_allow_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None: 

198 tbl = DBConn().tbl_acl_per_suite 

199 

200 session = DBConn().session() 

201 

202 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

203 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id 

204 

205 # TODO: check that fpr is in ACL 

206 

207 data = [] 

208 

209 for suite in suites: 

210 try: 

211 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id 

212 except: 

213 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id 

214 

215 data.append({ 

216 'acl_id': acl_id, 

217 'fingerprint_id': fingerprint_id, 

218 'suite_id': suite_id, 

219 'reason': 'set by {} via CLI'.format(os.environ.get('USER', '(unknown)')), 

220 }) 

221 

222 session.execute(tbl.insert(), data) 

223 

224 session.commit() 

225 

226 

227def acl_deny(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None: 

228 tbl = DBConn().tbl_acl_per_source 

229 

230 session = DBConn().session() 

231 

232 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

233 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id 

234 

235 # TODO: check that fpr is in ACL 

236 

237 for source in sources: 

238 result = session.execute(tbl.delete().where(tbl.c.acl_id == acl_id).where(tbl.c.fingerprint_id == fingerprint_id).where(tbl.c.source == source)) 

239 if result.rowcount < 1: 

240 print("W: Tried to deny uploads of '{}', but was not allowed before.".format(source)) 

241 

242 session.commit() 

243 

244 

245def acl_deny_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None: 

246 tbl = DBConn().tbl_acl_per_suite 

247 

248 session = DBConn().session() 

249 

250 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 

251 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id 

252 

253 # TODO: check that fpr is in ACL 

254 

255 for suite in suites: 

256 try: 

257 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id 

258 except: 

259 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id 

260 

261 result = session.execute(tbl.delete().where(tbl.c.acl_id == acl_id).where(tbl.c.fingerprint_id == fingerprint_id).where(tbl.c.suite_id == suite_id)) 

262 if result.rowcount < 1: 

263 print("W: Tried to deny uploads for suite '{}', but was not allowed before.".format(suite)) 

264 

265 session.commit() 

266 

267 

268def main(argv=None): 

269 if argv is None: 269 ↛ 272line 269 didn't jump to line 272, because the condition on line 269 was never false

270 argv = sys.argv 

271 

272 if len(argv) > 1 and argv[1] in ('-h', '--help'): 

273 usage(0) 

274 

275 if len(argv) < 3: 275 ↛ 276line 275 didn't jump to line 276, because the condition on line 275 was never true

276 usage(1) 

277 

278 if argv[1] == 'set-fingerprints': 278 ↛ 279line 278 didn't jump to line 279, because the condition on line 278 was never true

279 acl_set_fingerprints(argv[2], sys.stdin) 

280 elif argv[1] == 'export-per-source': 

281 acl_export_per_source(argv[2]) 

282 elif argv[1] == 'export-per-suite': 

283 acl_export_per_suite(argv[2]) 

284 elif argv[1] == 'allow': 

285 acl_allow(argv[2], argv[3], argv[4:]) 

286 elif argv[1] == 'deny': 286 ↛ 287line 286 didn't jump to line 287, because the condition on line 286 was never true

287 acl_deny(argv[2], argv[3], argv[4:]) 

288 elif argv[1] == 'allow-suite': 288 ↛ 290line 288 didn't jump to line 290, because the condition on line 288 was never false

289 acl_allow_suite(argv[2], argv[3], argv[4:]) 

290 elif argv[1] == 'deny-suite': 

291 acl_deny_suite(argv[2], argv[3], argv[4:]) 

292 else: 

293 usage(1)