1import asyncio
2import collections
3import os
4import subprocess
5import sys
6import tempfile
8import apt_pkg
10from daklib.dakapt import DakHashes
12HASH_FIELDS = [
13 ("SHA1-History", 0, 1, "", True),
14 ("SHA256-History", 0, 2, "", True),
15 ("SHA1-Patches", 1, 1, "", True),
16 ("SHA256-Patches", 1, 2, "", True),
17 ("SHA1-Download", 2, 1, ".gz", True),
18 ("SHA256-Download", 2, 2, ".gz", True),
19 ("X-Unmerged-SHA1-History", 0, 1, "", False),
20 ("X-Unmerged-SHA256-History", 0, 2, "", False),
21 ("X-Unmerged-SHA1-Patches", 1, 1, "", False),
22 ("X-Unmerged-SHA256-Patches", 1, 2, "", False),
23 ("X-Unmerged-SHA1-Download", 2, 1, ".gz", False),
24 ("X-Unmerged-SHA256-Download", 2, 2, ".gz", False),
25]
27HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}
29_PDiffHashes = collections.namedtuple("_PDiffHashes", ["size", "sha1", "sha256"])
32async def asyncio_check_call(*args, **kwargs):
33 """async variant of subprocess.check_call
35 Parameters reflect that of asyncio.create_subprocess_exec or
36 (if "shell=True") that of asyncio.create_subprocess_shell
37 with restore_signals=True being the default.
38 """
39 kwargs.setdefault("restore_signals", True)
40 shell = kwargs.pop("shell", False)
41 if shell:
42 proc = await asyncio.create_subprocess_shell(*args, **kwargs)
43 else:
44 proc = await asyncio.create_subprocess_exec(*args, **kwargs)
45 retcode = await proc.wait()
46 if retcode != 0: 46 ↛ 47line 46 didn't jump to line 47, because the condition on line 46 was never true
47 raise subprocess.CalledProcessError(retcode, args[0])
48 return 0
51async def open_decompressed(file, named_temp_file=False):
52 async def call_decompressor(cmd, inpath):
53 fh = (
54 tempfile.NamedTemporaryFile("w+")
55 if named_temp_file
56 else tempfile.TemporaryFile("w+")
57 )
58 with open(inpath, "rb") as rfd:
59 await asyncio_check_call(
60 *cmd,
61 stdin=rfd,
62 stdout=fh,
63 )
64 fh.seek(0)
65 return fh
67 if os.path.isfile(file):
68 return open(file, "r")
69 elif os.path.isfile("%s.gz" % file):
70 return await call_decompressor(["zcat"], "{}.gz".format(file))
71 elif os.path.isfile("%s.bz2" % file): 71 ↛ 72line 71 didn't jump to line 72, because the condition on line 71 was never true
72 return await call_decompressor(["bzcat"], "{}.bz2".format(file))
73 elif os.path.isfile("%s.xz" % file): 73 ↛ 75line 73 didn't jump to line 75, because the condition on line 73 was never false
74 return await call_decompressor(["xzcat"], "{}.xz".format(file))
75 elif os.path.isfile(f"{file}.zst"):
76 return await call_decompressor(["zstdcat"], f"{file}.zst")
77 else:
78 return None
81async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
82 """Merge two pdiff in to a merged pdiff
84 While rred support merging more than 2, we only need support for merging two.
85 In the steady state, we will have N merged patches plus 1 new patch. Here
86 we need to do N pairwise merges (i.e. merge two patches N times).
87 Therefore, supporting merging of 3+ patches does not help at all.
89 The setup state looks like it could do with a bulk merging. However, if you
90 merge from "latest to earliest" then you will be building in optimal order
91 and still only need to do N-1 pairwise merges (rather than N-1 merges
92 between N, N-1, N-2, ... 3, 2 patches).
94 Combined, supporting pairwise merges is sufficient for our use case.
95 """
96 with await open_decompressed(
97 patch_a, named_temp_file=True
98 ) as fd_a, await open_decompressed(patch_b, named_temp_file=True) as fd_b:
99 await asyncio_check_call(
100 "/usr/lib/apt/methods/rred %s %s | gzip -9n > %s"
101 % (fd_a.name, fd_b.name, resulting_patch_without_extension + ".gz"),
102 shell=True,
103 )
106class PDiffHashes(_PDiffHashes):
108 @classmethod
109 def from_file(cls, fd):
110 size = os.fstat(fd.fileno())[6]
111 hashes = DakHashes(fd)
112 return cls(size, hashes.sha1, hashes.sha256)
115async def _pdiff_hashes_from_patch(path_without_extension):
116 with await open_decompressed(path_without_extension) as difff:
117 hashes_decompressed = PDiffHashes.from_file(difff)
119 with open(path_without_extension + ".gz", "r") as difffgz:
120 hashes_compressed = PDiffHashes.from_file(difffgz)
122 return hashes_decompressed, hashes_compressed
125def _prune_history(order, history, maximum):
126 cnt = len(order)
127 if cnt <= maximum:
128 return order
129 for h in order[: cnt - maximum]:
130 del history[h]
131 return order[cnt - maximum :]
134def _read_hashes(history, history_order, ind, hashind, lines):
135 current_order = []
136 for line in lines:
137 parts = line.split()
138 fname = parts[2]
139 if fname.endswith(".gz"):
140 fname = fname[:-3]
141 current_order.append(fname)
142 if fname not in history:
143 history[fname] = [None, None, None]
144 if not history[fname][ind]:
145 history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
146 if hashind == 1:
147 history[fname][ind] = PDiffHashes(
148 history[fname][ind].size,
149 parts[0],
150 history[fname][ind].sha256,
151 )
152 else:
153 history[fname][ind] = PDiffHashes(
154 history[fname][ind].size,
155 history[fname][ind].sha1,
156 parts[0],
157 )
159 # Common-case: Either this is the first sequence we read and we
160 # simply adopt that
161 if not history_order:
162 return current_order
163 # Common-case: The current history perfectly matches the existing, so
164 # we just stop here.
165 if current_order == history_order:
166 return history_order
168 # Special-case, the histories are not aligned. This "should not happen"
169 # but has done so in the past due to bugs. Depending on which field is
170 # out of sync, dak would either self heal or be stuff forever. We
171 # realign the history to ensure we always end with "self-heal".
172 #
173 # Typically, the patches are aligned from the end as we always add a
174 # patch in the end of the series.
175 patches_from_the_end = 0
176 for p1, p2 in zip(reversed(current_order), reversed(history_order)):
177 if p1 == p2:
178 patches_from_the_end += 1
179 else:
180 break
182 if not patches_from_the_end:
183 return None
185 return current_order[-patches_from_the_end:]
188class PDiffIndex:
189 def __init__(self, patches_dir, max=56, merge_pdiffs=False):
190 self.can_path = None
191 self._history = {}
192 self._history_order = []
193 self._unmerged_history = {}
194 self._unmerged_history_order = []
195 self._old_merged_patches_prefix = []
196 self.max = max
197 self.patches_dir = patches_dir
198 self.filesizehashes = None
199 self.wants_merged_pdiffs = merge_pdiffs
200 self.has_merged_pdiffs = False
201 self.index_path = os.path.join(patches_dir, "Index")
202 self.read_index_file(self.index_path)
204 async def generate_and_add_patch_file(
205 self, original_file, new_file_uncompressed, patch_name
206 ):
208 with await open_decompressed(original_file) as oldf:
209 oldsizehashes = PDiffHashes.from_file(oldf)
211 with open(new_file_uncompressed, "r") as newf:
212 newsizehashes = PDiffHashes.from_file(newf)
214 if newsizehashes == oldsizehashes:
215 return
217 if not os.path.isdir(self.patches_dir):
218 os.mkdir(self.patches_dir)
220 oldf.seek(0)
221 patch_path = os.path.join(self.patches_dir, patch_name)
222 with open("{}.gz".format(patch_path), "wb") as fh:
223 await asyncio_check_call(
224 "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(
225 new_file_uncompressed
226 ),
227 shell=True,
228 stdin=oldf,
229 stdout=fh,
230 )
232 difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)
234 self.filesizehashes = newsizehashes
235 self._unmerged_history[patch_name] = [
236 oldsizehashes,
237 difsizehashes,
238 difgzsizehashes,
239 ]
240 self._unmerged_history_order.append(patch_name)
242 if self.has_merged_pdiffs != self.wants_merged_pdiffs:
243 # Convert patches
244 if self.wants_merged_pdiffs:
245 await self._convert_to_merged_patches()
246 else:
247 self._convert_to_unmerged()
248 # Conversion also covers the newly added patch. Accordingly,
249 # the elif here.
250 else:
251 second_patch_name = patch_name
252 if self.wants_merged_pdiffs:
253 await self._bump_merged_patches()
254 second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
255 os.link(
256 os.path.join(self.patches_dir, patch_name + ".gz"),
257 os.path.join(self.patches_dir, second_patch_name + ".gz"),
258 )
260 # Without merged PDiffs, keep _history and _unmerged_history aligned
261 self._history[second_patch_name] = [
262 oldsizehashes,
263 difsizehashes,
264 difgzsizehashes,
265 ]
266 self._history_order.append(second_patch_name)
268 async def _bump_merged_patches(self):
269 # When bumping patches, we need to "rewrite" all merged patches. As
270 # neither apt nor dak supports by-hash for pdiffs, we leave the old
271 # versions of merged pdiffs behind.
272 target_name = self._unmerged_history_order[-1]
273 target_path = os.path.join(self.patches_dir, target_name)
275 new_merged_order = []
276 new_merged_history = {}
277 for old_merged_patch_name in self._history_order:
278 try:
279 old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
280 except IndexError:
281 old_orig_name = old_merged_patch_name
282 new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
283 old_merged_patch_path = os.path.join(
284 self.patches_dir, old_merged_patch_name
285 )
286 new_merged_patch_path = os.path.join(
287 self.patches_dir, new_merged_patch_name
288 )
289 await _merge_pdiffs(
290 old_merged_patch_path, target_path, new_merged_patch_path
291 )
293 hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
294 new_merged_patch_path
295 )
297 new_merged_history[new_merged_patch_name] = [
298 self._history[old_merged_patch_name][0],
299 hashes_decompressed,
300 hashes_compressed,
301 ]
302 new_merged_order.append(new_merged_patch_name)
304 self._history_order = new_merged_order
305 self._history = new_merged_history
307 self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])
309 def _convert_to_unmerged(self):
310 if not self.has_merged_pdiffs: 310 ↛ 311line 310 didn't jump to line 311, because the condition on line 310 was never true
311 return
312 # Converting from merged patches to unmerged patches is simply. Discard the merged
313 # patches. Cleanup will be handled by find_obsolete_patches
314 self._history = {k: v for k, v in self._unmerged_history.items()}
315 self._history_order = list(self._unmerged_history_order)
316 self._old_merged_patches_prefix = []
317 self.has_merged_pdiffs = False
319 async def _convert_to_merged_patches(self):
320 if self.has_merged_pdiffs: 320 ↛ 321line 320 didn't jump to line 321, because the condition on line 320 was never true
321 return
323 target_name = self._unmerged_history_order[-1]
325 self._history = {}
326 self._history_order = []
328 new_patches = []
330 # We merge from newest to oldest
331 #
332 # Assume we got N unmerged patches (u1 - uN) where given s1 then
333 # you can apply u1 to get to s2. From s2 you use u2 to move to s3
334 # and so on until you reach your target T (= sN+1).
335 #
336 # In the merged patch world, we want N merged patches called m1-N,
337 # m2-N, m3-N ... m(N-1)-N. Here, the you use sX + mX-N to go to
338 # T directly regardless of where you start.
339 #
340 # A note worthy special case is that m(N-1)-N is identical uN
341 # content-wise. This will be important in a moment. For now,
342 # lets start with looking at creating merged patches.
343 #
344 # We can get m1-N by merging u1 with m2-N because u1 will take s1
345 # to s2 and m2-N will take s2 to T. By the same argument, we get
346 # generate m2-N by combing u2 with m3-N. Rinse-and-repeat until
347 # we get to the base-case m(N-1)-N - which is uN.
348 #
349 # From this, we can conclude that generating the patches in
350 # reverse order (i.e. m2-N is generated before m1-N) will get
351 # us the desired result in N-1 pair-wise merges without having
352 # to use all patches in one go. (This is also optimal in the
353 # sense that we need to update N-1 patches to preserve the
354 # entire history).
355 #
356 for patch_name in reversed(self._unmerged_history_order):
357 merged_patch = "T-%s-F-%s" % (target_name, patch_name)
358 merged_patch_path = os.path.join(self.patches_dir, merged_patch)
360 if new_patches:
361 oldest_patch = os.path.join(self.patches_dir, patch_name)
362 previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
363 await _merge_pdiffs(
364 oldest_patch, previous_merged_patch, merged_patch_path
365 )
367 hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
368 merged_patch_path
369 )
371 self._history[merged_patch] = [
372 self._unmerged_history[patch_name][0],
373 hashes_decompressed,
374 hashes_compressed,
375 ]
376 else:
377 # Special_case; the latest patch is its own "merged" variant.
378 os.link(
379 os.path.join(self.patches_dir, patch_name + ".gz"),
380 merged_patch_path + ".gz",
381 )
382 self._history[merged_patch] = self._unmerged_history[patch_name]
384 new_patches.append(merged_patch)
386 self._history_order = list(reversed(new_patches))
387 self._old_merged_patches_prefix.append(target_name)
388 self.has_merged_pdiffs = True
390 def read_index_file(self, index_file_path):
391 try:
392 with apt_pkg.TagFile(index_file_path) as index:
393 index.step()
394 section = index.section
395 self.has_merged_pdiffs = section.get("X-Patch-Precedence") == "merged"
396 self._old_merged_patches_prefix = section.get(
397 "X-DAK-Older-Patches", ""
398 ).split()
400 for field in section.keys():
401 value = section[field]
402 if field in HASH_FIELDS_TABLE:
403 ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
404 if primary_history:
405 history = self._history
406 history_order = self._history_order
407 else:
408 history = self._unmerged_history
409 history_order = self._unmerged_history_order
411 if history_order is None:
412 # History is already misaligned and we cannot find a common restore point.
413 continue
415 new_order = _read_hashes(
416 history, history_order, ind, hashind, value.splitlines()
417 )
418 if primary_history:
419 self._history_order = new_order
420 else:
421 self._unmerged_history_order = new_order
422 continue
424 if field in ("Canonical-Name", "Canonical-Path"):
425 self.can_path = value
426 continue
428 if field not in ("SHA1-Current", "SHA256-Current"):
429 continue
431 columns = value.split()
433 if len(columns) != 2: 433 ↛ 434line 433 didn't jump to line 434, because the condition on line 433 was never true
434 continue
436 if not self.filesizehashes:
437 self.filesizehashes = PDiffHashes(int(columns[1]), None, None)
439 if field == "SHA1-Current":
440 self.filesizehashes = PDiffHashes(
441 self.filesizehashes.size,
442 columns[0],
443 self.filesizehashes.sha256,
444 )
446 if field == "SHA256-Current":
447 self.filesizehashes = PDiffHashes(
448 self.filesizehashes.size,
449 self.filesizehashes.sha1,
450 columns[0],
451 )
453 # Ensure that the order lists are defined again.
454 if self._history_order is None:
455 self._history_order = []
456 if self._unmerged_history_order is None: 456 ↛ 457line 456 didn't jump to line 457, because the condition on line 456 was never true
457 self._unmerged_history_order = []
459 if not self.has_merged_pdiffs:
460 # When X-Patch-Precedence != merged, then the two histories are the same.
461 self._unmerged_history = {k: v for k, v in self._history.items()}
462 self._unmerged_history_order = list(self._history_order)
463 self._old_merged_patches_prefix = []
465 except (OSError, apt_pkg.Error):
466 # On error, we ignore everything. This causes the file to be regenerated from scratch.
467 # It forces everyone to download the full file for if they are behind.
468 # But it is self-healing providing that we generate valid files from here on.
469 pass
471 def prune_patch_history(self):
472 # Truncate our history if necessary
473 hs = self._history
474 order = self._history_order
475 unmerged_hs = self._unmerged_history
476 unmerged_order = self._unmerged_history_order
477 self._history_order = _prune_history(order, hs, self.max)
478 self._unmerged_history_order = _prune_history(
479 unmerged_order, unmerged_hs, self.max
480 )
482 prefix_cnt = len(self._old_merged_patches_prefix)
483 if prefix_cnt > 3:
484 self._old_merged_patches_prefix = self._old_merged_patches_prefix[
485 prefix_cnt - 3 :
486 ]
488 def find_obsolete_patches(self):
489 if not os.path.isdir(self.patches_dir):
490 return
492 hs = self._history
493 unmerged_hs = self._unmerged_history
495 keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)
497 # Scan for obsolete patches. While we could have computed these
498 # from the history, this method has the advantage of cleaning up
499 # old patches left that we failed to remove previously (e.g. if
500 # we had an index corruption, which happened in fed7ada36b609 and
501 # was later fixed in a36f867acf029)
502 for name in os.listdir(self.patches_dir):
503 if name in ("Index", "by-hash"):
504 continue
505 # We keep some old merged patches around (as neither apt nor
506 # dak supports by-hash for pdiffs)
507 if keep_prefixes and name.startswith(keep_prefixes):
508 continue
509 basename, ext = os.path.splitext(name)
510 if ext in ("", ".gz") and (basename in hs or basename in unmerged_hs):
511 continue
512 path = os.path.join(self.patches_dir, name)
513 if not os.path.isfile(path): 513 ↛ 515line 513 didn't jump to line 515, because the condition on line 513 was never true
514 # Non-files are probably not patches.
515 continue
516 # Unknown patch file; flag it as obsolete
517 yield path
519 def dump(self, out=sys.stdout):
520 if self.can_path:
521 out.write("Canonical-Path: %s\n" % self.can_path)
523 if self.filesizehashes: 523 ↛ 535line 523 didn't jump to line 535, because the condition on line 523 was never false
524 if self.filesizehashes.sha1: 524 ↛ 529line 524 didn't jump to line 529, because the condition on line 524 was never false
525 out.write(
526 "SHA1-Current: %s %7d\n"
527 % (self.filesizehashes.sha1, self.filesizehashes.size)
528 )
529 if self.filesizehashes.sha256: 529 ↛ 535line 529 didn't jump to line 535, because the condition on line 529 was never false
530 out.write(
531 "SHA256-Current: %s %7d\n"
532 % (self.filesizehashes.sha256, self.filesizehashes.size)
533 )
535 for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:
537 if primary_history:
538 hs = self._history
539 order = self._history_order
540 elif self.has_merged_pdiffs: 540 ↛ 544line 540 didn't jump to line 544, because the condition on line 540 was never false
541 hs = self._unmerged_history
542 order = self._unmerged_history_order
543 else:
544 continue
546 out.write("%s:\n" % fieldname)
547 for h in order:
548 if hs[h][ind] and hs[h][ind][hashind]: 548 ↛ 547line 548 didn't jump to line 547, because the condition on line 548 was never false
549 out.write(
550 " %s %7d %s%s\n"
551 % (hs[h][ind][hashind], hs[h][ind].size, h, ext)
552 )
554 if self.has_merged_pdiffs:
555 out.write("X-Patch-Precedence: merged\n")
556 if self._old_merged_patches_prefix: 556 ↛ exitline 556 didn't return from function 'dump', because the condition on line 556 was never false
557 out.write(
558 "X-DAK-Older-Patches: %s\n"
559 % " ".join(self._old_merged_patches_prefix)
560 )
562 def update_index(self, tmp_suffix=".new"):
563 if not os.path.isdir(self.patches_dir):
564 # If there is no patch directory, then we have no patches.
565 # It seems weird to have an Index of patches when we know there are
566 # none.
567 return
568 tmp_path = self.index_path + tmp_suffix
569 with open(tmp_path, "w") as f:
570 self.dump(f)
571 os.rename(tmp_path, self.index_path)