Coverage for daklib/cruft.py: 89%

86 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1""" 

2helper functions for cruft-report 

3 

4@contact: Debian FTPMaster <ftpmaster@debian.org> 

5@copyright 2011 Torsten Werner <twerner@debian.org> 

6""" 

7 

8# This program is free software; you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation; either version 2 of the License, or 

11# (at your option) any later version. 

12 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17 

18# You should have received a copy of the GNU General Public License 

19# along with this program; if not, write to the Free Software 

20# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

21 

22################################################################################ 

23 

24from collections.abc import Iterable 

25from typing import TYPE_CHECKING, cast, override 

26 

27from sqlalchemy import func, sql 

28from sqlalchemy.engine import CursorResult, Row 

29from sqlalchemy.orm import aliased, object_session 

30 

31from daklib.dbconn import Architecture, DBBinary, DBSource, Suite, get_suite 

32 

33if TYPE_CHECKING: 

34 from sqlalchemy.orm import Session 

35 from sqlalchemy.sql import FromClause 

36 

37 

38def newer_version( 

39 lowersuite_name: str, highersuite_name: str, session: "Session", include_equal=False 

40) -> list[tuple[str, str, str]]: 

41 """ 

42 Finds newer versions in lowersuite_name than in highersuite_name. Returns a 

43 list of tuples (source, higherversion, lowerversion) where higherversion is 

44 the newest version from highersuite_name and lowerversion is the newest 

45 version from lowersuite_name. 

46 """ 

47 

48 lowersuite = get_suite(lowersuite_name, session) 

49 assert lowersuite is not None 

50 highersuite = get_suite(highersuite_name, session) 

51 assert highersuite is not None 

52 

53 def get_suite_sources(suite: Suite) -> "FromClause": 

54 q1 = ( 

55 session.query(DBSource.source, func.max(DBSource.version).label("version")) 

56 .with_parent(suite) 

57 .group_by(DBSource.source) 

58 .subquery() 

59 ) 

60 return aliased(q1) 

61 

62 def get_suite_binaries(suite: Suite) -> "FromClause": 

63 q1 = ( 

64 session.query( 

65 DBBinary.package, 

66 DBSource.source, 

67 func.max(DBSource.version).label("version"), 

68 Architecture.arch_string, 

69 func.max(DBBinary.version).label("binversion"), 

70 ) 

71 .filter(DBBinary.suites.contains(suite)) 

72 .join(DBBinary.source) 

73 .join(DBBinary.architecture) 

74 .group_by( 

75 DBBinary.package, 

76 DBSource.source, 

77 Architecture.arch_string, 

78 ) 

79 .subquery() 

80 ) 

81 return aliased(q1) 

82 

83 highq = get_suite_sources(highersuite) 

84 lowq = get_suite_sources(lowersuite) 

85 

86 query = session.query( 

87 highq.c.source, 

88 highq.c.version.label("higherversion"), 

89 lowq.c.version.label("lowerversion"), 

90 ).join(lowq, highq.c.source == lowq.c.source) 

91 

92 if include_equal: 

93 query = query.filter(highq.c.version <= lowq.c.version) 

94 else: 

95 query = query.filter(highq.c.version < lowq.c.version) 

96 

97 list = [] 

98 # get all sources that have a higher version in lowersuite than in 

99 # highersuite 

100 for source, higherversion, lowerversion in query: 

101 q1 = ( 

102 session.query( 

103 DBBinary.package, 

104 DBSource.source, 

105 DBSource.version, 

106 Architecture.arch_string, 

107 ) 

108 .filter(DBBinary.suites.contains(highersuite)) 

109 .join(DBBinary.source) 

110 .join(DBBinary.architecture) 

111 .filter(DBSource.source == source) 

112 .subquery() 

113 ) 

114 q2 = session.query(q1.c.arch_string).group_by(q1.c.arch_string) 

115 # all architectures for which source has binaries in highersuite 

116 archs_high = set(x[0] for x in q2.all()) 

117 

118 highq = get_suite_binaries(highersuite) 

119 lowq = get_suite_binaries(lowersuite) 

120 

121 query = ( 

122 session.query(highq.c.arch_string) 

123 .join(lowq, highq.c.source == lowq.c.source) 

124 .filter(highq.c.arch_string == lowq.c.arch_string) 

125 .filter(highq.c.package == lowq.c.package) 

126 .filter(highq.c.source == source) 

127 ) 

128 

129 if include_equal: 

130 query = query.filter(highq.c.binversion <= lowq.c.binversion).filter( 

131 highq.c.version <= lowq.c.version 

132 ) 

133 else: 

134 query = query.filter(highq.c.binversion < lowq.c.binversion).filter( 

135 highq.c.version < lowq.c.version 

136 ) 

137 

138 query = query.group_by(highq.c.arch_string) 

139 

140 # all architectures for which source has a newer binary in lowersuite 

141 archs_newer = set(x[0] for x in query.all()) 

142 

143 # if has at least one binary in lowersuite which is newer than the one 

144 # in highersuite on each architecture for which source has binaries in 

145 # highersuite, we know that the builds for all relevant architecture 

146 # are done, so we can remove the old source with it's binaries 

147 if archs_newer >= archs_high: 

148 list.append((source, higherversion, lowerversion)) 

149 

150 list.sort() 

151 return list 

152 

153 

154def get_package_names(suite: Suite) -> "Iterable[Row[tuple[str]]]": 

155 """ 

156 Returns a query that selects all distinct package names from suite ordered 

157 by package name. 

158 """ 

159 

160 session = object_session(suite) 

161 assert session is not None 

162 return ( 

163 session.query(DBBinary.package) 

164 .with_parent(suite) 

165 .group_by(DBBinary.package) 

166 .order_by(DBBinary.package) 

167 ) 

168 

169 

170class NamedSource: 

171 """ 

172 A source package identified by its name with all of its versions in a 

173 suite. 

174 """ 

175 

176 def __init__(self, suite: Suite, source: str): 

177 self.source = source 

178 query = suite.sources.filter_by(source=source).order_by(DBSource.version) 

179 self.versions = [src.version for src in query] 

180 

181 @override 

182 def __str__(self): 

183 return "%s(%s)" % (self.source, ", ".join(self.versions)) 

184 

185 

186class DejavuBinary: 

187 """ 

188 A binary package identified by its name which gets built by multiple source 

189 packages in a suite. The architecture is ignored which leads to the 

190 following corner case, e.g.: 

191 

192 If a source package 'foo-mips' that builds a binary package 'foo' on mips 

193 and another source package 'foo-mipsel' builds a binary package with the 

194 same name 'foo' on mipsel then the binary package 'foo' will be reported as 

195 built from multiple source packages. 

196 """ 

197 

198 def __init__(self, suite: Suite, package: str): 

199 self.package = package 

200 session = object_session(suite) 

201 assert session is not None 

202 # We need a subquery to make sure that both binary and source packages 

203 # are in the right suite. 

204 bin_query = suite.binaries.filter_by(package=package).subquery() 

205 src_query = ( 

206 session.query(DBSource.source) 

207 .with_parent(suite) 

208 .join(bin_query) 

209 .order_by(DBSource.source) 

210 .group_by(DBSource.source) 

211 ) 

212 self.sources = [] 

213 if src_query.count() > 1: 

214 for (source,) in src_query: 

215 self.sources.append(str(NamedSource(suite, source))) 

216 

217 def has_multiple_sources(self) -> bool: 

218 "Has the package been built by multiple sources?" 

219 return len(self.sources) > 1 

220 

221 @override 

222 def __str__(self): 

223 return "%s built by: %s" % (self.package, ", ".join(self.sources)) 

224 

225 

226def report_multiple_source(suite: Suite) -> None: 

227 """ 

228 Reports binary packages built from multiple source package with different 

229 names. 

230 """ 

231 

232 print("Built from multiple source packages") 

233 print("-----------------------------------") 

234 print() 

235 for (package,) in get_package_names(suite): 

236 binary = DejavuBinary(suite, package) 

237 if binary.has_multiple_sources(): 

238 print(binary) 

239 print() 

240 

241 

242def query_without_source( 

243 suite_id: int, session: "Session" 

244) -> CursorResult[tuple[str, str]]: 

245 """searches for arch: all packages from suite that do no longer 

246 reference a source package in the same suite 

247 

248 subquery unique_binaries: selects all packages with only 1 version 

249 in suite since 'dak rm' does not allow to specify version numbers""" 

250 

251 query = """ 

252 with unique_binaries as 

253 (select package, max(version) as version, max(source) as source 

254 from bin_associations_binaries 

255 where architecture = 2 and suite = :suite_id 

256 group by package having count(*) = 1) 

257 select ub.package, ub.version 

258 from unique_binaries ub 

259 left join src_associations_src sas 

260 on ub.source = sas.src and sas.suite = :suite_id 

261 where sas.id is null 

262 order by ub.package""" 

263 return cast(CursorResult, session.execute(sql.text(query), {"suite_id": suite_id})) 

264 

265 

266def queryNBS( 

267 suite_id: int, session: "Session" 

268) -> CursorResult[tuple[list[str], list[str], str, str]]: 

269 """This one is really complex. It searches arch != all packages that 

270 are no longer built from current source packages in suite. 

271 

272 temp table unique_binaries: will be populated with packages that 

273 have only one version in suite because 'dak rm' does not allow 

274 specifying version numbers 

275 

276 temp table newest_binaries: will be populated with packages that are 

277 built from current sources 

278 

279 subquery uptodate_arch: returns all architectures built from current 

280 sources 

281 

282 subquery unique_binaries_uptodate_arch: returns all packages in 

283 architectures from uptodate_arch 

284 

285 subquery unique_binaries_uptodate_arch_agg: same as 

286 unique_binaries_uptodate_arch but with column architecture 

287 aggregated to array 

288 

289 subquery uptodate_packages: similar to uptodate_arch but returns all 

290 packages built from current sources 

291 

292 subquery outdated_packages: returns all packages with architectures 

293 no longer built from current source 

294 """ 

295 

296 query = """ 

297with 

298 unique_binaries as 

299 (select 

300 bab.package, 

301 bab.architecture, 

302 max(bab.source) as source 

303 from bin_associations_binaries bab 

304 where bab.suite = :suite_id and bab.architecture > 2 

305 group by package, architecture having count(*) = 1), 

306 newest_binaries as 

307 (select ub.package, ub.architecture, nsa.source, nsa.version 

308 from unique_binaries ub 

309 join newest_src_association nsa 

310 on ub.source = nsa.src and nsa.suite = :suite_id), 

311 uptodate_arch as 

312 (select architecture, source, version 

313 from newest_binaries 

314 group by architecture, source, version), 

315 unique_binaries_uptodate_arch as 

316 (select ub.package, (select a.arch_string from architecture a where a.id = ub.architecture) as arch_string, ua.source, ua.version 

317 from unique_binaries ub 

318 join source s 

319 on ub.source = s.id 

320 join uptodate_arch ua 

321 on ub.architecture = ua.architecture and s.source = ua.source), 

322 unique_binaries_uptodate_arch_agg as 

323 (select ubua.package, 

324 array_agg(ubua.arch_string order by ubua.arch_string) as arch_list, 

325 ubua.source, ubua.version 

326 from unique_binaries_uptodate_arch ubua 

327 group by ubua.source, ubua.version, ubua.package), 

328 uptodate_packages as 

329 (select package, source, version 

330 from newest_binaries 

331 group by package, source, version), 

332 outdated_packages as 

333 (select array_agg(package order by package) as pkg_list, 

334 arch_list, source, version 

335 from unique_binaries_uptodate_arch_agg 

336 where package not in 

337 (select package from uptodate_packages) 

338 group by arch_list, source, version) 

339 select * from outdated_packages order by source""" 

340 return cast(CursorResult, session.execute(sql.text(query), {"suite_id": suite_id})) 

341 

342 

343def queryNBS_metadata( 

344 suite_id: int, session: "Session" 

345) -> CursorResult[tuple[str, str]]: 

346 """searches for NBS packages based on metadata extraction of the 

347 newest source for a given suite""" 

348 

349 query = """ 

350 select string_agg(bin.package, ' ' order by bin.package), ( 

351 select arch_string 

352 from architecture 

353 where id = bin.architecture) as architecture, src.source, newsrc.version 

354 from bin_associations_binaries bin 

355 join src_associations_src src 

356 on src.src = bin.source 

357 and src.suite = bin.suite 

358 join newest_src_association newsrc 

359 on newsrc.source = src.source 

360 and newsrc.version != src.version 

361 and newsrc.suite = bin.suite 

362 where bin.suite = :suite_id 

363 and bin.package not in ( 

364 select trim(' \n' from unnest(string_to_array(meta.value, ','))) 

365 from source_metadata meta 

366 where meta.src_id = ( 

367 select newsrc.src 

368 from newest_src_association newsrc 

369 where newsrc.source = ( 

370 select s.source 

371 from source s 

372 where s.id = bin.source) 

373 and newsrc.suite = bin.suite) 

374 and key_id = ( 

375 select key_id 

376 from metadata_keys 

377 where key = 'Binary')) 

378 group by src.source, newsrc.version, architecture 

379 order by src.source, newsrc.version, bin.architecture""" 

380 return cast(CursorResult, session.execute(sql.text(query), {"suite_id": suite_id}))