1#! /usr/bin/env python3
2#
3# Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
4# Copyright (C) 2023 Emilio Pozuelo Monfort <pochu@debian.org>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License along
17# with this program; if not, write to the Free Software Foundation, Inc.,
18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20import os
21import sys
22from collections.abc import Iterable
23from typing import NoReturn, Optional
25from daklib.dbconn import DBConn, Fingerprint, Keyring, Suite, Uid, ACL
28def usage(status: int = 0) -> NoReturn:
29 print("""Usage:
30 dak acl set-fingerprints <acl-name>
31 dak acl export-per-source <acl-name>
32 dak acl allow <acl-name> <fingerprint> <source>...
33 dak acl deny <acl-name> <fingerprint> <source>...
34 dak acl export-per-suite <acl-name>
35 dak acl allow-suite <acl-name> <fingerprint> <suite>...
36 dak acl deny-suite <acl-name> <fingerprint> <suite>...
38 set-fingerprints:
39 Reads list of fingerprints from stdin and sets the ACL <acl-name> to these.
40 Accepted input formats are "uid:<uid>", "name:<name>" and
41 "fpr:<fingerprint>".
43 export-per-source:
44 Export per source upload rights for ACL <acl-name>.
46 allow, deny:
47 Grant (revoke) per-source upload rights for ACL <acl-name>.
49 export-per-suite:
50 Export per suite upload rights for ACL <acl-name>.
52 allow-suite, deny-suite:
53 Grant (revoke) per-suite upload rights for ACL <acl-name>.
54""")
55 sys.exit(status)
58def get_fingerprint(entry: str, session) -> Optional[Fingerprint]:
59 """get fingerprint for given ACL entry
61 The entry is a string in one of these formats::
63 uid:<uid>
64 name:<name>
65 fpr:<fingerprint>
66 keyring:<keyring-name>
68 :param entry: ACL entry
69 :param session: database session
70 :return: fingerprint for the entry
71 """
72 field, value = entry.split(":", 1)
73 q = session.query(Fingerprint).join(Fingerprint.keyring).filter(Keyring.active == True) # noqa:E712
75 if field == 'uid':
76 q = q.join(Fingerprint.uid).filter(Uid.uid == value)
77 elif field == 'name':
78 q = q.join(Fingerprint.uid).filter(Uid.name == value)
79 elif field == 'fpr':
80 q = q.filter(Fingerprint.fingerprint == value)
81 elif field == 'keyring':
82 q = q.filter(Keyring.keyring_name == value)
83 else:
84 raise Exception('Unknown selector "{0}".'.format(field))
86 return q.all()
89def acl_set_fingerprints(acl_name: str, entries: Iterable[str]) -> None:
90 session = DBConn().session()
91 acl = session.query(ACL).filter_by(name=acl_name).one()
93 acl.fingerprints.clear()
94 for entry in entries:
95 entry = entry.strip()
96 if entry.startswith('#') or len(entry) == 0:
97 continue
99 fps = get_fingerprint(entry, session)
100 if len(fps) == 0:
101 print("Unknown key for '{0}'".format(entry))
102 else:
103 acl.fingerprints.update(fps)
105 session.commit()
108def acl_export_per_source(acl_name: str) -> None:
109 session = DBConn().session()
110 acl = session.query(ACL).filter_by(name=acl_name).one()
112 query = r"""
113 SELECT
114 f.fingerprint,
115 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
116 FROM uid u
117 JOIN fingerprint f2 ON u.id = f2.uid
118 WHERE f2.id = f.id) AS name,
119 STRING_AGG(
120 a.source
121 || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''),
122 E',\n ' ORDER BY a.source)
123 FROM acl_per_source a
124 JOIN fingerprint f ON a.fingerprint_id = f.id
125 LEFT JOIN uid u ON f.uid = u.id
126 WHERE a.acl_id = :acl_id
127 GROUP BY f.id, f.fingerprint
128 ORDER BY name
129 """
131 for row in session.execute(query, {'acl_id': acl.id}):
132 print("Fingerprint:", row[0])
133 print("Uid:", row[1])
134 print("Allow:", row[2])
135 print()
137 session.rollback()
138 session.close()
141def acl_export_per_suite(acl_name: str) -> None:
142 session = DBConn().session()
143 acl = session.query(ACL).filter_by(name=acl_name).one()
145 query = r"""
146 SELECT
147 f.fingerprint,
148 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>'
149 FROM uid u
150 JOIN fingerprint f2 ON u.id = f2.uid
151 WHERE f2.id = f.id) AS name,
152 s.suite_name
153 FROM acl_per_suite a
154 JOIN fingerprint f ON a.fingerprint_id = f.id
155 JOIN suite s ON a.suite_id = s.id
156 LEFT JOIN uid u ON f.uid = u.id
157 WHERE a.acl_id = :acl_id
158 GROUP BY f.id, f.fingerprint, s.suite_name
159 ORDER BY name
160 """
162 for row in session.execute(query, {'acl_id': acl.id}):
163 print("Fingerprint:", row[0])
164 print("Uid:", row[1])
165 print("Allow:", row[2])
166 print()
168 session.rollback()
169 session.close()
172def acl_allow(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
173 tbl = DBConn().tbl_acl_per_source
175 session = DBConn().session()
177 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
178 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id
180 # TODO: check that fpr is in ACL
182 data = [
183 {
184 'acl_id': acl_id,
185 'fingerprint_id': fingerprint_id,
186 'source': source,
187 'reason': 'set by {} via CLI'.format(os.environ.get('USER', '(unknown)')),
188 }
189 for source in sources
190 ]
192 session.execute(tbl.insert(), data)
194 session.commit()
197def acl_allow_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
198 tbl = DBConn().tbl_acl_per_suite
200 session = DBConn().session()
202 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
203 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id
205 # TODO: check that fpr is in ACL
207 data = []
209 for suite in suites:
210 try:
211 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id
212 except:
213 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id
215 data.append({
216 'acl_id': acl_id,
217 'fingerprint_id': fingerprint_id,
218 'suite_id': suite_id,
219 'reason': 'set by {} via CLI'.format(os.environ.get('USER', '(unknown)')),
220 })
222 session.execute(tbl.insert(), data)
224 session.commit()
227def acl_deny(acl_name: str, fingerprint: str, sources: Iterable[str]) -> None:
228 tbl = DBConn().tbl_acl_per_source
230 session = DBConn().session()
232 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
233 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id
235 # TODO: check that fpr is in ACL
237 for source in sources:
238 result = session.execute(tbl.delete().where(tbl.c.acl_id == acl_id).where(tbl.c.fingerprint_id == fingerprint_id).where(tbl.c.source == source))
239 if result.rowcount < 1:
240 print("W: Tried to deny uploads of '{}', but was not allowed before.".format(source))
242 session.commit()
245def acl_deny_suite(acl_name: str, fingerprint: str, suites: Iterable[str]) -> None:
246 tbl = DBConn().tbl_acl_per_suite
248 session = DBConn().session()
250 acl_id = session.query(ACL).filter_by(name=acl_name).one().id
251 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id
253 # TODO: check that fpr is in ACL
255 for suite in suites:
256 try:
257 suite_id = session.query(Suite).filter_by(suite_name=suite).one().suite_id
258 except:
259 suite_id = session.query(Suite).filter_by(codename=suite).one().suite_id
261 result = session.execute(tbl.delete().where(tbl.c.acl_id == acl_id).where(tbl.c.fingerprint_id == fingerprint_id).where(tbl.c.suite_id == suite_id))
262 if result.rowcount < 1:
263 print("W: Tried to deny uploads for suite '{}', but was not allowed before.".format(suite))
265 session.commit()
268def main(argv=None):
269 if argv is None: 269 ↛ 272line 269 didn't jump to line 272, because the condition on line 269 was never false
270 argv = sys.argv
272 if len(argv) > 1 and argv[1] in ('-h', '--help'):
273 usage(0)
275 if len(argv) < 3: 275 ↛ 276line 275 didn't jump to line 276, because the condition on line 275 was never true
276 usage(1)
278 if argv[1] == 'set-fingerprints': 278 ↛ 279line 278 didn't jump to line 279, because the condition on line 278 was never true
279 acl_set_fingerprints(argv[2], sys.stdin)
280 elif argv[1] == 'export-per-source':
281 acl_export_per_source(argv[2])
282 elif argv[1] == 'export-per-suite':
283 acl_export_per_suite(argv[2])
284 elif argv[1] == 'allow':
285 acl_allow(argv[2], argv[3], argv[4:])
286 elif argv[1] == 'deny': 286 ↛ 287line 286 didn't jump to line 287, because the condition on line 286 was never true
287 acl_deny(argv[2], argv[3], argv[4:])
288 elif argv[1] == 'allow-suite': 288 ↛ 290line 288 didn't jump to line 290, because the condition on line 288 was never false
289 acl_allow_suite(argv[2], argv[3], argv[4:])
290 elif argv[1] == 'deny-suite':
291 acl_deny_suite(argv[2], argv[3], argv[4:])
292 else:
293 usage(1)