1#! /usr/bin/env python3
2#
3# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
4# Copyright (C) 2023 Emilio Pozuelo Monfort <pochu@debian.org>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License along
17# with this program; if not, write to the Free Software Foundation, Inc.,
18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20import os
21import sys
22from collections.abc import Iterable
23from typing import NoReturn, Optional
25from daklib.dbconn import ACL, DBConn, Fingerprint, Keyring, Suite, Uid
28def usage(status: int = 0) -> NoReturn:
29 print(
30 """Usage:
31 dak acl set-fingerprints <acl-name>
32 dak acl export-per-source <acl-name>
33 dak acl allow <acl-name> <fingerprint> <source>...
34 dak acl deny <acl-name> <fingerprint> <source>...
35 dak acl export-per-suite <acl-name>
36 dak acl allow-suite <acl-name> <fingerprint> <suite>...
37 dak acl deny-suite <acl-name> <fingerprint> <suite>...
39 set-fingerprints:
40 Reads list of fingerprints from stdin and sets the ACL <acl-name> to these.
41 Accepted input formats are "uid:<uid>", "name:<name>" and
42 "fpr:<fingerprint>".
44 export-per-source:
45 Export per source upload rights for ACL <acl-name>.
47 allow, deny:
48 Grant (revoke) per-source upload rights for ACL <acl-name>.
50 export-per-suite:
51 Export per suite upload rights for ACL <acl-name>.
53 allow-suite, deny-suite:
54 Grant (revoke) per-suite upload rights for ACL <acl-name>.
55"""
56 )
57 sys.exit(status)
60def get_fingerprint(entry: str, session) -> Optional[Fingerprint]:
61 """get fingerprint for given ACL entry
63 The entry is a string in one of these formats::
65 uid:<uid>
66 name:<name>
67 fpr:<fingerprint>
68 keyring:<keyring-name>
70 :param entry: ACL entry
71 :param session: database session
72 :return: fingerprint for the entry
73 """
74 field, value = entry.split(":", 1)
75 q = (
76 session.query(Fingerprint)
77 .join(Fingerprint.keyring)
78 .filter(Keyring.active == True) # noqa:E712
79 )
81 if field == "uid":
82 q = q.join(Fingerprint.uid).filter(Uid.uid == value)
83 elif field == "name":
84 q = q.join(Fingerprint.uid).filter(Uid.name == value)
85 elif field == "fpr":
86 q = q.filter(Fingerprint.fingerprint == value)
87 elif field == "keyring":
88 q = q.filter(Keyring.keyring_name == value)
89 else:
90 raise Exception('Unknown selector "{0}".'.format(field))
92 return q.all()
95def acl_set_fingerprints(acl_name: str, entries: Iterable[str]) -> None:
96 session = DBConn().session()
97 acl = session.query(ACL).filter_by(name=acl_name).one()
99 acl.fingerprints.clear()
100 for entry in entries:
101 entry = entry.strip()
102 if entry.startswith("#") or len(entry) == 0:
103 continue
105 fps = get_fingerprint(entry, session)
106 if len(fps) == 0:
107 print("Unknown key for '{0}'".format(entry))
108 else:
109 acl.fingerprints.update(fps)
111 session.commit()
114def acl_export_per_source(acl_name: str) -> None:
115 session = DBConn().session()
116 acl = session.query(ACL).filter_by(name=acl_name).one()
118 query = r"""
119 SELECT
120 f.fingerprint,
121 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
122 FROM uid u
123 JOIN fingerprint f2 ON u.id = f2.uid
124 WHERE f2.id = f.id) AS name,
125 STRING_AGG(
126 a.source
127 || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''),
128 E',\n ' ORDER BY a.source)
129 FROM acl_per_source a
130 JOIN fingerprint f ON a.fingerprint_id = f.id
131 LEFT JOIN uid u ON f.uid = u.id
132 WHERE a.acl_id = :acl_id
133 GROUP BY f.id, f.fingerprint
134 ORDER BY name
135 """
137 for row in session.execute(query, {"acl_id": acl.id}):
138 print("Fingerprint:", row[0])
139 print("Uid:", row[1])
140 print("Allow:", row[2])
141 print()
143 session.rollback()
144 session.close()
147def acl_export_per_suite(acl_name: str) -> None:
148 session = DBConn().session()
149 acl = session.query(ACL).filter_by(name=acl_name).one()
151 query = r"""
152 SELECT
153 f.fingerprint,
154 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
155 FROM uid u
156 JOIN fingerprint f2 ON u.id = f2.uid
157 WHERE f2.id = f.id) AS name,
158 s.suite_name
159 FROM acl_per_suite a
160 JOIN fingerprint f ON a.fingerprint_id = f.id
161 JOIN suite s ON a.suite_id = s.id
162 LEFT JOIN uid u ON f.uid = u.id
163 WHERE a.acl_id = :acl_id
164 GROUP BY f.id, f.fingerprint, s.suite_name
165 ORDER BY name
166 """
168 for row in session.execute(query, {"acl_id": acl.id}):
169 print("Fingerprint:", row[0])
170 print("Uid:", row[1])
171 print("Allow:", row[2])
172 print()
174 session.rollback()
175 session.close()
178def acl_allow(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
179 tbl = DBConn().tbl_acl_per_source
181 session = DBConn().session()
183 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
184 fingerprint_id = (
185 session.query(Fingerprint)
186 .filter_by(fingerprint=fingerprint)
187 .one()
188 .fingerprint_id
189 )
191 # TODO: check that fpr is in ACL
193 data = [
194 {
195 "acl_id": acl_id,
196 "fingerprint_id": fingerprint_id,
197 "source": source,
198 "reason": "set by {} via CLI".format(os.environ.get("USER", "(unknown)")),
199 }
200 for source in sources
201 ]
203 session.execute(tbl.insert(), data)
205 session.commit()
208def acl_allow_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
209 tbl = DBConn().tbl_acl_per_suite
211 session = DBConn().session()
213 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
214 fingerprint_id = (
215 session.query(Fingerprint)
216 .filter_by(fingerprint=fingerprint)
217 .one()
218 .fingerprint_id
219 )
221 # TODO: check that fpr is in ACL
223 data = []
225 for suite in suites:
226 try:
227 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id
228 except:
229 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id
231 data.append(
232 {
233 "acl_id": acl_id,
234 "fingerprint_id": fingerprint_id,
235 "suite_id": suite_id,
236 "reason": "set by {} via CLI".format(
237 os.environ.get("USER", "(unknown)")
238 ),
239 }
240 )
242 session.execute(tbl.insert(), data)
244 session.commit()
247def acl_deny(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
248 tbl = DBConn().tbl_acl_per_source
250 session = DBConn().session()
252 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
253 fingerprint_id = (
254 session.query(Fingerprint)
255 .filter_by(fingerprint=fingerprint)
256 .one()
257 .fingerprint_id
258 )
260 # TODO: check that fpr is in ACL
262 for source in sources:
263 result = session.execute(
264 tbl.delete()
265 .where(tbl.c.acl_id == acl_id)
266 .where(tbl.c.fingerprint_id == fingerprint_id)
267 .where(tbl.c.source == source)
268 )
269 if result.rowcount < 1:
270 print(
271 "W: Tried to deny uploads of '{}', but was not allowed before.".format(
272 source
273 )
274 )
276 session.commit()
279def acl_deny_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
280 tbl = DBConn().tbl_acl_per_suite
282 session = DBConn().session()
284 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
285 fingerprint_id = (
286 session.query(Fingerprint)
287 .filter_by(fingerprint=fingerprint)
288 .one()
289 .fingerprint_id
290 )
292 # TODO: check that fpr is in ACL
294 for suite in suites:
295 try:
296 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id
297 except:
298 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id
300 result = session.execute(
301 tbl.delete()
302 .where(tbl.c.acl_id == acl_id)
303 .where(tbl.c.fingerprint_id == fingerprint_id)
304 .where(tbl.c.suite_id == suite_id)
305 )
306 if result.rowcount < 1:
307 print(
308 "W: Tried to deny uploads for suite '{}', but was not allowed before.".format(
309 suite
310 )
311 )
313 session.commit()
316def main(argv=None):
317 if argv is None: 317 ↛ 320line 317 didn't jump to line 320, because the condition on line 317 was never false
318 argv = sys.argv
320 if len(argv) > 1 and argv[1] in ("-h", "--help"):
321 usage(0)
323 if len(argv) < 3: 323 ↛ 324line 323 didn't jump to line 324, because the condition on line 323 was never true
324 usage(1)
326 if argv[1] == "set-fingerprints": 326 ↛ 327line 326 didn't jump to line 327, because the condition on line 326 was never true
327 acl_set_fingerprints(argv[2], sys.stdin)
328 elif argv[1] == "export-per-source":
329 acl_export_per_source(argv[2])
330 elif argv[1] == "export-per-suite":
331 acl_export_per_suite(argv[2])
332 elif argv[1] == "allow":
333 acl_allow(argv[2], argv[3], argv[4:])
334 elif argv[1] == "deny": 334 ↛ 335line 334 didn't jump to line 335, because the condition on line 334 was never true
335 acl_deny(argv[2], argv[3], argv[4:])
336 elif argv[1] == "allow-suite": 336 ↛ 338line 336 didn't jump to line 338, because the condition on line 336 was never false
337 acl_allow_suite(argv[2], argv[3], argv[4:])
338 elif argv[1] == "deny-suite":
339 acl_deny_suite(argv[2], argv[3], argv[4:])
340 else:
341 usage(1)