Coverage for dak/make_changelog.py: 82%

160 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

1#! /usr/bin/env python3 

2 

3""" 

4Generate changelog entry between two suites 

5 

6@contact: Debian FTP Master <ftpmaster@debian.org> 

7@copyright: 2010 Luca Falavigna <dktrkranz@debian.org> 

8@license: GNU General Public License version 2 or later 

9""" 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21# You should have received a copy of the GNU General Public License 

22# along with this program; if not, write to the Free Software 

23# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

24 

25################################################################################ 

26 

27# <bdefreese> !dinstall 

28# <dak> bdefreese: I guess the next dinstall will be in 0hr 1min 35sec 

29# <bdefreese> Wow I have great timing 

30# <DktrKranz> dating with dinstall, part II 

31# <bdefreese> heh 

32# <Ganneff> dating with that monster? do you have good combat armor? 

33# <bdefreese> +5 Plate :) 

34# <Ganneff> not a good one then 

35# <Ganneff> so you wont even manage to bypass the lesser monster in front, unchecked 

36# <DktrKranz> asbesto belt 

37# <Ganneff> helps only a step 

38# <DktrKranz> the Ultimate Weapon: cron_turned_off 

39# <bdefreese> heh 

40# <Ganneff> thats debadmin limited 

41# <Ganneff> no option for you 

42# <DktrKranz> bdefreese: it seems ftp-masters want dinstall to sexual harass us, are you good in running? 

43# <Ganneff> you can run but you can not hide 

44# <bdefreese> No, I'm old and fat :) 

45# <Ganneff> you can roll but you can not hide 

46# <Ganneff> :) 

47# <bdefreese> haha 

48# <DktrKranz> damn dinstall, you racist bastard 

49 

50################################################################################ 

51 

52import os 

53import sys 

54from glob import glob 

55from shutil import rmtree 

56from typing import TYPE_CHECKING, NoReturn 

57 

58import apt_pkg 

59from sqlalchemy import sql 

60from yaml import safe_dump 

61 

62from daklib import utils 

63from daklib.contents import UnpackedSource 

64from daklib.dbconn import Archive, DBConn, get_suite 

65from daklib.regexes import re_no_epoch 

66 

67if TYPE_CHECKING: 

68 from sqlalchemy.engine import Result 

69 from sqlalchemy.orm import Session 

70 

71################################################################################ 

72 

# Name of the YAML index file kept at the root of the changelog pool: it is
# written by generate_export_filelist() and ignored whenever the pool is walked.
filelist = "filelist.yaml"

74 

75 

def usage(exit_code=0) -> NoReturn:
    """
    Print the make-changelog command-line help on stdout and terminate.

    @param exit_code: status passed to C{sys.exit} after the help is printed
    """
    help_text = """Generate changelog between two suites

 Usage:
 make-changelog -s <suite> -b <base_suite> [OPTION]...
 make-changelog -e -a <archive>

Options:

 -h, --help show this help and exit
 -s, --suite suite providing packages to compare
 -b, --base-suite suite to be taken as reference for comparison
 -n, --binnmu display binNMUs uploads instead of source ones

 -e, --export export interesting files from source packages
 -a, --archive archive to fetch data from
 -p, --progress display progress status"""
    print(help_text)

    sys.exit(exit_code)

97 

98 

def get_source_uploads(
    suite: str, base_suite: str, session: "Session"
) -> "Result[tuple[str, str, str]]":
    """
    Look up the changelogs of source uploads in `suite` whose version is
    newer than the one in `base_suite`.

    Sources that are in `suite` but not in `base_suite` are compared against
    version 0.  Only changelog rows whose architecture contains "source" are
    considered.

    @param suite: name of the suite providing the packages to compare
    @param base_suite: name of the suite taken as reference
    @param session: database session the query is executed on
    @return: rows of (source, version, changelog), ordered by source and
        then by descending version
    """
    params = {"suite": suite, "base_suite": base_suite}

    statement = sql.text(
        """WITH base AS (
                 SELECT source, max(version) AS version
                 FROM source_suite
                 WHERE suite_name = :base_suite
                 GROUP BY source
                 UNION (SELECT source, CAST(0 AS debversion) AS version
                 FROM source_suite
                 WHERE suite_name = :suite
                 EXCEPT SELECT source, CAST(0 AS debversion) AS version
                 FROM source_suite
                 WHERE suite_name = :base_suite
                 ORDER BY source)),
               cur_suite AS (
                 SELECT source, max(version) AS version
                 FROM source_suite
                 WHERE suite_name = :suite
                 GROUP BY source)
               SELECT DISTINCT c.source, c.version, c.changelog
               FROM changelogs c
               JOIN base b ON b.source = c.source
               JOIN cur_suite cs ON cs.source = c.source
               WHERE c.version > b.version
               AND c.version <= cs.version
               AND c.architecture LIKE '%source%'
               ORDER BY c.source, c.version DESC"""
    )

    return session.execute(statement, params)

133 

134 

def get_binary_uploads(
    suite: str, base_suite: str, session: "Session"
) -> "Result[tuple[str, str, str, str]]":
    """
    Look up the changelogs of binary uploads in `suite` whose version is
    newer than the one in `base_suite`.

    Versions are compared per source and per architecture; a changelog row is
    only returned when its architecture is present for that source in both
    suites.

    @param suite: name of the suite providing the packages to compare
    @param base_suite: name of the suite taken as reference
    @param session: database session the query is executed on
    @return: rows of (source, version, architecture, changelog), ordered by
        source, then descending version, then architecture
    """
    params = {"suite": suite, "base_suite": base_suite}

    statement = sql.text(
        """WITH base as (
                 SELECT s.source, max(b.version) AS version, a.arch_string
                 FROM source s
                 JOIN binaries b ON b.source = s.id
                 JOIN bin_associations ba ON ba.bin = b.id
                 JOIN architecture a ON a.id = b.architecture
                 WHERE ba.suite = (
                   SELECT id
                   FROM suite
                   WHERE suite_name = :base_suite)
                 GROUP BY s.source, a.arch_string),
               cur_suite as (
                 SELECT s.source, max(b.version) AS version, a.arch_string
                 FROM source s
                 JOIN binaries b ON b.source = s.id
                 JOIN bin_associations ba ON ba.bin = b.id
                 JOIN architecture a ON a.id = b.architecture
                 WHERE ba.suite = (
                   SELECT id
                   FROM suite
                   WHERE suite_name = :suite)
                 GROUP BY s.source, a.arch_string)
               SELECT DISTINCT c.source, c.version, c.architecture, c.changelog
               FROM changelogs c
               JOIN base b on b.source = c.source
               JOIN cur_suite cs ON cs.source = c.source
               WHERE c.version > b.version
               AND c.version <= cs.version
               AND c.architecture = b.arch_string
               AND c.architecture = cs.arch_string
               ORDER BY c.source, c.version DESC, c.architecture"""
    )

    return session.execute(statement, params)

175 

176 

def display_changes(uploads, index):
    """
    Print one field of every upload row, separating sources by a blank line.

    @param uploads: iterable of rows whose first element is the source name
    @param index: position of the field to print from each row
    """
    last_source = None
    for row in uploads:
        current_source = row[0]
        # A blank line goes between two consecutive rows of different sources.
        source_changed = bool(last_source) and last_source != current_source
        if source_changed:
            print()
        print(row[index])
        last_source = current_source

184 

185 

def export_files(
    session: "Session", archive: Archive, clpool: str, progress=False
) -> None:
    """
    Export interesting files from source packages.

    Sources are selected per suite of `archive` through the C{newest_source}
    join.  When the pool already holds C{<source>_<version>_changelog}, only
    the per-suite hard links for that version are refreshed.  Otherwise the
    source is unpacked and every file matching C{debian/*<name>} (for the
    names in C{interesting}) is hard-linked into the pool twice: under a name
    derived from the .dsc file name and under C{<suite>_<file>}.

    The pool is pruned afterwards: directories holding files whose name is
    not a known source are removed, files with a single remaining link are
    deleted and empty sub-directories are dropped.  A summary is printed on
    stdout.

    @param session: database session used to run the lookup query
    @param archive: archive to export; its C{path} and C{archive_id} are read
    @param clpool: root directory of the changelog pool
    @param progress: when true, unpacking progress is reported on stderr
    """
    pool = os.path.join(archive.path, "pool")
    interesting = ("changelog", "copyright", "NEWS", "NEWS.Debian", "README.Debian")
    stats = {"unpack": 0, "created": 0, "removed": 0, "errors": 0, "files": 0}
    query = """SELECT DISTINCT s.source, su.suite_name AS suite, s.version, c.name || '/' || f.filename AS filename
               FROM source s
               JOIN newest_source n ON n.source = s.source AND n.version = s.version
               JOIN src_associations sa ON sa.source = s.id
               JOIN suite su ON su.id = sa.suite
               JOIN files f ON f.id = s.file
               JOIN files_archive_map fam ON f.id = fam.file_id AND fam.archive_id = su.archive_id
               JOIN component c ON fam.component_id = c.id
               WHERE su.archive_id = :archive_id
               ORDER BY s.source, suite"""

    # source name -> suite name -> (version without epoch, path below the pool)
    sources: dict[str, dict[str, tuple[str, str]]] = {}
    for row in session.execute(sql.text(query), {"archive_id": archive.archive_id}):
        sources.setdefault(row[0], {})[row[1]] = (re_no_epoch.sub("", row[2]), row[3])

    # .dsc path -> (destination directory in the pool, suites needing links)
    unpack: dict[str, tuple[str, set[str]]] = {}
    for source, per_suite in sources.items():
        for suite_name, (version, relpath) in per_suite.items():
            # Destination mirrors the directory part of the pool-relative path.
            path = os.path.join(clpool, relpath.rpartition("/")[0])
            os.makedirs(path, exist_ok=True)
            prefix = "%s_%s" % (source, version)
            if not os.path.exists(os.path.join(path, prefix + "_changelog")):
                dsc = os.path.join(pool, relpath)
                unpack.setdefault(dsc, (path, set()))[1].add(suite_name)
            else:
                # Version already exported: re-point the suite links at it.
                for existing in glob("%s/%s_*" % (path, prefix)):
                    link = "%s%s" % (suite_name, existing.split(prefix)[1])
                    try:
                        os.unlink(os.path.join(path, link))
                    except OSError:
                        # Best effort: the suite link may not exist yet.
                        pass
                    os.link(os.path.join(path, existing), os.path.join(path, link))

    for dsc, (destdir, suites) in unpack.items():
        package = os.path.splitext(os.path.basename(dsc))[0].split("_")
        # A failure on one package is reported and counted, never fatal.
        try:
            unpacked = UnpackedSource(dsc, clpool)
            try:
                tempdir = unpacked.get_root_directory()
                stats["unpack"] += 1
                if progress:
                    if stats["unpack"] % 100 == 0:
                        print("%d packages unpacked" % stats["unpack"], file=sys.stderr)
                    elif stats["unpack"] % 10 == 0:
                        print(".", end="", file=sys.stderr)
                for name in interesting:
                    for found in glob(os.path.join(tempdir, "debian", "*%s" % name)):
                        basename = os.path.basename(found)
                        version_link = os.path.join(
                            destdir,
                            "%s_%s_%s" % (package[0], package[1], basename),
                        )
                        for suite_name in suites:
                            suite_link = os.path.join(
                                destdir, "%s_%s" % (suite_name, basename)
                            )
                            if not os.path.exists(version_link):
                                os.link(found, version_link)
                                stats["created"] += 1
                            try:
                                os.unlink(suite_link)
                            except OSError:
                                # Best effort: the suite link may not exist yet.
                                pass
                            os.link(version_link, suite_link)
                            stats["created"] += 1
            finally:
                # Always drop the unpacked tree, even when linking failed, so
                # it is not left behind inside the pool.
                # NOTE(review): assumes cleanup() is safe after a partial
                # failure - confirm against UnpackedSource.
                unpacked.cleanup()
        except Exception as e:
            print("make-changelog: unable to unpack %s\n%s" % (dsc, e))
            stats["errors"] += 1

    # Bottom-up walk so that emptied sub-directories can be removed on the way.
    for root, dirs, names in os.walk(clpool, topdown=False):
        names = [n for n in names if n != filelist]
        if names:
            if (
                root != clpool
                and root.split("/")[-1] not in sources
                and os.path.exists(root)
            ):
                stats["removed"] += len(os.listdir(root))
                rmtree(root)
            for name in names:
                entry = os.path.join(root, name)
                # A link count of one means nothing else refers to the file.
                if os.path.exists(entry) and os.stat(entry).st_nlink == 1:
                    stats["removed"] += 1
                    os.unlink(entry)
        for subdir in dirs:
            try:
                os.rmdir(os.path.join(root, subdir))
            except OSError:
                # Directory is not empty (or already gone): leave it alone.
                pass
        stats["files"] += len(names)
    stats["files"] -= stats["removed"]

    print("make-changelog: file exporting finished")
    print(" * New packages unpacked: %d" % stats["unpack"])
    print(" * New files created: %d" % stats["created"])
    print(" * New files removed: %d" % stats["removed"])
    print(" * Unpack errors: %d" % stats["errors"])
    print(" * Files available into changelog pool: %d" % stats["files"])

296 

297 

def generate_export_filelist(clpool: str) -> None:
    """
    Write the YAML index of the changelog pool.

    Every file below `clpool` (except the index itself) is recorded under its
    source, taken from the third component of its pool-relative path.  Below
    the source it is keyed by the second C{_}-separated field of the file
    name when the first field equals the source name, and by the first field
    otherwise.  The resulting mapping is dumped to C{filelist} at the root
    of `clpool`.

    @param clpool: root directory of the changelog pool
    """
    clfiles: dict[str, dict[str, list[str]]] = {}
    for root, _dirs, names in os.walk(clpool):
        for name in names:
            if name == filelist:
                continue
            # relpath only strips the leading pool directory; a plain string
            # replace would also eat later occurrences of the pool path.
            clpath = os.path.relpath(os.path.join(root, name), clpool)
            parts = clpath.split("/")
            if len(parts) < 4:
                # Too shallow to name a source and a file, so it cannot be
                # indexed: skip it instead of failing on the lookups below.
                continue
            source = parts[2]
            elements = parts[3].split("_")
            key = elements[1] if elements[0] == source else elements[0]
            clfiles.setdefault(source, {}).setdefault(key, []).append(clpath)
    with open(os.path.join(clpool, filelist), "w+") as fd:
        safe_dump(clfiles, fd, default_flow_style=False)

317 

318 

def main() -> None:
    """
    Command-line entry point of make-changelog.

    Parses the options, then either exports files into the changelog pool of
    an archive (C{-e -a <archive>}) or prints the changelogs of the uploads
    found in one suite but not in the base suite (C{-s <suite> -b <base>}),
    using binary uploads when C{-n} is given and source uploads otherwise.
    """
    Cnf = utils.get_conf()
    Arguments = [
        ("h", "help", "Make-Changelog::Options::Help"),
        ("a", "archive", "Make-Changelog::Options::Archive", "HasArg"),
        ("s", "suite", "Make-Changelog::Options::Suite", "HasArg"),
        ("b", "base-suite", "Make-Changelog::Options::Base-Suite", "HasArg"),
        ("n", "binnmu", "Make-Changelog::Options::binNMU"),
        ("e", "export", "Make-Changelog::Options::export"),
        ("p", "progress", "Make-Changelog::Options::progress"),
    ]

    # Every option gets an empty default, "archive" included, so the lookups
    # below cannot fail with KeyError when a flag was not passed.
    for i in [
        "help",
        "archive",
        "suite",
        "base-suite",
        "binnmu",
        "export",
        "progress",
    ]:
        key = "Make-Changelog::Options::%s" % i
        if key not in Cnf:
            Cnf[key] = ""  # type: ignore[index]

    apt_pkg.parse_commandline(Cnf, Arguments, sys.argv)  # type: ignore[attr-defined]
    Options = Cnf.subtree("Make-Changelog::Options")  # type: ignore[attr-defined]
    suite = Cnf["Make-Changelog::Options::Suite"]
    base_suite = Cnf["Make-Changelog::Options::Base-Suite"]
    binnmu = Cnf["Make-Changelog::Options::binNMU"]
    export = Cnf["Make-Changelog::Options::export"]
    progress = Cnf["Make-Changelog::Options::progress"]
    archive_name = Options["Archive"]

    if Options["help"] or not (suite and base_suite) and not export:
        usage()

    if export and not archive_name:
        # Exporting needs an archive; show the help and exit with an error
        # status rather than failing later on the database lookup.
        usage(1)

    # Suites only matter when comparing; export mode does not use them.
    if not export:
        for s in suite, base_suite:
            if not get_suite(s):
                utils.fubar('Invalid suite "%s"' % s)

    session = DBConn().session()

    if export:
        archive = session.query(Archive).filter_by(archive_name=archive_name).one()
        exportpath = archive.changelog
        if exportpath:
            export_files(session, archive, exportpath, progress)
            generate_export_filelist(exportpath)
        else:
            utils.fubar("No changelog export path defined")
    elif binnmu:
        # Field 3 of a binary upload row is the changelog text.
        display_changes(get_binary_uploads(suite, base_suite, session), 3)
    else:
        # Field 2 of a source upload row is the changelog text.
        display_changes(get_source_uploads(suite, base_suite, session), 2)

    session.commit()

369 

370 

# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()