import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes
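
# Each entry in HASH_FIELDS is
#   (Index field name, slot in the per-patch history entry, hash index,
#    filename extension used when emitting the field, primary history?)
# where the history slots are 0 = the historical file the patch applies to,
# 1 = the uncompressed patch and 2 = the compressed (.gz) patch, and the hash
# index selects sha1 (1) or sha256 (2) in the PDiffHashes tuple.  Fields with
# primary history = False describe the unmerged patch series.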
HASH_FIELDS = [
    ('SHA1-History', 0, 1, "", True),
    ('SHA256-History', 0, 2, "", True),
    ('SHA1-Patches', 1, 1, "", True),
    ('SHA256-Patches', 1, 2, "", True),
    ('SHA1-Download', 2, 1, ".gz", True),
    ('SHA256-Download', 2, 2, ".gz", True),
    ('X-Unmerged-SHA1-History', 0, 1, "", False),
    ('X-Unmerged-SHA256-History', 0, 2, "", False),
    ('X-Unmerged-SHA1-Patches', 1, 1, "", False),
    ('X-Unmerged-SHA256-Patches', 1, 2, "", False),
    ('X-Unmerged-SHA1-Download', 2, 1, ".gz", False),
    ('X-Unmerged-SHA256-Download', 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple('_PDiffHashes', ['size', 'sha1', 'sha256'])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect that of asyncio.create_subprocess_exec or
    (if "shell=True") that of asyncio.create_subprocess_shell
    with restore_signals=True being the default.
    """
    kwargs.setdefault('restore_signals', True)
    shell = kwargs.pop('shell', False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
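
# Illustrative usage (a sketch, not taken from dak's callers): the helper is
# awaited from a coroutine and raises on a non-zero exit status, mirroring
# subprocess.check_call:
#
#     await asyncio_check_call('gzip', '-9n', 'some-file')        # exec form
#     await asyncio_check_call('zcat a.gz | wc -l', shell=True)   # shell form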


async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = tempfile.NamedTemporaryFile("w+") if named_temp_file \
            else tempfile.TemporaryFile("w+")
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(['zcat'], '{}.gz'.format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(['bzcat'], '{}.bz2'.format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(['xzcat'], '{}.xz'.format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(['zstdcat'], f'{file}.zst')
    else:
        return None
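
# Illustrative usage (sketch; 'Packages' is a made-up path): the caller gets
# back a readable file object positioned at the start, or None when neither
# the plain file nor any compressed variant exists:
#
#     fd = await open_decompressed('Packages')
#     if fd is not None:
#         with fd:
#             data = fd.read()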


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into one merged pdiff

    While rred supports merging more than 2, we only need support for merging two.
    In the steady state, we will have N merged patches plus 1 new patch. Here
    we need to do N pairwise merges (i.e. merge two patches N times).
    Therefore, supporting merging of 3+ patches does not help at all.

    The setup state looks like it could do with a bulk merge. However, if you
    merge from "latest to earliest" then you will be building in optimal order
    and still only need to do N-1 pairwise merges (rather than N-1 merges
    between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(patch_a, named_temp_file=True) as fd_a, \
            await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            '/usr/lib/apt/methods/rred %s %s | gzip -9n > %s' % (fd_a.name, fd_b.name,
                                                                 resulting_patch_without_extension + ".gz"),
            shell=True,
        )
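
# Illustrative call (sketch; the patch names are made up): merging an existing
# merged patch with the newest patch yields the next merged patch, written to
# "<result>.gz":
#
#     await _merge_pdiffs('patches/T-old-F-p1', 'patches/p2',
#                         'patches/T-p2-F-p1')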


class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[:cnt - maximum]:
        del history[h]
    return order[cnt - maximum:]
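
# Worked example (illustrative values): with order = ['p1', 'p2', 'p3'] and
# maximum = 2, the entry for 'p1' is deleted from history and ['p2', 'p3'] is
# returned; with maximum >= 3 the order is returned unchanged.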


def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith('.gz'):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              parts[0],
                                              history[fname][ind].sha256,
                                              )
        else:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              history[fname][ind].sha1,
                                              parts[0],
                                              )

    # Common case: this is the first sequence we read, so we simply
    # adopt it.
    if not history_order:
        return current_order
    # Common case: the current history perfectly matches the existing one,
    # so we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned.  This "should not happen"
    # but has done so in the past due to bugs.  Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever.  We
    # realign the history to ensure we always end with "self-heal".
    #
    # Typically, the patches are aligned from the end, as we always add a
    # patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]
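
# The lines handed to _read_hashes come from the hash fields written by
# PDiffIndex.dump() below and have the form "<hash> <size> <patch name>",
# e.g. (illustrative values):
#
#     37b358d0...  28010 2023-01-01-0000.00
#
# parts[0] is the hash, parts[1] the size, and parts[2] the patch name (a
# trailing ".gz" is stripped for the *-Download fields).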


class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, 'Index')
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(self, original_file, new_file_uncompressed, patch_name):

        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(new_file_uncompressed),
                    shell=True,
                    stdin=oldf,
                    stdout=fh
                )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [oldsizehashes,
                                              difsizehashes,
                                              difgzsizehashes,
                                              ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # The conversion also covers the newly added patch, so there is
            # nothing further to do for it; hence the else branch below.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"),
                        os.path.join(self.patches_dir, second_patch_name + ".gz"))

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [oldsizehashes,
                                                difsizehashes,
                                                difgzsizehashes,
                                                ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches.  As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(self.patches_dir, old_merged_patch_name)
            new_merged_patch_path = os.path.join(self.patches_dir, new_merged_patch_name)
            await _merge_pdiffs(old_merged_patch_path, target_path, new_merged_patch_path)

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(new_merged_patch_path)

            new_merged_history[new_merged_patch_name] = [self._history[old_merged_patch_name][0],
                                                         hashes_decompressed,
                                                         hashes_compressed,
                                                         ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged patches to unmerged patches is simple: discard
        # the merged patches.  Cleanup will be handled by find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)
        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest
        #
        # Assume we have N unmerged patches (u1 - uN) where, given s1, you
        # can apply u1 to get to s2.  From s2 you use u2 to move to s3 and
        # so on until you reach your target T (= sN+1).
        #
        # In the merged-patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... mN-N.  Here, you use sX + mX-N to go to T directly,
        # regardless of where you start.
        #
        # A noteworthy special case is that mN-N is identical to uN
        # content-wise.  This will be important in a moment.  For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N, because u1 will take s1
        # to s2 and m2-N will take s2 to T.  By the same argument, we can
        # generate m2-N by combining u2 with m3-N.  Rinse-and-repeat until
        # we get to the base case mN-N - which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pairwise merges, without having
        # to use all patches in one go.  (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history.)
        #
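        # As a concrete illustration (assuming N = 3 unmerged patches u1, u2,
        # u3): the loop below first hard-links u3's compressed patch as its
        # own "merged" variant, then merges u2 with that result, and finally
        # merges u1 with the u2 result - two pairwise rred merges in total.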
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(oldest_patch, previous_merged_patch, merged_patch_path)

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(merged_patch_path)

                self._history[merged_patch] = [self._unmerged_history[patch_name][0],
                                               hashes_decompressed,
                                               hashes_compressed,
                                               ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"), merged_patch_path + ".gz")
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get('X-Patch-Precedence') == 'merged'
                self._old_merged_patches_prefix = section.get('X-DAK-Older-Patches', '').split()

                for field in section.keys():
                    value = section[field]

                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(history, history_order, ind, hashind, value.splitlines())
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    l = value.split()

                    if len(l) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(l[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, l[0], self.filesizehashes.sha256)

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, self.filesizehashes.sha1, l[0])

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, the two histories are the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything.  This causes the file to be regenerated
            # from scratch and forces everyone who is behind to download the full file.
            # But it is self-healing, provided that we generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(unmerged_order, unmerged_hs, self.max)

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[prefix_cnt - 3:]

    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history

        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches.  While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches left behind that we failed to remove previously (e.g. if
        # we had an index corruption, which happened in fed7ada36b609 and
        # was later fixed in a36f867acf029).
        for name in os.listdir(self.patches_dir):
            if name in ('Index', 'by-hash'):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)
            if ext in ('', '.gz') and (basename in hs or basename in unmerged_hs):
                continue
            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue
            # Unknown patch file; flag it as obsolete
            yield path
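
    # Illustrative usage of find_obsolete_patches (sketch): it only yields
    # paths; the caller is expected to remove them, e.g.
    #
    #     for path in index.find_obsolete_patches():
    #         os.unlink(path)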

    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write("SHA1-Current: %s %7d\n" % (self.filesizehashes.sha1, self.filesizehashes.size))
            if self.filesizehashes.sha256:
                out.write("SHA256-Current: %s %7d\n" % (self.filesizehashes.sha256, self.filesizehashes.size))

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:

            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(" %s %7d %s%s\n" % (hs[h][ind][hashind], hs[h][ind].size, h, ext))

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write("X-DAK-Older-Patches: %s\n" % " ".join(self._old_merged_patches_prefix))
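
    # For reference, the stanza emitted by dump() looks roughly like this
    # (hash values, sizes and patch names are illustrative; SHA1 fields and
    # the X-Unmerged-* fields are omitted here):
    #
    #     SHA256-Current: <sha256 of current file> 1151616
    #     SHA256-History:
    #      <sha256 of old file> 1146880 2023-01-01-0000.00
    #     SHA256-Patches:
    #      <sha256 of patch>       5628 2023-01-01-0000.00
    #     SHA256-Download:
    #      <sha256 of patch.gz>    2443 2023-01-01-0000.00.gz
    #     X-Patch-Precedence: merged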

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know there are
            # none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
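

# Illustrative end-to-end driver (a sketch with made-up paths, not dak's
# actual call sites): generate a new pdiff against the previous Packages
# file, prune old history, drop obsolete patch files and rewrite the Index.
#
#     async def update_pdiffs():
#         index = PDiffIndex('dists/unstable/main/binary-amd64/Packages.diff',
#                            max=56, merge_pdiffs=True)
#         await index.generate_and_add_patch_file('Packages.old', 'Packages',
#                                                 '2023-01-01-0000.00')
#         index.prune_patch_history()
#         for path in index.find_obsolete_patches():
#             os.unlink(path)
#         index.update_index()
#
#     asyncio.run(update_pdiffs())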