#! /usr/bin/env python3

3 """ generates partial package updates list"""
import asyncio
import errno
import os
import re
import sys
import time
import traceback

import apt_pkg

from daklib import utils, pdiff
from daklib.dbconn import Archive, Component, DBConn, Suite, get_suite, get_suite_architectures
from daklib.pdiff import PDiffIndex

re_includeinpdiff = re.compile(r"(Translation-[a-zA-Z_]+\.(?:bz2|xz))")


Cnf = None
Logger = None
Options = None


def usage(exit_code=0):
    print("""Usage: dak generate-index-diffs [OPTIONS] [suites]
Write out ed-style diffs to Packages/Source lists

  -h, --help            show this help and exit
  -a <archive>          generate diffs for suites in <archive>
  -c                    give the canonical path of the file
  -p                    name for the patch (defaults to current time)
  -d                    name for the hardlink farm for status
  -m                    how many diffs to generate
  -n                    take no action
  -v                    be verbose and list each file as we work on it
""")
    sys.exit(exit_code)


def tryunlink(file):
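    """Remove file, printing a warning instead of raising if removal fails."""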
    try:
        os.unlink(file)
    except OSError:
        print("warning: removing of %s denied" % (file))


def smartstat(file):
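    """Return (extension, stat result) for file, trying each known compression extension."""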
    for ext in ["", ".gz", ".bz2", ".xz", ".zst"]:
        if os.path.isfile(file + ext):
            return (ext, os.stat(file + ext))
    return (None, None)


async def smartlink(f, t):
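    """Make t an uncompressed copy of f, hardlinking when f is already uncompressed."""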
    async def call_decompressor(cmd, inpath, outpath):
        with open(inpath, "rb") as rfd, open(outpath, "wb") as wfd:
            await pdiff.asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=wfd,
            )

    if os.path.isfile(f):
        os.link(f, t)
    elif os.path.isfile("%s.gz" % (f)):
        await call_decompressor(['gzip', '-d'], '{}.gz'.format(f), t)
    elif os.path.isfile("%s.bz2" % (f)):
        await call_decompressor(['bzip2', '-d'], '{}.bz2'.format(f), t)
    elif os.path.isfile("%s.xz" % (f)):
        await call_decompressor(['xz', '-d', '-T0'], '{}.xz'.format(f), t)
    elif os.path.isfile(f"{f}.zst"):
        await call_decompressor(['zstd', '--decompress'], f'{f}.zst', t)
    else:
        print("missing: %s" % (f))
        raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), f)


async def genchanges(Options, outdir, oldfile, origfile, maxdiffs=56, merged_pdiffs=False):
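    """Generate a pdiff between oldfile and origfile and update the pdiff Index in outdir."""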
    if "NoAct" in Options:
        print("Not acting on: od: %s, oldf: %s, origf: %s, md: %s" % (outdir, oldfile, origfile, maxdiffs))
        return

    patchname = Options["PatchName"]
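
    # oldfile is the snapshot of the index from the previous run; origfile is
    # the index as currently published in the archive tree.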
    (oldext, oldstat) = smartstat(oldfile)
    (origext, origstat) = smartstat(origfile)
    if not origstat:
        print("%s: doesn't exist" % (origfile))
        return

    old_full_path = oldfile + origext
    resolved_orig_path = os.path.realpath(origfile + origext)

    if not oldstat:
        print("%s: initial run" % origfile)
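        # first run for this index: there is nothing to diff against, so just
        # hardlink the current file into place (replacing any stale symlink)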
        if os.path.islink(old_full_path):
            os.unlink(old_full_path)
        os.link(resolved_orig_path, old_full_path)
        return

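    # identical inode and device numbers mean the published index is still the
    # same hardlinked file we last processed, so there is nothing to diff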
    if oldstat[1:3] == origstat[1:3]:
        return

    upd = PDiffIndex(outdir, int(maxdiffs), merged_pdiffs)

    if "CanonicalPath" in Options:
        upd.can_path = Options["CanonicalPath"]

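    # the patch generator needs an uncompressed copy of the current index;
    # smartlink() hardlinks or decompresses it into a temporary ".new" file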
    newfile = oldfile + ".new"
    if os.path.exists(newfile):
        os.unlink(newfile)

    await smartlink(origfile, newfile)

    try:
        await upd.generate_and_add_patch_file(oldfile, newfile, patchname)
    finally:
        os.unlink(newfile)

    upd.prune_patch_history()

    for obsolete_patch in upd.find_obsolete_patches():
        tryunlink(obsolete_patch)

    upd.update_index()

    if oldfile + oldext != old_full_path and os.path.islink(old_full_path):
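        # the compression extension changed and the target name is currently a
        # symlink; remove it so the hardlink below can take its place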
        os.unlink(old_full_path)

    os.unlink(oldfile + oldext)
    os.link(resolved_orig_path, old_full_path)


def main():
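    """Parse options and queue pdiff generation for all indices in the selected suites."""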
    global Cnf, Options, Logger

    os.umask(0o002)

    Cnf = utils.get_conf()
    Arguments = [('h', "help", "Generate-Index-Diffs::Options::Help"),
                 ('a', 'archive', 'Generate-Index-Diffs::Options::Archive', 'hasArg'),
                 ('c', None, "Generate-Index-Diffs::Options::CanonicalPath", "hasArg"),
                 ('p', "patchname", "Generate-Index-Diffs::Options::PatchName", "hasArg"),
                 ('d', "tmpdir", "Generate-Index-Diffs::Options::TempDir", "hasArg"),
                 ('m', "maxdiffs", "Generate-Index-Diffs::Options::MaxDiffs", "hasArg"),
                 ('n', "no-act", "Generate-Index-Diffs::Options::NoAct"),
                 ('v', "verbose", "Generate-Index-Diffs::Options::Verbose"),
                 ]
    suites = apt_pkg.parse_commandline(Cnf, Arguments, sys.argv)
    Options = Cnf.subtree("Generate-Index-Diffs::Options")
    if "Help" in Options:
        usage()

    maxdiffs = Options.get("MaxDiffs::Default", "56")
    maxpackages = Options.get("MaxDiffs::Packages", maxdiffs)
    maxcontents = Options.get("MaxDiffs::Contents", maxdiffs)
    maxsources = Options.get("MaxDiffs::Sources", maxdiffs)

    max_parallel = int(Options.get("MaxParallel", "8"))

    if "PatchName" not in Options:
        format = "%Y-%m-%d-%H%M.%S"
        Options["PatchName"] = time.strftime(format)

    session = DBConn().session()
    pending_tasks = []

    if not suites:
        query = session.query(Suite.suite_name)
        if Options.get('Archive'):
            archives = utils.split_args(Options['Archive'])
            query = query.join(Suite.archive).filter(Archive.archive_name.in_(archives))
        suites = [s.suite_name for s in query]

    for suitename in suites:
        print("Processing: " + suitename)

        suiteobj = get_suite(suitename.lower(), session=session)
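
        # use the canonical suite name from the database; the command-line
        # argument may differ in case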
        suite = suiteobj.suite_name

        if suiteobj.untouchable:
            print("Skipping: " + suite + " (untouchable)")
            continue

        skip_all = True
        if suiteobj.separate_contents_architecture_all or suiteobj.separate_packages_architecture_all:
            skip_all = False

        architectures = get_suite_architectures(suite, skipall=skip_all, session=session)
        components = [c.component_name for c in session.query(Component.component_name)]

        suite_suffix = utils.suite_suffix(suitename)
        if components and suite_suffix:
            longsuite = suite + "/" + suite_suffix
        else:
            longsuite = suite

        merged_pdiffs = suiteobj.merged_pdiffs

        tree = os.path.join(suiteobj.archive.path, 'dists', longsuite)
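
        # Translation-* indices live under <component>/i18n; queue pdiff tasks
        # for any file matching re_includeinpdiff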
        cwd = os.getcwd()
        for component in components:
            workpath = os.path.join(tree, component, "i18n")
            if os.path.isdir(workpath):
                os.chdir(workpath)
                for dirpath, dirnames, filenames in os.walk(".", followlinks=True, topdown=True):
                    for entry in filenames:
                        if not re_includeinpdiff.match(entry):
                            continue
                        (fname, fext) = os.path.splitext(entry)
                        processfile = os.path.join(workpath, fname)
                        storename = "%s/%s_%s_%s" % (Options["TempDir"], suite, component, fname)
                        coroutine = genchanges(Options, processfile + ".diff", storename, processfile, maxdiffs, merged_pdiffs)
                        pending_tasks.append(coroutine)
        os.chdir(cwd)

        for archobj in architectures:
            architecture = archobj.arch_string

            if architecture == "source":
                longarch = architecture
                packages = "Sources"
                maxsuite = maxsources
            else:
                longarch = "binary-%s" % architecture
                packages = "Packages"
                maxsuite = maxpackages

            for component in components:
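                # queue a pdiff task for the per-component Contents index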
                file = "%s/%s/Contents-%s" % (tree, component, architecture)
                storename = "%s/%s_%s_contents_%s" % (Options["TempDir"], suite, component, architecture)
                coroutine = genchanges(Options, file + ".diff", storename, file, maxcontents, merged_pdiffs)
                pending_tasks.append(coroutine)

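                # ...and one for the Packages/Sources index itself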
                file = "%s/%s/%s/%s" % (tree, component, longarch, packages)
                storename = "%s/%s_%s_%s" % (Options["TempDir"], suite, component, architecture)
                coroutine = genchanges(Options, file + ".diff", storename, file, maxsuite, merged_pdiffs)
                pending_tasks.append(coroutine)

    asyncio.run(process_pdiff_tasks(pending_tasks, max_parallel))


async def process_pdiff_tasks(pending_coroutines, limit):
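    """Await the queued pdiff coroutines, running at most `limit` concurrently."""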
    if limit is not None:
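        # wrap each coroutine in a semaphore so that at most `limit` of them
        # hold a slot at any one time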
        semaphore = asyncio.Semaphore(limit)

        async def bounded_task(task):
            async with semaphore:
                return await task

        pending_coroutines = [bounded_task(task) for task in pending_coroutines]

    print(f"Processing {len(pending_coroutines)} PDiff generation tasks (parallel limit {limit})")
    start = time.time()
    pending_tasks = [asyncio.create_task(coroutine) for coroutine in pending_coroutines]
    done, pending = await asyncio.wait(pending_tasks)
    duration = round(time.time() - start, 2)

    errors = False

    for task in done:
        try:
            task.result()
        except Exception:
            traceback.print_exc()
            errors = True

    if errors:
        print(f"Processing failed after {duration} seconds")
        sys.exit(1)

    print(f"Processing finished after {duration} seconds")


if __name__ == '__main__':
    main()