Coverage for dak/import_keyring.py: 68%

136 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1#! /usr/bin/env python3 

2 

3"""Imports a keyring into the database""" 

4# Copyright (C) 2007 Anthony Towns <aj@erisian.com.au> 

5# Copyright (C) 2009 Mark Hymers <mhy@debian.org> 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16 

17# You should have received a copy of the GNU General Public License 

18# along with this program; if not, write to the Free Software 

19# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

20 

21################################################################################ 

22 

23import sys 

24 

25import apt_pkg 

26from sqlalchemy import sql 

27from sqlalchemy.orm.exc import NoResultFound 

28 

29from daklib.config import Config 

30from daklib.dbconn import DBConn, Fingerprint, Keyring, Uid, get_keyring 

31 

32# Globals 

33Options = None 

34 

35################################################################################ 

36 

37 

38def get_uid_info(session): 

39 byname = {} 

40 byid = {} 

41 q = session.execute(sql.text("SELECT id, uid, name FROM uid")) 

42 for keyid, uid, name in q.fetchall(): 

43 byname[uid] = (keyid, name) 

44 byid[keyid] = (uid, name) 

45 

46 return (byname, byid) 

47 

48 

49def get_fingerprint_info(session): 

50 fins = {} 

51 q = session.execute( 

52 sql.text("SELECT f.fingerprint, f.id, f.uid, f.keyring FROM fingerprint f") 

53 ) 

54 for fingerprint, fingerprint_id, uid, keyring in q.fetchall(): 

55 fins[fingerprint] = (uid, fingerprint_id, keyring) 

56 return fins 

57 

58 

59def list_uids(session, pattern): 

60 sql_pattern = f"%{pattern}%" 

61 message = "List UIDs matching pattern %s" % sql_pattern 

62 message += "\n" + ("=" * len(message)) 

63 print(message) 

64 uid_query = session.query(Uid).filter(Uid.uid.ilike(sql_pattern)) 

65 for uid in uid_query.all(): 

66 print("\nuid %s" % uid.uid) 

67 for fp in uid.fingerprint: 

68 print(" fingerprint %s" % fp.fingerprint) 

69 keyring = "unknown" 

70 if fp.keyring: 

71 keyring = fp.keyring.keyring_name 

72 print(" keyring %s" % keyring) 

73 

74 

75################################################################################ 

76 

77 

78def usage(exit_code=0): 

79 print( 

80 """Usage: dak import-keyring [OPTION]... [KEYRING] 

81 -h, --help show this help and exit. 

82 -L, --import-ldap-users generate uid entries for keyring from LDAP 

83 -U, --generate-users FMT generate uid entries from keyring as FMT 

84 -l, --list-uids STRING list all uids matching *STRING* 

85 -n, --no-action don't change database""" 

86 ) 

87 sys.exit(exit_code) 

88 

89 

90################################################################################ 

91 

92 

93def main(): 

94 global Options 

95 

96 cnf = Config() 

97 Arguments = [ 

98 ("h", "help", "Import-Keyring::Options::Help"), 

99 ("L", "import-ldap-users", "Import-Keyring::Options::Import-Ldap-Users"), 

100 ("U", "generate-users", "Import-Keyring::Options::Generate-Users", "HasArg"), 

101 ("l", "list-uids", "Import-Keyring::Options::List-UIDs", "HasArg"), 

102 ("n", "no-action", "Import-Keyring::Options::No-Action"), 

103 ] 

104 

105 for i in [ 

106 "help", 

107 "report-changes", 

108 "generate-users", 

109 "import-ldap-users", 

110 "list-uids", 

111 "no-action", 

112 ]: 

113 key = "Import-Keyring::Options::%s" % i 

114 if key not in cnf: 114 ↛ 105line 114 didn't jump to line 105

115 cnf[key] = "" 

116 

117 keyring_names = apt_pkg.parse_commandline(cnf.Cnf, Arguments, sys.argv) # type: ignore[attr-defined] 

118 

119 ### Parse options 

120 

121 Options = cnf.subtree("Import-Keyring::Options") 

122 

123 if Options["Help"]: 

124 usage() 

125 

126 ### Initialise 

127 session = DBConn().session() 

128 

129 if Options["List-UIDs"]: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true

130 list_uids(session, Options["List-UIDs"]) 

131 sys.exit(0) 

132 

133 if len(keyring_names) != 1: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true

134 usage(1) 

135 

136 ### Keep track of changes made 

137 changes = [] # (uid, changes strings) 

138 

139 ### Cache all the existing fingerprint entries 

140 db_fin_info = get_fingerprint_info(session) 

141 

142 ### Parse the keyring 

143 

144 keyringname = keyring_names[0] 

145 keyring = get_keyring(keyringname, session) 

146 if not keyring: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true

147 print("E: Can't load keyring %s from database" % keyringname) 

148 sys.exit(1) 

149 

150 keyring.load_keys(keyringname) 

151 

152 ### Generate new uid entries if they're needed (from LDAP or the keyring) 

153 if Options["Generate-Users"]: 153 ↛ 157line 153 didn't jump to line 157 because the condition on line 153 was always true

154 _, desuid_byid = keyring.generate_users_from_keyring( 

155 Options["Generate-Users"], session 

156 ) 

157 elif Options["Import-Ldap-Users"]: 

158 _, desuid_byid = keyring.import_users_from_ldap(session) 

159 else: 

160 desuid_byid = {} 

161 

162 ### Cache all the existing uid entries 

163 (db_uid_byname, db_uid_byid) = get_uid_info(session) 

164 

165 ### Update full names of applicable users 

166 for keyid in desuid_byid.keys(): 

167 uid = (keyid, desuid_byid[keyid][0]) 

168 name = desuid_byid[keyid][1] 

169 oname = db_uid_byid[keyid][1] 

170 if name and oname != name: 

171 changes.append((uid[1], "Full name: %s" % (name))) 

172 session.execute( 

173 sql.text("UPDATE uid SET name = :name WHERE id = :keyid"), 

174 {"name": name, "keyid": keyid}, 

175 ) 

176 

177 # The fingerprint table (fpr) points to a uid and a keyring. 

178 # If the uid is being decided here (ldap/generate) we set it to it. 

179 # Otherwise, if the fingerprint table already has a uid (which we've 

180 # cached earlier), we preserve it. 

181 # Otherwise we leave it as None 

182 

183 fpr = {} 

184 for z in keyring.keys.keys(): 

185 keyid = db_uid_byname.get(keyring.keys[z].get("uid", None), [None])[0] 

186 if keyid is None: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true

187 keyid = db_fin_info.get(keyring.keys[z]["fingerprints"][0], [None])[0] 

188 for y in keyring.keys[z]["fingerprints"]: 

189 fpr[y] = (keyid, keyring.keyring_id) 

190 

191 # For any keys that used to be in this keyring, disassociate them. 

192 # We don't change the uid, leaving that for historical info; if 

193 # the id should change, it'll be set when importing another keyring. 

194 

195 for f, (u, fid, kr) in db_fin_info.items(): 

196 if kr != keyring.keyring_id: 196 ↛ 199line 196 didn't jump to line 199 because the condition on line 196 was always true

197 continue 

198 

199 if f in fpr: 

200 continue 

201 

202 changes.append((db_uid_byid.get(u, [None])[0], "Removed key: %s" % (f))) 

203 session.execute( 

204 sql.text( 

205 """UPDATE fingerprint 

206 SET keyring = NULL 

207 WHERE id = :fprid""" 

208 ), 

209 {"fprid": fid}, 

210 ) 

211 

212 # For the keys in this keyring, add/update any fingerprints that've 

213 # changed. 

214 

215 for f in fpr: 

216 newuid = fpr[f][0] 

217 newuiduid = db_uid_byid.get(newuid, [None])[0] 

218 

219 (olduid, oldfid, oldkid) = db_fin_info.get(f, [-1, -1, -1]) 

220 

221 if olduid is None: 221 ↛ 222line 221 didn't jump to line 222 because the condition on line 221 was never true

222 olduid = -1 

223 

224 if oldkid is None: 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 oldkid = -1 

226 

227 if oldfid == -1: 

228 changes.append((newuiduid, "Added key: %s" % (f))) 

229 fp = Fingerprint() 

230 fp.fingerprint = f 

231 fp.keyring_id = keyring.keyring_id 

232 if newuid: 232 ↛ 235line 232 didn't jump to line 235 because the condition on line 232 was always true

233 fp.uid_id = newuid 

234 

235 session.add(fp) 

236 session.flush() 

237 

238 else: 

239 if newuid and olduid != newuid and olduid == -1: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 changes.append((newuiduid, "Linked key: %s" % f)) 

241 changes.append((newuiduid, " (formerly unowned)")) 

242 session.execute( 

243 sql.text("UPDATE fingerprint SET uid = :uid WHERE id = :fpr"), 

244 {"uid": newuid, "fpr": oldfid}, 

245 ) 

246 

247 # Don't move a key from a keyring with a higher priority to a lower one 

248 if oldkid != keyring.keyring_id: 248 ↛ 215line 248 didn't jump to line 215 because the condition on line 248 was always true

249 movekey = False 

250 if oldkid == -1: 250 ↛ 251line 250 didn't jump to line 251 because the condition on line 250 was never true

251 movekey = True 

252 else: 

253 try: 

254 oldkeyring = ( 

255 session.query(Keyring).filter_by(keyring_id=oldkid).one() 

256 ) 

257 except NoResultFound: 

258 print("ERROR: Cannot find old keyring with id %s" % oldkid) 

259 sys.exit(1) 

260 

261 if oldkeyring.priority < keyring.priority: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true

262 movekey = True 

263 

264 # Only change the keyring if it won't result in a loss of permissions 

265 if movekey: 265 ↛ 266line 265 didn't jump to line 266 because the condition on line 265 was never true

266 session.execute( 

267 sql.text( 

268 """UPDATE fingerprint 

269 SET keyring = :keyring 

270 WHERE id = :fpr""" 

271 ), 

272 {"keyring": keyring.keyring_id, "fpr": oldfid}, 

273 ) 

274 

275 session.flush() 

276 

277 else: 

278 print( 

279 "Key %s exists in both %s and %s keyrings. Not demoting." 

280 % (f, oldkeyring.keyring_name, keyring.keyring_name) 

281 ) 

282 

283 # All done! 

284 if Options["No-Action"]: 284 ↛ 285line 284 didn't jump to line 285 because the condition on line 284 was never true

285 session.rollback() 

286 else: 

287 session.commit() 

288 

289 # Print a summary 

290 changesd = {} 

291 for k, v in changes: 

292 if k not in changesd: 

293 changesd[k] = "" 

294 changesd[k] += " %s\n" % (v) 

295 

296 for k in sorted(changesd): 

297 print("%s\n%s\n" % (k, changesd[k])) 

298 

299 

300################################################################################ 

301 

302 

303if __name__ == "__main__": 

304 main()