Coverage for daklib/pdiff.py: 94%
286 statements

import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes

HASH_FIELDS = [
    ("SHA1-History", 0, 1, "", True),
    ("SHA256-History", 0, 2, "", True),
    ("SHA1-Patches", 1, 1, "", True),
    ("SHA256-Patches", 1, 2, "", True),
    ("SHA1-Download", 2, 1, ".gz", True),
    ("SHA256-Download", 2, 2, ".gz", True),
    ("X-Unmerged-SHA1-History", 0, 1, "", False),
    ("X-Unmerged-SHA256-History", 0, 2, "", False),
    ("X-Unmerged-SHA1-Patches", 1, 1, "", False),
    ("X-Unmerged-SHA256-Patches", 1, 2, "", False),
    ("X-Unmerged-SHA1-Download", 2, 1, ".gz", False),
    ("X-Unmerged-SHA256-Download", 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}
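
# Each HASH_FIELDS entry is (field name, ind, hashind, extension, primary_history):
# `ind` selects the slot in a patch's history entry (0 = the file the patch was
# made against, 1 = the uncompressed patch, 2 = the downloadable .gz patch),
# `hashind` selects the hash inside a PDiffHashes tuple (1 = sha1, 2 = sha256),
# `extension` is appended to the patch name when dumping the Index, and the
# final flag says whether the field belongs to the primary (possibly merged)
# history or to the X-Unmerged-* history.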

_PDiffHashes = collections.namedtuple("_PDiffHashes", ["size", "sha1", "sha256"])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect that of asyncio.create_subprocess_exec or
    (if "shell=True") that of asyncio.create_subprocess_shell
    with restore_signals=True being the default.
    """
    kwargs.setdefault("restore_signals", True)
    shell = kwargs.pop("shell", False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
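
# Illustrative only (arguments are made up): the helper is used both exec-style
# with redirected stdio and for shell pipelines, mirroring the call sites below.
#
#     await asyncio_check_call("zcat", stdin=rfd, stdout=fh)
#     await asyncio_check_call("diff --ed - new | gzip -9n > patch.gz", shell=True)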


async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = (
            tempfile.NamedTemporaryFile("w+")
            if named_temp_file
            else tempfile.TemporaryFile("w+")
        )
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(["zcat"], "{}.gz".format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(["bzcat"], "{}.bz2".format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(["xzcat"], "{}.xz".format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(["zstdcat"], f"{file}.zst")
    else:
        return None
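
# Illustrative only (the path is made up): open_decompressed returns a readable
# file object for the first of <file>, <file>.gz, <file>.bz2, <file>.xz or
# <file>.zst that exists (decompressing into a temporary file where needed),
# or None when none of them exist.
#
#     fd = await open_decompressed("dists/unstable/main/binary-amd64/Packages")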


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into a single merged pdiff

    While rred supports merging more than 2, we only need support for merging two.
    In the steady state, we will have N merged patches plus 1 new patch. Here
    we need to do N pairwise merges (i.e. merge two patches N times).
    Therefore, supporting merging of 3+ patches does not help at all.

    The setup state looks like it could do with a bulk merging. However, if you
    merge from "latest to earliest" then you will be building in optimal order
    and still only need to do N-1 pairwise merges (rather than N-1 merges
    between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with (
        await open_decompressed(patch_a, named_temp_file=True) as fd_a,
        await open_decompressed(patch_b, named_temp_file=True) as fd_b,
    ):
        await asyncio_check_call(
            "/usr/lib/apt/methods/rred %s %s | gzip -9n > %s"
            % (fd_a.name, fd_b.name, resulting_patch_without_extension + ".gz"),
            shell=True,
        )
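
# Illustrative only (paths are made up): patch_a is applied first, then patch_b,
# and the combined result is written as <result>.gz:
#
#     await _merge_pdiffs("diffs/old-patch", "diffs/newer-merged-patch",
#                         "diffs/merged-result")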


class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[: cnt - maximum]:
        del history[h]
    return order[cnt - maximum :]
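
# Illustrative only: with order = ["p1", "p2", "p3", "p4"] and maximum = 2,
# _prune_history deletes history["p1"] and history["p2"] and returns
# ["p3", "p4"], i.e. only the newest `maximum` patches survive.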


def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith(".gz"):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                parts[0],
                history[fname][ind].sha256,
            )
        else:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                history[fname][ind].sha1,
                parts[0],
            )

    # Common case: this is the first sequence we read, so we simply adopt it.
    if not history_order:
        return current_order
    # Common case: the current order perfectly matches the existing one, so
    # we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned. This "should not happen"
    # but has done so in the past due to bugs. Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever. We
    # realign the history to ensure we always end with "self-heal".
    #
    # Typically, the patches are aligned from the end as we always add a
    # patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]
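
# Illustrative only (hash, size and patch name are made up): each line handed
# to _read_hashes comes from an Index stanza and has the form
# "<hash> <size> <patch-name>[.gz]", e.g.
#
#     9af1d4cb...  4161890 2026-01-04-1618.28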


class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, "Index")
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(
        self, original_file, new_file_uncompressed, patch_name
    ):
        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

        with open(new_file_uncompressed, "r") as newf:
            newsizehashes = PDiffHashes.from_file(newf)

        if newsizehashes == oldsizehashes:
            return

        if not os.path.isdir(self.patches_dir):
            os.mkdir(self.patches_dir)

        oldf.seek(0)
        patch_path = os.path.join(self.patches_dir, patch_name)
        with open("{}.gz".format(patch_path), "wb") as fh:
            await asyncio_check_call(
                "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(
                    new_file_uncompressed
                ),
                shell=True,
                stdin=oldf,
                stdout=fh,
            )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [
            oldsizehashes,
            difsizehashes,
            difgzsizehashes,
        ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # Conversion also covers the newly added patch. Accordingly,
            # the bookkeeping below lives in an "else" branch.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    os.path.join(self.patches_dir, second_patch_name + ".gz"),
                )

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [
                oldsizehashes,
                difsizehashes,
                difgzsizehashes,
            ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches. As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(
                self.patches_dir, old_merged_patch_name
            )
            new_merged_patch_path = os.path.join(
                self.patches_dir, new_merged_patch_name
            )
            await _merge_pdiffs(
                old_merged_patch_path, target_path, new_merged_patch_path
            )

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                new_merged_patch_path
            )

            new_merged_history[new_merged_patch_name] = [
                self._history[old_merged_patch_name][0],
                hashes_decompressed,
                hashes_compressed,
            ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged patches to unmerged patches is simple: discard
        # the merged patches. Cleanup will be handled by find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)
        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches: list[str] = []

        # We merge from newest to oldest
        #
        # Assume we got N unmerged patches (u1 - uN) where given s1 then
        # you can apply u1 to get to s2. From s2 you use u2 to move to s3
        # and so on until you reach your target T (= sN+1).
        #
        # In the merged patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N. Here, you use sX + mX-N to go to
        # T directly regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise. This will be important in a moment. For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N because u1 will take s1
        # to s2 and m2-N will take s2 to T. By the same argument, we
        # generate m2-N by combining u2 with m3-N. Rinse-and-repeat until
        # we get to the base case m(N-1)-N - which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pairwise merges without having
        # to use all patches in one go. (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history).
        #
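        # Illustrative only: with three unmerged patches u1, u2, u3 (u3 being
        # the newest, taking us to the target T), the loop below produces
        #
        #     T-u3-F-u3  (a hard link to u3 - the special case below)
        #     T-u3-F-u2  (rred merge of u2 with T-u3-F-u3)
        #     T-u3-F-u1  (rred merge of u1 with T-u3-F-u2)
        #
        # so a client on any older snapshot needs exactly one merged patch
        # to reach T.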
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(
                    oldest_patch, previous_merged_patch, merged_patch_path
                )

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                    merged_patch_path
                )

                self._history[merged_patch] = [
                    self._unmerged_history[patch_name][0],
                    hashes_decompressed,
                    hashes_compressed,
                ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    merged_patch_path + ".gz",
                )
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()  # type: ignore[attr-defined]
                section: apt_pkg.TagSection = index.section  # type: ignore[attr-defined]
                self.has_merged_pdiffs = section.get("X-Patch-Precedence") == "merged"
                self._old_merged_patches_prefix = section.get(
                    "X-DAK-Older-Patches", ""
                ).split()

                for field in section.keys():
                    value = section[field]
                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(
                            history, history_order, ind, hashind, value.splitlines()
                        )
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    columns = value.split()

                    if len(columns) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(columns[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            columns[0],
                            self.filesizehashes.sha256,
                        )

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            self.filesizehashes.sha1,
                            columns[0],
                        )

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, then the two histories are the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything. This causes the file to be
            # regenerated from scratch. It forces everyone who is behind to
            # download the full file, but it is self-healing provided that
            # we generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(
            unmerged_order, unmerged_hs, self.max
        )

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[
                prefix_cnt - 3 :
            ]

    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history

        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches. While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches left that we failed to remove previously (e.g. if
        # we had an index corruption, which happened in fed7ada36b609 and
        # was later fixed in a36f867acf029)
        for name in os.listdir(self.patches_dir):
            if name in ("Index", "by-hash"):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)
            if ext in ("", ".gz") and (basename in hs or basename in unmerged_hs):
                continue
            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue
            # Unknown patch file; flag it as obsolete
            yield path

    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write(
                    "SHA1-Current: %s %7d\n"
                    % (self.filesizehashes.sha1, self.filesizehashes.size)
                )
            if self.filesizehashes.sha256:
                out.write(
                    "SHA256-Current: %s %7d\n"
                    % (self.filesizehashes.sha256, self.filesizehashes.size)
                )

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:

            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(
                        " %s %7d %s%s\n"
                        % (hs[h][ind][hashind], hs[h][ind].size, h, ext)
                    )

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write(
                    "X-DAK-Older-Patches: %s\n"
                    % " ".join(self._old_merged_patches_prefix)
                )
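
    # Illustrative only (hashes, sizes and patch names are made up): dump()
    # writes an apt-style pdiff Index stanza roughly like
    #
    #     SHA256-Current: 01ba4719... 4161936
    #     SHA256-History:
    #      9af1d4cb...  4161890 2026-01-04-1618.28
    #     SHA256-Patches:
    #      2b4cb1a3...     1287 2026-01-04-1618.28
    #     X-Patch-Precedence: merged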

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know there are
            # none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
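

# Illustrative only - a rough sketch of how PDiffIndex is typically driven
# (paths and the patch name are hypothetical, not taken from dak itself):
#
#     async def refresh_pdiffs(packages_path, new_uncompressed, patch_name):
#         index = PDiffIndex("%s.diff" % packages_path, max=56, merge_pdiffs=True)
#         await index.generate_and_add_patch_file(
#             packages_path, new_uncompressed, patch_name
#         )
#         index.prune_patch_history()
#         for obsolete in index.find_obsolete_patches():
#             os.unlink(obsolete)
#         index.update_index()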