Coverage for dak/acl.py: 52%
121 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1#! /usr/bin/env python3
2#
3# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
4# Copyright (C) 2023 Emilio Pozuelo Monfort <pochu@debian.org>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License along
17# with this program; if not, write to the Free Software Foundation, Inc.,
18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import sys
from collections.abc import Iterable, Sequence
from typing import NoReturn

from sqlalchemy import delete, insert, sql
from sqlalchemy.exc import NoResultFound

from daklib.dbconn import (
    ACL,
    ACLPerSource,
    ACLPerSuite,
    DBConn,
    Fingerprint,
    Keyring,
    Suite,
    Uid,
)
def usage(status: int = 0) -> NoReturn:
    """Print the command-line help for ``dak acl`` to stdout and exit.

    :param status: exit status passed to :func:`sys.exit`
    """
    print(
        """Usage:
  dak acl set-fingerprints <acl-name>
  dak acl export-per-source <acl-name>
  dak acl allow <acl-name> <fingerprint> <source>...
  dak acl deny <acl-name> <fingerprint> <source>...
  dak acl export-per-suite <acl-name>
  dak acl allow-suite <acl-name> <fingerprint> <suite>...
  dak acl deny-suite <acl-name> <fingerprint> <suite>...

  set-fingerprints:
    Reads list of fingerprints from stdin and sets the ACL <acl-name> to these.
    Accepted input formats are "uid:<uid>", "name:<name>",
    "fpr:<fingerprint>" and "keyring:<keyring-name>".

  export-per-source:
    Export per source upload rights for ACL <acl-name>.

  allow, deny:
    Grant (revoke) per-source upload rights for ACL <acl-name>.

  export-per-suite:
    Export per suite upload rights for ACL <acl-name>.

  allow-suite, deny-suite:
    Grant (revoke) per-suite upload rights for ACL <acl-name>.
"""
    )
    sys.exit(status)
def get_fingerprint(entry: str, session) -> Sequence[Fingerprint]:
    """Look up the fingerprints selected by an ACL entry.

    The entry is a string in one of these formats::

        uid:<uid>
        name:<name>
        fpr:<fingerprint>
        keyring:<keyring-name>

    Only fingerprints that belong to an active keyring are considered.

    :param entry: ACL entry
    :param session: database session
    :return: all matching fingerprints; empty if nothing matches
    :raises ValueError: if the entry has no ``:`` separator or uses an
        unknown selector
    """
    field, sep, value = entry.partition(":")
    if not sep:
        raise ValueError(
            'Invalid ACL entry "{0}": expected "<selector>:<value>".'.format(entry)
        )
    # Reject bad selectors before touching the database.
    if field not in ("uid", "name", "fpr", "keyring"):
        raise ValueError('Unknown selector "{0}".'.format(field))

    q = (
        session.query(Fingerprint)
        .join(Fingerprint.keyring)
        .filter(Keyring.active == True)  # noqa:E712
    )

    if field == "uid":
        q = q.join(Fingerprint.uid).filter(Uid.uid == value)
    elif field == "name":
        q = q.join(Fingerprint.uid).filter(Uid.name == value)
    elif field == "fpr":
        q = q.filter(Fingerprint.fingerprint == value)
    else:  # field == "keyring"
        q = q.filter(Keyring.keyring_name == value)

    return q.all()
def acl_set_fingerprints(acl_name: str, entries: Iterable[str]) -> None:
    """Replace the fingerprints of an ACL with those selected by *entries*.

    Each entry is stripped; empty entries and entries starting with ``#``
    are skipped. An entry that selects no fingerprint is reported on stdout
    and otherwise ignored.

    :param acl_name: name of the ACL to update
    :param entries: selector lines as accepted by :func:`get_fingerprint`
    """
    session = DBConn().session()
    try:
        acl = session.query(ACL).filter_by(name=acl_name).one()

        acl.fingerprints.clear()
        for entry in entries:
            entry = entry.strip()
            if not entry or entry.startswith("#"):
                continue

            fps = get_fingerprint(entry, session)
            if fps:
                acl.fingerprints.update(fps)
            else:
                print("Unknown key for '{0}'".format(entry))

        session.commit()
    finally:
        # Close the session even when a lookup raises part-way through.
        session.close()
def acl_export_per_source(acl_name: str) -> None:
    """Print the per-source upload rights stored for an ACL.

    One record is printed per fingerprint: the fingerprint, the owner as
    ``name <uid>``, and the allowed sources joined into a single ``Allow:``
    value. A source is followed by the creating fingerprint in parentheses
    when ``created_by_id`` resolves to one.

    :param acl_name: name of the ACL to export
    """
    session = DBConn().session()
    try:
        acl = session.query(ACL).filter_by(name=acl_name).one()

        query = r"""
          SELECT
            f.fingerprint,
            (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
              FROM uid u
              JOIN fingerprint f2 ON u.id = f2.uid
             WHERE f2.id = f.id) AS name,
            STRING_AGG(
              a.source
              || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''),
              E',\n ' ORDER BY a.source)
          FROM acl_per_source a
          JOIN fingerprint f ON a.fingerprint_id = f.id
          LEFT JOIN uid u ON f.uid = u.id
          WHERE a.acl_id = :acl_id
          GROUP BY f.id, f.fingerprint
          ORDER BY name
          """

        for row in session.execute(sql.text(query), {"acl_id": acl.id}):
            print("Fingerprint:", row[0])
            print("Uid:", row[1])
            print("Allow:", row[2])
            print()
    finally:
        # Read-only export: roll back and close whether or not it succeeded.
        session.rollback()
        session.close()
def acl_export_per_suite(acl_name: str) -> None:
    """Print the per-suite upload rights stored for an ACL.

    One record is printed per (fingerprint, suite) pair: the fingerprint,
    the owner as ``name <uid>``, and the suite name as the ``Allow:`` value.

    :param acl_name: name of the ACL to export
    """
    session = DBConn().session()
    try:
        acl = session.query(ACL).filter_by(name=acl_name).one()

        query = r"""
          SELECT
            f.fingerprint,
            (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
              FROM uid u
              JOIN fingerprint f2 ON u.id = f2.uid
             WHERE f2.id = f.id) AS name,
            s.suite_name
          FROM acl_per_suite a
          JOIN fingerprint f ON a.fingerprint_id = f.id
          JOIN suite s ON a.suite_id = s.id
          LEFT JOIN uid u ON f.uid = u.id
          WHERE a.acl_id = :acl_id
          GROUP BY f.id, f.fingerprint, s.suite_name
          ORDER BY name
          """

        for row in session.execute(sql.text(query), {"acl_id": acl.id}):
            print("Fingerprint:", row[0])
            print("Uid:", row[1])
            print("Allow:", row[2])
            print()
    finally:
        # Read-only export: roll back and close whether or not it succeeded.
        session.rollback()
        session.close()
def acl_allow(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
    """Grant per-source upload rights on an ACL to a fingerprint.

    One ``ACLPerSource`` row is inserted per source. The ``reason`` column
    records the invoking user, taken from the ``USER`` environment variable.

    :param acl_name: name of the ACL
    :param fingerprint: fingerprint that receives the rights
    :param sources: source package names to allow; may be empty, in which
        case nothing is inserted
    """
    session = DBConn().session()
    try:
        acl_id = session.query(ACL).filter_by(name=acl_name).one().id
        fingerprint_id = (
            session.query(Fingerprint)
            .filter_by(fingerprint=fingerprint)
            .one()
            .fingerprint_id
        )

        # TODO: check that fpr is in ACL

        # Same for every row, so build it once.
        reason = "set by {} via CLI".format(os.environ.get("USER", "(unknown)"))
        data = [
            {
                "acl_id": acl_id,
                "fingerprint_id": fingerprint_id,
                "source": source,
                "reason": reason,
            }
            for source in sources
        ]

        # Nothing to grant: skip the INSERT rather than executing it with an
        # empty parameter list.
        if data:
            session.execute(insert(ACLPerSource), data)

        session.commit()
    finally:
        session.close()
def acl_allow_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
    """Grant per-suite upload rights on an ACL to a fingerprint.

    Each suite is looked up by suite name first and, if no suite has that
    name, by codename. One ``ACLPerSuite`` row is inserted per suite. The
    ``reason`` column records the invoking user, taken from the ``USER``
    environment variable.

    :param acl_name: name of the ACL
    :param fingerprint: fingerprint that receives the rights
    :param suites: suite names or codenames to allow; may be empty, in which
        case nothing is inserted
    """
    session = DBConn().session()
    try:
        acl_id = session.query(ACL).filter_by(name=acl_name).one().id
        fingerprint_id = (
            session.query(Fingerprint)
            .filter_by(fingerprint=fingerprint)
            .one()
            .fingerprint_id
        )

        # TODO: check that fpr is in ACL

        # Same for every row, so build it once.
        reason = "set by {} via CLI".format(os.environ.get("USER", "(unknown)"))
        data = []

        for suite in suites:
            by_name = session.query(Suite).filter_by(suite_name=suite)
            try:
                suite_id = by_name.one().suite_id
            except NoResultFound:
                # Only "no such suite name" falls back to the codename;
                # any other error propagates.
                by_codename = session.query(Suite).filter_by(codename=suite)
                suite_id = by_codename.one().suite_id

            data.append(
                {
                    "acl_id": acl_id,
                    "fingerprint_id": fingerprint_id,
                    "suite_id": suite_id,
                    "reason": reason,
                }
            )

        # Nothing to grant: skip the INSERT rather than executing it with an
        # empty parameter list.
        if data:
            session.execute(insert(ACLPerSuite), data)

        session.commit()
    finally:
        session.close()
def acl_deny(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
    """Revoke per-source upload rights on an ACL from a fingerprint.

    The matching ``ACLPerSource`` row is deleted for each source. A warning
    is printed to stdout for every source that had no row to delete.

    :param acl_name: name of the ACL
    :param fingerprint: fingerprint that loses the rights
    :param sources: source package names to deny
    """
    session = DBConn().session()
    try:
        acl_id = session.query(ACL).filter_by(name=acl_name).one().id
        fingerprint_id = (
            session.query(Fingerprint)
            .filter_by(fingerprint=fingerprint)
            .one()
            .fingerprint_id
        )

        # TODO: check that fpr is in ACL

        for source in sources:
            result = session.execute(
                delete(ACLPerSource)
                .where(ACLPerSource.acl_id == acl_id)
                .where(ACLPerSource.fingerprint_id == fingerprint_id)
                .where(ACLPerSource.source == source)
            )
            if result.rowcount < 1:
                print(
                    "W: Tried to deny uploads of '{}', but was not allowed before.".format(
                        source
                    )
                )

        session.commit()
    finally:
        # Close the session even when a lookup or delete raises.
        session.close()
def acl_deny_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
    """Revoke per-suite upload rights on an ACL from a fingerprint.

    Each suite is looked up by suite name first and, if no suite has that
    name, by codename. The matching ``ACLPerSuite`` row is deleted; a
    warning is printed to stdout for every suite that had no row to delete.

    :param acl_name: name of the ACL
    :param fingerprint: fingerprint that loses the rights
    :param suites: suite names or codenames to deny
    """
    session = DBConn().session()
    try:
        acl_id = session.query(ACL).filter_by(name=acl_name).one().id
        fingerprint_id = (
            session.query(Fingerprint)
            .filter_by(fingerprint=fingerprint)
            .one()
            .fingerprint_id
        )

        # TODO: check that fpr is in ACL

        for suite in suites:
            by_name = session.query(Suite).filter_by(suite_name=suite)
            try:
                suite_id = by_name.one().suite_id
            except NoResultFound:
                # Only "no such suite name" falls back to the codename;
                # any other error propagates.
                by_codename = session.query(Suite).filter_by(codename=suite)
                suite_id = by_codename.one().suite_id

            result = session.execute(
                delete(ACLPerSuite)
                .where(ACLPerSuite.acl_id == acl_id)
                .where(ACLPerSuite.fingerprint_id == fingerprint_id)
                .where(ACLPerSuite.suite_id == suite_id)
            )
            if result.rowcount < 1:
                print(
                    "W: Tried to deny uploads for suite '{}', but was not allowed before.".format(
                        suite
                    )
                )

        session.commit()
    finally:
        # Close the session even when a lookup or delete raises.
        session.close()
def main(argv=None):
    """Entry point for ``dak acl``: dispatch to the requested subcommand.

    ``argv[1]`` is the subcommand and ``argv[2]`` the ACL name. The
    ``allow``, ``deny``, ``allow-suite`` and ``deny-suite`` subcommands
    also need a fingerprint in ``argv[3]``; any further arguments are the
    sources or suites. Unknown subcommands and missing arguments print the
    usage text and exit with status 1.

    :param argv: full argument vector; defaults to :data:`sys.argv`
    """
    if argv is None:
        argv = sys.argv

    if len(argv) > 1 and argv[1] in ("-h", "--help"):
        usage(0)

    if len(argv) < 3:
        usage(1)

    command, acl_name = argv[1], argv[2]

    # Subcommands that take "<fingerprint> <target>..." after the ACL name.
    per_fingerprint = {
        "allow": acl_allow,
        "deny": acl_deny,
        "allow-suite": acl_allow_suite,
        "deny-suite": acl_deny_suite,
    }

    if command == "set-fingerprints":
        acl_set_fingerprints(acl_name, sys.stdin)
    elif command == "export-per-source":
        acl_export_per_source(acl_name)
    elif command == "export-per-suite":
        acl_export_per_suite(acl_name)
    elif command in per_fingerprint:
        # Without this check a missing fingerprint raised IndexError.
        if len(argv) < 4:
            usage(1)
        per_fingerprint[command](acl_name, argv[3], argv[4:])
    else:
        usage(1)