Package dak :: Module acl
[hide private]
[frames] | no frames]

Source Code for Module dak.acl

  1  #! /usr/bin/env python3 
  2  # 
  3  # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org> 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 2 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License along 
 16  # with this program; if not, write to the Free Software Foundation, Inc., 
 17  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 18   
 19  import os 
 20  import sys 
 21   
 22  from daklib.dbconn import DBConn, Fingerprint, Keyring, Uid, ACL 
 23   
 24   
25 -def usage(status=0):
26 print("""Usage: 27 dak acl set-fingerprints <acl-name> 28 dak acl export-per-source <acl-name> 29 dak acl allow <acl-name> <fingerprint> <source>... 30 dak acl deny <acl-name> <fingerprint> <source>... 31 32 set-fingerprints: 33 Reads list of fingerprints from stdin and sets the ACL <acl-name> to these. 34 Accepted input formats are "uid:<uid>", "name:<name>" and 35 "fpr:<fingerprint>". 36 37 export-per-source: 38 Export per source upload rights for ACL <acl-name>. 39 40 allow, deny: 41 Grant (revoke) per-source upload rights for ACL <acl-name>. 42 """) 43 sys.exit(status)
44 45
46 -def get_fingerprint(entry, session):
47 """get fingerprint for given ACL entry 48 49 The entry is a string in one of these formats:: 50 51 uid:<uid> 52 name:<name> 53 fpr:<fingerprint> 54 keyring:<keyring-name> 55 56 @type entry: string 57 @param entry: ACL entry 58 59 @param session: database session 60 61 @rtype: L{daklib.dbconn.Fingerprint} or C{None} 62 @return: fingerprint for the entry 63 """ 64 field, value = entry.split(":", 1) 65 q = session.query(Fingerprint).join(Fingerprint.keyring).filter(Keyring.active == True) # noqa:E712 66 67 if field == 'uid': 68 q = q.join(Fingerprint.uid).filter(Uid.uid == value) 69 elif field == 'name': 70 q = q.join(Fingerprint.uid).filter(Uid.name == value) 71 elif field == 'fpr': 72 q = q.filter(Fingerprint.fingerprint == value) 73 elif field == 'keyring': 74 q = q.filter(Keyring.keyring_name == value) 75 else: 76 raise Exception('Unknown selector "{0}".'.format(field)) 77 78 return q.all()
79 80
81 -def acl_set_fingerprints(acl_name, entries):
82 session = DBConn().session() 83 acl = session.query(ACL).filter_by(name=acl_name).one() 84 85 acl.fingerprints.clear() 86 for entry in entries: 87 entry = entry.strip() 88 if entry.startswith('#') or len(entry) == 0: 89 continue 90 91 fps = get_fingerprint(entry, session) 92 if len(fps) == 0: 93 print("Unknown key for '{0}'".format(entry)) 94 else: 95 acl.fingerprints.update(fps) 96 97 session.commit()
98 99
100 -def acl_export_per_source(acl_name):
101 session = DBConn().session() 102 acl = session.query(ACL).filter_by(name=acl_name).one() 103 104 query = r""" 105 SELECT 106 f.fingerprint, 107 (SELECT COALESCE(u.name, '') || ' <' || u.uid || '>' 108 FROM uid u 109 JOIN fingerprint f2 ON u.id = f2.uid 110 WHERE f2.id = f.id) AS name, 111 STRING_AGG( 112 a.source 113 || COALESCE(' (' || (SELECT fingerprint FROM fingerprint WHERE id = a.created_by_id) || ')', ''), 114 E',\n ' ORDER BY a.source) 115 FROM acl_per_source a 116 JOIN fingerprint f ON a.fingerprint_id = f.id 117 LEFT JOIN uid u ON f.uid = u.id 118 WHERE a.acl_id = :acl_id 119 GROUP BY f.id, f.fingerprint 120 ORDER BY name 121 """ 122 123 for row in session.execute(query, {'acl_id': acl.id}): 124 print("Fingerprint:", row[0]) 125 print("Uid:", row[1]) 126 print("Allow:", row[2]) 127 print() 128 129 session.rollback() 130 session.close()
131 132
133 -def acl_allow(acl_name, fingerprint, sources):
134 tbl = DBConn().tbl_acl_per_source 135 136 session = DBConn().session() 137 138 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 139 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id 140 141 # TODO: check that fpr is in ACL 142 143 data = [ 144 { 145 'acl_id': acl_id, 146 'fingerprint_id': fingerprint_id, 147 'source': source, 148 'reason': 'set by {} via CLI'.format(os.environ.get('USER', '(unknown)')), 149 } 150 for source in sources 151 ] 152 153 session.execute(tbl.insert(), data) 154 155 session.commit()
156 157
158 -def acl_deny(acl_name, fingerprint, sources):
159 tbl = DBConn().tbl_acl_per_source 160 161 session = DBConn().session() 162 163 acl_id = session.query(ACL).filter_by(name=acl_name).one().id 164 fingerprint_id = session.query(Fingerprint).filter_by(fingerprint=fingerprint).one().fingerprint_id 165 166 # TODO: check that fpr is in ACL 167 168 for source in sources: 169 result = session.execute(tbl.delete().where(tbl.c.acl_id == acl_id).where(tbl.c.fingerprint_id == fingerprint_id).where(tbl.c.source == source)) 170 if result.rowcount < 1: 171 print("W: Tried to deny uploads of '{}', but was not allowed before.".format(source)) 172 173 session.commit()
174 175
176 -def main(argv=None):
177 if argv is None: 178 argv = sys.argv 179 180 if len(argv) > 1 and argv[1] in ('-h', '--help'): 181 usage(0) 182 183 if len(argv) < 3: 184 usage(1) 185 186 if argv[1] == 'set-fingerprints': 187 acl_set_fingerprints(argv[2], sys.stdin) 188 elif argv[1] == 'export-per-source': 189 acl_export_per_source(argv[2]) 190 elif argv[1] == 'allow': 191 acl_allow(argv[2], argv[3], argv[4:]) 192 elif argv[1] == 'deny': 193 acl_deny(argv[2], argv[3], argv[4:]) 194 else: 195 usage(1)
196