Package daklib :: Module pdiff

Source Code for Module daklib.pdiff

import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes

# Each entry is: (field name, index into a patch's info triple
# [hashes of the file the patch applies to, hashes of the uncompressed
# patch, hashes of the gzipped patch], index into PDiffHashes
# (1 = sha1, 2 = sha256), filename extension, whether the field belongs
# to the primary history as opposed to the X-Unmerged-* one).
HASH_FIELDS = [
    ("SHA1-History", 0, 1, "", True),
    ("SHA256-History", 0, 2, "", True),
    ("SHA1-Patches", 1, 1, "", True),
    ("SHA256-Patches", 1, 2, "", True),
    ("SHA1-Download", 2, 1, ".gz", True),
    ("SHA256-Download", 2, 2, ".gz", True),
    ("X-Unmerged-SHA1-History", 0, 1, "", False),
    ("X-Unmerged-SHA256-History", 0, 2, "", False),
    ("X-Unmerged-SHA1-Patches", 1, 1, "", False),
    ("X-Unmerged-SHA256-Patches", 1, 2, "", False),
    ("X-Unmerged-SHA1-Download", 2, 1, ".gz", False),
    ("X-Unmerged-SHA256-Download", 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple("_PDiffHashes", ["size", "sha1", "sha256"])
async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect those of asyncio.create_subprocess_exec or
    (if "shell=True") those of asyncio.create_subprocess_shell,
    with restore_signals=True being the default.
    """
    kwargs.setdefault("restore_signals", True)
    shell = kwargs.pop("shell", False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
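# Editor's sketch (hypothetical helper, not part of the module):
# asyncio_check_call is used like subprocess.check_call, either with an
# argv or, with shell=True, a single shell command string.
async def _example_asyncio_check_call():
    # argv form; raises subprocess.CalledProcessError on non-zero exit
    await asyncio_check_call("true")
    # shell form; pipes and redirections are handled by the shell
    await asyncio_check_call("echo hello | gzip -1 > /dev/null", shell=True)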
async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = (
            tempfile.NamedTemporaryFile("w+")
            if named_temp_file
            else tempfile.TemporaryFile("w+")
        )
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(["zcat"], "{}.gz".format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(["bzcat"], "{}.bz2".format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(["xzcat"], "{}.xz".format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(["zstdcat"], f"{file}.zst")
    else:
        return None
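# Editor's sketch (hypothetical helper, not part of the module):
# open_decompressed tries "path", then "path.gz", "path.bz2", "path.xz"
# and "path.zst", returning a readable file object or None. The caller
# is responsible for closing the handle.
async def _example_open_decompressed(path):
    fh = await open_decompressed(path)
    if fh is None:
        return None
    with fh:
        return fh.read()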
async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into one merged pdiff

    While rred supports merging more than two, we only need support for
    merging two. In the steady state, we will have N merged patches plus
    1 new patch. Here we need to do N pairwise merges (i.e. merge two
    patches N times). Therefore, supporting merging of 3+ patches does
    not help at all.

    The setup state looks like it could benefit from a bulk merge.
    However, if you merge from "latest to earliest" then you will be
    building in optimal order and still only need to do N-1 pairwise
    merges (rather than N-1 merges between N, N-1, N-2, ... 3, 2
    patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(
        patch_a, named_temp_file=True
    ) as fd_a, await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            "/usr/lib/apt/methods/rred %s %s | gzip -9n > %s"
            % (fd_a.name, fd_b.name, resulting_patch_without_extension + ".gz"),
            shell=True,
        )
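# Editor's sketch (hypothetical helper, not part of the module): the
# steady-state flow described in the docstring above, i.e. folding one
# new patch into each existing merged patch via N pairwise merges. The
# ".updated" output naming is illustrative only.
async def _example_fold_in_new_patch(merged_patches, new_patch):
    for merged in merged_patches:
        # each result is written as "<merged>.updated.gz"
        await _merge_pdiffs(merged, new_patch, merged + ".updated")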
class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]  # st_size
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)
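# Editor's sketch (hypothetical helper, not part of the module):
# PDiffHashes is a (size, sha1, sha256) namedtuple, so two files with
# identical contents produce equal instances; the index code below
# relies on this to detect no-op patches.
def _example_same_contents(path_a, path_b):
    with open(path_a, "r") as fa, open(path_b, "r") as fb:
        return PDiffHashes.from_file(fa) == PDiffHashes.from_file(fb)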
async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed
def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[: cnt - maximum]:
        del history[h]
    return order[cnt - maximum :]
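# Editor's sketch (hypothetical helper, not part of the module):
# _prune_history keeps the newest `maximum` entries of the order list
# and deletes the rest from the history mapping in place.
def _example_prune_history():
    history = {"p1": None, "p2": None, "p3": None}
    kept = _prune_history(["p1", "p2", "p3"], history, 2)
    assert kept == ["p2", "p3"]
    assert "p1" not in history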
def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith(".gz"):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                parts[0],
                history[fname][ind].sha256,
            )
        else:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                history[fname][ind].sha1,
                parts[0],
            )

    # Common case: this is the first sequence we read, so we simply
    # adopt it.
    if not history_order:
        return current_order
    # Common case: the current order perfectly matches the existing one,
    # so we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned. This "should not
    # happen" but has done so in the past due to bugs. Depending on
    # which field is out of sync, dak would either self-heal or be stuck
    # forever. We realign the history to ensure we always end with
    # "self-heal".
    #
    # Typically, the patches are aligned from the end, as we always add
    # a patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]
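# Editor's sketch (hypothetical helper, not part of the module): when
# two hash fields disagree, only the common suffix of the two orders is
# trusted; with no common suffix, None is returned and read_index_file
# below discards the history entirely.
def _example_read_hashes_realignment():
    history = {}
    order = _read_hashes(history, [], 0, 1, ["aaaa 10 p1", "bbbb 20 p2"])
    assert order == ["p1", "p2"]
    # The SHA256 field only lists p2, so the aligned suffix is ["p2"].
    order = _read_hashes(history, order, 0, 2, ["cccc 20 p2"])
    assert order == ["p2"]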
class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, "Index")
        self.read_index_file(self.index_path)
    async def generate_and_add_patch_file(
        self, original_file, new_file_uncompressed, patch_name
    ):

        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(
                        new_file_uncompressed
                    ),
                    shell=True,
                    stdin=oldf,
                    stdout=fh,
                )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [
            oldsizehashes,
            difsizehashes,
            difgzsizehashes,
        ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # The conversion also covers the newly added patch, hence
            # the plain "else" below rather than another update step.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    os.path.join(self.patches_dir, second_patch_name + ".gz"),
                )

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [
                oldsizehashes,
                difsizehashes,
                difgzsizehashes,
            ]
            self._history_order.append(second_patch_name)
    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches. As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(
                self.patches_dir, old_merged_patch_name
            )
            new_merged_patch_path = os.path.join(
                self.patches_dir, new_merged_patch_name
            )
            await _merge_pdiffs(
                old_merged_patch_path, target_path, new_merged_patch_path
            )

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                new_merged_patch_path
            )

            new_merged_history[new_merged_patch_name] = [
                self._history[old_merged_patch_name][0],
                hashes_decompressed,
                hashes_compressed,
            ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])
    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged to unmerged patches is simple: discard the
        # merged patches. Cleanup will be handled by find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)
        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False
    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest.
        #
        # Assume we have N unmerged patches (u1 - uN) where, given s1,
        # you can apply u1 to get to s2. From s2 you use u2 to move to s3
        # and so on until you reach your target T (= sN+1).
        #
        # In the merged patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N. Here, you use sX + mX-N to go to
        # T directly regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise. This will be important in a moment. For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N because u1 will take s1
        # to s2 and m2-N will take s2 to T. By the same argument, we
        # generate m2-N by combining u2 with m3-N. Rinse and repeat until
        # we get to the base case m(N-1)-N, which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pairwise merges without having
        # to use all patches in one go. (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history.)
        #
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(
                    oldest_patch, previous_merged_patch, merged_patch_path
                )

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                    merged_patch_path
                )

                self._history[merged_patch] = [
                    self._unmerged_history[patch_name][0],
                    hashes_decompressed,
                    hashes_compressed,
                ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    merged_patch_path + ".gz",
                )
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True
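    # Editor's illustration (hypothetical file names): with unmerged
    # patches ["p1", "p2", "p3"], the target is "p3" and the loop above
    # produces, in order:
    #
    #   T-p3-F-p3  (hard link to p3 itself)
    #   T-p3-F-p2  (p2 merged with T-p3-F-p3)
    #   T-p3-F-p1  (p1 merged with T-p3-F-p2)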
    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get("X-Patch-Precedence") == "merged"
                self._old_merged_patches_prefix = section.get(
                    "X-DAK-Older-Patches", ""
                ).split()

                for field in section.keys():
                    value = section[field]
                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(
                            history, history_order, ind, hashind, value.splitlines()
                        )
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    columns = value.split()

                    if len(columns) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(columns[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            columns[0],
                            self.filesizehashes.sha256,
                        )

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            self.filesizehashes.sha1,
                            columns[0],
                        )

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, the two histories are the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything. This causes the file to be regenerated from scratch.
            # It forces everyone to download the full file if they are behind.
            # But it is self-healing, provided that we generate valid files from here on.
            pass
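    # Editor's sketch of the Index stanza read_index_file consumes;
    # hashes, sizes and patch names are placeholders, and dump() below
    # writes the same shape:
    #
    #   Canonical-Path: Packages
    #   SHA1-Current: 1234...abcd 2164103
    #   SHA256-Current: 5678...ef01 2164103
    #   SHA1-History:
    #    aaaa...0001 2161745 2024-01-01-0000.00
    #   SHA1-Patches:
    #    bbbb...0002    5231 2024-01-01-0000.00
    #   SHA1-Download:
    #    cccc...0003    1523 2024-01-01-0000.00.gz
    #   X-Patch-Precedence: merged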
    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(
            unmerged_order, unmerged_hs, self.max
        )

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[
                prefix_cnt - 3 :
            ]
    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history

        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches. While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches that we previously failed to remove (e.g. if we had
        # an index corruption, which happened in fed7ada36b609 and was
        # later fixed in a36f867acf029).
        for name in os.listdir(self.patches_dir):
            if name in ("Index", "by-hash"):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)
            if ext in ("", ".gz") and (basename in hs or basename in unmerged_hs):
                continue
            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue
            # Unknown patch file; flag it as obsolete
            yield path
    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write(
                    "SHA1-Current: %s %7d\n"
                    % (self.filesizehashes.sha1, self.filesizehashes.size)
                )
            if self.filesizehashes.sha256:
                out.write(
                    "SHA256-Current: %s %7d\n"
                    % (self.filesizehashes.sha256, self.filesizehashes.size)
                )

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:

            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(
                        " %s %7d %s%s\n"
                        % (hs[h][ind][hashind], hs[h][ind].size, h, ext)
                    )

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write(
                    "X-DAK-Older-Patches: %s\n"
                    % " ".join(self._old_merged_patches_prefix)
                )
    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know
            # there are none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
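# Editor's sketch (hypothetical driver, not part of the module): a
# plausible end-to-end use of PDiffIndex; the paths, patch name and max
# value are illustrative only. Note that update_index writes to a
# temporary file and renames it into place, so readers never see a
# half-written Index.
async def _example_update_pdiffs(patches_dir, old_packages, new_packages):
    index = PDiffIndex(patches_dir, max=14, merge_pdiffs=True)
    await index.generate_and_add_patch_file(
        old_packages, new_packages, "2024-01-01-0000.00"
    )
    index.prune_patch_history()
    for obsolete in index.find_obsolete_patches():
        os.unlink(obsolete)
    index.update_index()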