Coverage for dak/import_keyring.py: 68%
136 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-04 16:18 +0000
1#! /usr/bin/env python3
3"""Imports a keyring into the database"""
4# Copyright (C) 2007 Anthony Towns <aj@erisian.com.au>
5# Copyright (C) 2009 Mark Hymers <mhy@debian.org>
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
17# You should have received a copy of the GNU General Public License
18# along with this program; if not, write to the Free Software
19# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21################################################################################
23import sys
25import apt_pkg
26from sqlalchemy import sql
27from sqlalchemy.orm.exc import NoResultFound
29from daklib.config import Config
30from daklib.dbconn import DBConn, Fingerprint, Keyring, Uid, get_keyring
# Globals
# Holds the "Import-Keyring::Options" configuration subtree; it stays None
# until main() has parsed the command line and assigned it.
Options = None
35################################################################################
def get_uid_info(session):
    """Read the whole ``uid`` table into two lookup dictionaries.

    :param session: database session used to execute the query
    :return: tuple ``(byname, byid)``; ``byname`` maps each uid to
        ``(id, name)`` and ``byid`` maps each id to ``(uid, name)``
    """
    rows = session.execute(sql.text("SELECT id, uid, name FROM uid")).fetchall()
    # Both indexes are built from the same row list, so they always agree.
    by_uid = {uid: (row_id, full_name) for row_id, uid, full_name in rows}
    by_row_id = {row_id: (uid, full_name) for row_id, uid, full_name in rows}
    return (by_uid, by_row_id)
def get_fingerprint_info(session):
    """Read the whole ``fingerprint`` table into a dictionary.

    :param session: database session used to execute the query
    :return: dict mapping each fingerprint string to the tuple
        ``(uid, id, keyring)`` taken from its row
    """
    statement = sql.text(
        "SELECT f.fingerprint, f.id, f.uid, f.keyring FROM fingerprint f"
    )
    result = session.execute(statement)
    # Note the value order differs from the column order: uid comes first.
    return {
        fingerprint: (uid, fingerprint_id, keyring)
        for fingerprint, fingerprint_id, uid, keyring in result.fetchall()
    }
def list_uids(session, pattern):
    """Print every uid containing *pattern*, with its fingerprints and keyrings.

    The match is a case-insensitive substring match (``ILIKE '%pattern%'``).

    :param session: database session used to query :class:`Uid`
    :param pattern: text that must occur somewhere in the uid
    """
    like_expr = "%{}%".format(pattern)

    # Heading followed by an underline of the same width.
    heading = "List UIDs matching pattern %s" % like_expr
    print(heading)
    print("=" * len(heading))

    matches = session.query(Uid).filter(Uid.uid.ilike(like_expr)).all()
    for entry in matches:
        print("\nuid %s" % entry.uid)
        for key in entry.fingerprint:
            print(" fingerprint %s" % key.fingerprint)
            # A fingerprint without an associated keyring is shown as "unknown".
            ring_name = key.keyring.keyring_name if key.keyring else "unknown"
            print(" keyring %s" % ring_name)
75################################################################################
def usage(exit_code=0):
    """Print the command-line help text and terminate the program.

    :param exit_code: status passed to :func:`sys.exit` (default 0)
    :raises SystemExit: always, carrying *exit_code*
    """
    help_lines = (
        "Usage: dak import-keyring [OPTION]... [KEYRING]",
        " -h, --help show this help and exit.",
        " -L, --import-ldap-users generate uid entries for keyring from LDAP",
        " -U, --generate-users FMT generate uid entries from keyring as FMT",
        " -l, --list-uids STRING list all uids matching *STRING*",
        " -n, --no-action don't change database",
    )
    print("\n".join(help_lines))
    sys.exit(exit_code)
90################################################################################
def main():
    """Import a keyring into the database and print a summary of the changes.

    Parses the command line into the global ``Options``.  With ``--list-uids``
    the matching uids are printed and the program exits.  Otherwise exactly
    one KEYRING argument is required: its keys are loaded, uid entries are
    optionally generated (``--generate-users`` / ``--import-ldap-users``),
    and the ``uid`` and ``fingerprint`` tables are updated to match.  The
    transaction is rolled back instead of committed when ``--no-action`` is
    set.  Finally the recorded changes are printed grouped by uid.

    :raises SystemExit: for ``--help``, ``--list-uids``, a wrong number of
        keyring arguments, a keyring unknown to the database, or when the
        keyring a fingerprint currently belongs to cannot be found
    """
    global Options

    cnf = Config()
    Arguments = [
        ("h", "help", "Import-Keyring::Options::Help"),
        ("L", "import-ldap-users", "Import-Keyring::Options::Import-Ldap-Users"),
        ("U", "generate-users", "Import-Keyring::Options::Generate-Users", "HasArg"),
        ("l", "list-uids", "Import-Keyring::Options::List-UIDs", "HasArg"),
        ("n", "no-action", "Import-Keyring::Options::No-Action"),
    ]

    # Give every option that is not configured yet an empty default value.
    for i in [
        "help",
        "report-changes",
        "generate-users",
        "import-ldap-users",
        "list-uids",
        "no-action",
    ]:
        key = "Import-Keyring::Options::%s" % i
        if key not in cnf:
            cnf[key] = ""

    keyring_names = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv)  # type: ignore[attr-defined]

    ### Parse options

    Options = cnf.subtree("Import-Keyring::Options")

    if Options["Help"]:
        usage()

    ### Initialise
    session = DBConn().session()

    if Options["List-UIDs"]:
        list_uids(session, Options["List-UIDs"])
        sys.exit(0)

    if len(keyring_names) != 1:
        usage(1)

    ### Keep track of changes made
    changes = []  # (uid, changes strings)

    ### Cache all the existing fingerprint entries
    db_fin_info = get_fingerprint_info(session)

    ### Parse the keyring

    keyringname = keyring_names[0]
    keyring = get_keyring(keyringname, session)
    if not keyring:
        print("E: Can't load keyring %s from database" % keyringname)
        sys.exit(1)

    keyring.load_keys(keyringname)

    ### Generate new uid entries if they're needed (from LDAP or the keyring)
    if Options["Generate-Users"]:
        _, desuid_byid = keyring.generate_users_from_keyring(
            Options["Generate-Users"], session
        )
    elif Options["Import-Ldap-Users"]:
        _, desuid_byid = keyring.import_users_from_ldap(session)
    else:
        desuid_byid = {}

    ### Cache all the existing uid entries
    # Done after the generation step above, so the cache is read once any
    # uid generation has already run.
    (db_uid_byname, db_uid_byid) = get_uid_info(session)

    ### Update full names of applicable users
    for keyid, desired in desuid_byid.items():
        uid = desired[0]
        name = desired[1]
        oname = db_uid_byid[keyid][1]
        if name and oname != name:
            changes.append((uid, "Full name: %s" % (name)))
            session.execute(
                sql.text("UPDATE uid SET name = :name WHERE id = :keyid"),
                {"name": name, "keyid": keyid},
            )

    # The fingerprint table (fpr) points to a uid and a keyring.
    #   If the uid is being decided here (ldap/generate) we set it to it.
    #   Otherwise, if the fingerprint table already has a uid (which we've
    #   cached earlier), we preserve it.
    #   Otherwise we leave it as None

    fpr = {}
    for key_info in keyring.keys.values():
        keyid = db_uid_byname.get(key_info.get("uid", None), [None])[0]
        if keyid is None:
            # Fall back to the uid already stored for the key's first
            # fingerprint (element 0 of the cached tuple is the uid).
            keyid = db_fin_info.get(key_info["fingerprints"][0], [None])[0]
        for y in key_info["fingerprints"]:
            fpr[y] = (keyid, keyring.keyring_id)

    # For any keys that used to be in this keyring, disassociate them.
    # We don't change the uid, leaving that for historical info; if
    # the id should change, it'll be set when importing another keyring.

    for f, (u, fid, kr) in db_fin_info.items():
        if kr != keyring.keyring_id:
            continue

        if f in fpr:
            continue

        changes.append((db_uid_byid.get(u, [None])[0], "Removed key: %s" % (f)))
        session.execute(
            sql.text(
                """UPDATE fingerprint
                      SET keyring = NULL
                    WHERE id = :fprid"""
            ),
            {"fprid": fid},
        )

    # For the keys in this keyring, add/update any fingerprints that've
    # changed.

    for f, (newuid, _) in fpr.items():
        newuiduid = db_uid_byid.get(newuid, [None])[0]

        # -1 marks "no such row" / "no value" for the old uid, id and keyring.
        (olduid, oldfid, oldkid) = db_fin_info.get(f, [-1, -1, -1])

        if olduid is None:
            olduid = -1

        if oldkid is None:
            oldkid = -1

        if oldfid == -1:
            # Fingerprint not in the database yet: create it.
            changes.append((newuiduid, "Added key: %s" % (f)))
            fp = Fingerprint()
            fp.fingerprint = f
            fp.keyring_id = keyring.keyring_id
            if newuid:
                fp.uid_id = newuid

            session.add(fp)
            session.flush()

        else:
            # Only attach a uid to a fingerprint that had none before; an
            # existing owner is never replaced here.
            if newuid and olduid != newuid and olduid == -1:
                changes.append((newuiduid, "Linked key: %s" % f))
                changes.append((newuiduid, " (formerly unowned)"))
                session.execute(
                    sql.text("UPDATE fingerprint SET uid = :uid WHERE id = :fpr"),
                    {"uid": newuid, "fpr": oldfid},
                )

            # Don't move a key from a keyring with a higher priority to a lower one
            if oldkid != keyring.keyring_id:
                movekey = False
                if oldkid == -1:
                    movekey = True
                else:
                    try:
                        oldkeyring = (
                            session.query(Keyring).filter_by(keyring_id=oldkid).one()
                        )
                    except NoResultFound:
                        print("ERROR: Cannot find old keyring with id %s" % oldkid)
                        sys.exit(1)

                    if oldkeyring.priority < keyring.priority:
                        movekey = True

                # Only change the keyring if it won't result in a loss of permissions
                if movekey:
                    session.execute(
                        sql.text(
                            """UPDATE fingerprint
                                  SET keyring = :keyring
                                WHERE id = :fpr"""
                        ),
                        {"keyring": keyring.keyring_id, "fpr": oldfid},
                    )

                    session.flush()

                else:
                    # movekey is only False here after oldkeyring was loaded.
                    print(
                        "Key %s exists in both %s and %s keyrings. Not demoting."
                        % (f, oldkeyring.keyring_name, keyring.keyring_name)
                    )

    # All done!
    if Options["No-Action"]:
        session.rollback()
    else:
        session.commit()

    # Print a summary
    changesd = {}
    for k, v in changes:
        changesd.setdefault(k, []).append(" %s\n" % (v))

    # Changes for keys without a known uid are recorded under None.  None
    # cannot be ordered against str, so a plain sorted() would raise
    # TypeError after the transaction has already finished; sort the None
    # group last instead.
    for k in sorted(changesd, key=lambda owner: (owner is None, "" if owner is None else owner)):
        print("%s\n%s\n" % (k, "".join(changesd[k])))
300################################################################################
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()