Source Code for Module daklib.pdiff

import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes

HASH_FIELDS = [
    ('SHA1-History', 0, 1, "", True),
    ('SHA256-History', 0, 2, "", True),
    ('SHA1-Patches', 1, 1, "", True),
    ('SHA256-Patches', 1, 2, "", True),
    ('SHA1-Download', 2, 1, ".gz", True),
    ('SHA256-Download', 2, 2, ".gz", True),
    ('X-Unmerged-SHA1-History', 0, 1, "", False),
    ('X-Unmerged-SHA256-History', 0, 2, "", False),
    ('X-Unmerged-SHA1-Patches', 1, 1, "", False),
    ('X-Unmerged-SHA256-Patches', 1, 2, "", False),
    ('X-Unmerged-SHA1-Download', 2, 1, ".gz", False),
    ('X-Unmerged-SHA256-Download', 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple('_PDiffHashes', ['size', 'sha1', 'sha256'])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect those of asyncio.create_subprocess_exec or
    (if "shell=True") those of asyncio.create_subprocess_shell,
    with restore_signals=True being the default.
    """
    kwargs.setdefault('restore_signals', True)
    shell = kwargs.pop('shell', False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
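

# Illustrative sketch, not part of the original module: asyncio_check_call is
# used like subprocess.check_call, either with an argument vector (exec style)
# or with a single command line plus shell=True. The commands below are
# hypothetical examples; a non-zero exit status raises CalledProcessError.
async def _example_asyncio_check_call():
    # exec style: each argument is passed directly, no shell is involved
    await asyncio_check_call('gzip', '--version', stdout=subprocess.DEVNULL)
    # shell style: one string, interpreted by /bin/sh
    await asyncio_check_call('zcat --version > /dev/null', shell=True)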


async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = tempfile.NamedTemporaryFile("w+") if named_temp_file \
            else tempfile.TemporaryFile("w+")
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(['zcat'], '{}.gz'.format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(['bzcat'], '{}.bz2'.format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(['xzcat'], '{}.xz'.format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(['zstdcat'], f'{file}.zst')
    else:
        return None
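

# Illustrative sketch, not part of the original module: open_decompressed
# looks for <path>, <path>.gz, <path>.bz2, <path>.xz or <path>.zst and returns
# a read-open file object (or None if none of them exists). The default path
# below is hypothetical.
async def _example_open_decompressed(path="dists/unstable/main/binary-amd64/Packages"):
    fd = await open_decompressed(path)
    if fd is None:
        return None
    with fd:
        # e.g. count the lines of the (decompressed) file
        return sum(1 for _ in fd)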


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into a merged pdiff

    While rred supports merging more than 2, we only need support for merging two.
    In the steady state, we will have N merged patches plus 1 new patch. Here
    we need to do N pairwise merges (i.e. merge two patches N times).
    Therefore, supporting merging of 3+ patches does not help at all.

    The setup state looks like it could do with a bulk merge. However, if you
    merge from "latest to earliest" then you will be building in optimal order
    and still only need to do N-1 pairwise merges (rather than N-1 merges
    between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(patch_a, named_temp_file=True) as fd_a, \
            await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            '/usr/lib/apt/methods/rred %s %s | gzip -9n > %s' % (fd_a.name, fd_b.name,
                                                                 resulting_patch_without_extension + ".gz"),
            shell=True,
        )
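

# Illustrative sketch, not part of the original module: the pairwise merging
# strategy described in the docstring above. Merging from latest to earliest,
# each merged patch "T-<target>-F-<name>" is built from one unmerged patch and
# the previously produced merged patch, so N patches need only N-1 rred runs.
# The patch names and directory layout here are hypothetical.
async def _example_pairwise_merge(patches_dir, unmerged_names, target_name):
    # The newest patch is its own "merged" variant (the real code hard-links it).
    previous = os.path.join(patches_dir, unmerged_names[-1])
    for name in reversed(unmerged_names[:-1]):
        merged = os.path.join(patches_dir, "T-%s-F-%s" % (target_name, name))
        await _merge_pdiffs(os.path.join(patches_dir, name), previous, merged)
        previous = merged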


class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)
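

# Illustrative sketch, not part of the original module: PDiffHashes records
# the size plus SHA1/SHA256 of a file, either computed from an open file via
# from_file() or filled in field by field (as read_index_file does below).
# The filename is hypothetical.
def _example_pdiff_hashes(path="Packages.new"):
    with open(path, "r") as fd:
        return PDiffHashes.from_file(fd)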


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[:cnt - maximum]:
        del history[h]
    return order[cnt - maximum:]
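

# Illustrative sketch, not part of the original module: _prune_history drops
# the oldest entries from both the order list and the backing dict so that at
# most `maximum` patches remain. The patch names are made up.
def _example_prune_history():
    history = {'p1': None, 'p2': None, 'p3': None}
    order = ['p1', 'p2', 'p3']
    order = _prune_history(order, history, 2)
    # order is now ['p2', 'p3'] and 'p1' has been removed from history
    return order, history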


def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith('.gz'):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              parts[0],
                                              history[fname][ind].sha256,
                                              )
        else:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              history[fname][ind].sha1,
                                              parts[0],
                                              )

    # Common case: This is the first sequence we read, and we simply
    # adopt it.
    if not history_order:
        return current_order
    # Common case: The current order perfectly matches the existing one, so
    # we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned. This "should not happen"
    # but has done so in the past due to bugs. Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever. We
    # realign the history to ensure we always end with "self-heal".
    #
    # Typically, the patches are aligned from the end as we always add a
    # patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]
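

# Illustrative sketch, not part of the original module: _read_hashes consumes
# the "<hash> <size> <filename>" lines listed under one of the HASH_FIELDS of
# an Index file and fills in the selected hash slot (hashind 1 = SHA1,
# hashind 2 = SHA256) at position `ind` for each patch. The hash value and
# patch name below are made up.
def _example_read_hashes():
    history = {}
    history_order = []
    lines = ['0123456789abcdef0123456789abcdef01234567 1234 2024-01-01-0000.28']
    # ind=0 ("History" slot), hashind=1 (SHA1); returns the new history order
    return _read_hashes(history, history_order, 0, 1, lines)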


class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, 'Index')
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(self, original_file, new_file_uncompressed, patch_name):

        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

        with open(new_file_uncompressed, "r") as newf:
            newsizehashes = PDiffHashes.from_file(newf)

        if newsizehashes == oldsizehashes:
            return

        if not os.path.isdir(self.patches_dir):
            os.mkdir(self.patches_dir)

        oldf.seek(0)
        patch_path = os.path.join(self.patches_dir, patch_name)
        with open("{}.gz".format(patch_path), "wb") as fh:
            await asyncio_check_call(
                "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(new_file_uncompressed),
                shell=True,
                stdin=oldf,
                stdout=fh
            )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [oldsizehashes,
                                              difsizehashes,
                                              difgzsizehashes,
                                              ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # Conversion also covers the newly added patch. Accordingly,
            # the "else" below.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"),
                        os.path.join(self.patches_dir, second_patch_name + ".gz"))

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [oldsizehashes,
                                                difsizehashes,
                                                difgzsizehashes,
                                                ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches. As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(self.patches_dir, old_merged_patch_name)
            new_merged_patch_path = os.path.join(self.patches_dir, new_merged_patch_name)
            await _merge_pdiffs(old_merged_patch_path, target_path, new_merged_patch_path)

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(new_merged_patch_path)

            new_merged_history[new_merged_patch_name] = [self._history[old_merged_patch_name][0],
                                                         hashes_decompressed,
                                                         hashes_compressed,
                                                         ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged patches to unmerged patches is simple: discard the merged
        # patches. Cleanup will be handled by find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)
        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest
        #
        # Assume we got N unmerged patches (u1 - uN) where given s1 then
        # you can apply u1 to get to s2. From s2 you use u2 to move to s3
        # and so on until you reach your target T (= sN+1).
        #
        # In the merged patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N. Here, you use sX + mX-N to go to
        # T directly regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise. This will be important in a moment. For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N because u1 will take s1
        # to s2 and m2-N will take s2 to T. By the same argument, we
        # generate m2-N by combining u2 with m3-N. Rinse-and-repeat until
        # we get to the base case m(N-1)-N - which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pairwise merges without having
        # to use all patches in one go. (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history.)
        #
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(oldest_patch, previous_merged_patch, merged_patch_path)

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(merged_patch_path)

                self._history[merged_patch] = [self._unmerged_history[patch_name][0],
                                               hashes_decompressed,
                                               hashes_compressed,
                                               ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"), merged_patch_path + ".gz")
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get('X-Patch-Precedence') == 'merged'
                self._old_merged_patches_prefix = section.get('X-DAK-Older-Patches', '').split()

                for field in section.keys():
                    value = section[field]
                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(history, history_order, ind, hashind, value.splitlines())
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    l = value.split()

                    if len(l) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(l[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, l[0], self.filesizehashes.sha256)

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, self.filesizehashes.sha1, l[0])

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, then the two histories are the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything. This causes the file to be regenerated from scratch.
            # It forces everyone to download the full file if they are behind.
            # But it is self-healing provided that we generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(unmerged_order, unmerged_hs, self.max)

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[prefix_cnt - 3:]

    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history

        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches. While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches left behind that we failed to remove previously (e.g. if
        # we had an index corruption, which happened in fed7ada36b609 and
        # was later fixed in a36f867acf029).
        for name in os.listdir(self.patches_dir):
            if name in ('Index', 'by-hash'):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)
            if ext in ('', '.gz') and (basename in hs or basename in unmerged_hs):
                continue
            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue
            # Unknown patch file; flag it as obsolete
            yield path

    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write("SHA1-Current: %s %7d\n" % (self.filesizehashes.sha1, self.filesizehashes.size))
            if self.filesizehashes.sha256:
                out.write("SHA256-Current: %s %7d\n" % (self.filesizehashes.sha256, self.filesizehashes.size))

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:

            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(" %s %7d %s%s\n" % (hs[h][ind][hashind], hs[h][ind].size, h, ext))

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write("X-DAK-Older-Patches: %s\n" % " ".join(self._old_merged_patches_prefix))

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know there are
            # none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
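

# Illustrative sketch, not part of the original module: the typical life cycle
# of a PDiffIndex when a new Packages file is published. All paths and the
# patch name are hypothetical; the actual callers live elsewhere in dak.
async def _example_update_pdiffs(old_file="Packages.old",
                                 new_file_uncompressed="Packages.new",
                                 patches_dir="Packages.diff"):
    index = PDiffIndex(patches_dir, max=56, merge_pdiffs=True)
    # Create a new pdiff from the previously published file to the new one
    # (this is a no-op if both files hash identically).
    await index.generate_and_add_patch_file(old_file, new_file_uncompressed,
                                            "2024-01-01-0000.28")
    # Trim the history to `max` patches and drop patch files no longer listed.
    index.prune_patch_history()
    for obsolete in index.find_obsolete_patches():
        os.unlink(obsolete)
    # Atomically replace the Index file.
    index.update_index()
    # A caller would run this with e.g. asyncio.run(_example_update_pdiffs())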