Coverage for daklib/pdiff.py: 94%

286 statements

coverage.py v7.6.0, created at 2026-01-04 16:18 +0000

import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes

HASH_FIELDS = [
    ("SHA1-History", 0, 1, "", True),
    ("SHA256-History", 0, 2, "", True),
    ("SHA1-Patches", 1, 1, "", True),
    ("SHA256-Patches", 1, 2, "", True),
    ("SHA1-Download", 2, 1, ".gz", True),
    ("SHA256-Download", 2, 2, ".gz", True),
    ("X-Unmerged-SHA1-History", 0, 1, "", False),
    ("X-Unmerged-SHA256-History", 0, 2, "", False),
    ("X-Unmerged-SHA1-Patches", 1, 1, "", False),
    ("X-Unmerged-SHA256-Patches", 1, 2, "", False),
    ("X-Unmerged-SHA1-Download", 2, 1, ".gz", False),
    ("X-Unmerged-SHA256-Download", 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple("_PDiffHashes", ["size", "sha1", "sha256"])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect those of asyncio.create_subprocess_exec or
    (if "shell=True") those of asyncio.create_subprocess_shell,
    with restore_signals=True being the default.
    """
    kwargs.setdefault("restore_signals", True)
    shell = kwargs.pop("shell", False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:  # coverage: branch never taken
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
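
# Illustrative usage sketch (not part of the module): asyncio_check_call is
# awaited like the blocking subprocess.check_call, e.g.
#
#     async def _demo():
#         await asyncio_check_call("true")                 # exec form
#         await asyncio_check_call("exit 0", shell=True)   # shell form
#
#     asyncio.run(_demo())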


async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = (
            tempfile.NamedTemporaryFile("w+")
            if named_temp_file
            else tempfile.TemporaryFile("w+")
        )
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(["zcat"], "{}.gz".format(file))
    elif os.path.isfile("%s.bz2" % file):  # coverage: condition never true
        return await call_decompressor(["bzcat"], "{}.bz2".format(file))
    elif os.path.isfile("%s.xz" % file):  # coverage: condition always true
        return await call_decompressor(["xzcat"], "{}.xz".format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(["zstdcat"], f"{file}.zst")
    else:
        return None
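
# Illustrative sketch (hypothetical paths): given "Packages" with only a
# "Packages.xz" on disk, the xzcat branch produces a decompressed handle:
#
#     async def _demo():
#         fh = await open_decompressed("Packages")
#         if fh is not None:
#             with fh:
#                 contents = fh.read()
#
# The result is a plain file object (a temporary file when decompression was
# needed) or None when no variant of the file exists.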


async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into one merged pdiff

    While rred supports merging more than two patches, we only need support
    for merging two. In the steady state, we will have N merged patches
    plus 1 new patch. Here we need to do N pairwise merges (i.e. merge two
    patches N times). Therefore, supporting merging of 3+ patches does not
    help at all.

    The setup state looks like it could do with a bulk merging. However, if
    you merge from "latest to earliest" then you will be building in optimal
    order and still only need to do N-1 pairwise merges (rather than N-1
    merges between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with (
        await open_decompressed(patch_a, named_temp_file=True) as fd_a,
        await open_decompressed(patch_b, named_temp_file=True) as fd_b,
    ):
        await asyncio_check_call(
            "/usr/lib/apt/methods/rred %s %s | gzip -9n > %s"
            % (fd_a.name, fd_b.name, resulting_patch_without_extension + ".gz"),
            shell=True,
        )
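
# Illustrative sketch (hypothetical names): merging two consecutive patches
# "a" then "b" into one patch with the same net effect:
#
#     async def _demo():
#         await _merge_pdiffs("patches/a", "patches/b", "patches/merged")
#         # produces patches/merged.gz, taking the state before "a"
#         # directly to the state after "b"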


class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]  # index 6 of a stat result is st_size
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)
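
    # Illustrative sketch: computing size plus SHA-1/SHA-256 of an open file:
    #
    #     with open("Packages", "rb") as fd:
    #         h = PDiffHashes.from_file(fd)
    #         print(h.size, h.sha1, h.sha256)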


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[: cnt - maximum]:
        del history[h]
    return order[cnt - maximum :]
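
# Worked example (hypothetical data): with maximum=2 the oldest entry is
# dropped from both the order and the history mapping:
#
#     history = {"p1": 1, "p2": 2, "p3": 3}
#     order = _prune_history(["p1", "p2", "p3"], history, 2)
#     # order == ["p2", "p3"]; history == {"p2": 2, "p3": 3}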


def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith(".gz"):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                parts[0],
                history[fname][ind].sha256,
            )
        else:
            history[fname][ind] = PDiffHashes(
                history[fname][ind].size,
                history[fname][ind].sha1,
                parts[0],
            )

    # Common case: either this is the first sequence we read and we
    # simply adopt it...
    if not history_order:
        return current_order
    # Common case: ...or the current history perfectly matches the
    # existing one, so we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned. This "should not happen"
    # but has done so in the past due to bugs. Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever. We
    # realign the history to ensure we always end with "self-heal".
    #
    # Typically, the patches are aligned from the end, as we always add a
    # patch at the end of the series.
    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]
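
# Illustrative sketch (hypothetical hash/size): each line follows the
# "<hash> <size> <filename>" layout of the Index hash fields:
#
#     lines = ["deadbeef 1234 2026-01-03-1618.28.gz"]
#     order = _read_hashes({}, [], 2, 1, lines)
#     # order == ["2026-01-03-1618.28"]  (the ".gz" suffix is stripped)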


class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, "Index")
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(
        self, original_file, new_file_uncompressed, patch_name
    ):

        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(
                        new_file_uncompressed
                    ),
                    shell=True,
                    stdin=oldf,
                    stdout=fh,
                )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [
            oldsizehashes,
            difsizehashes,
            difgzsizehashes,
        ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # The conversion also covers the newly added patch; hence
            # the else below rather than a second if.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    os.path.join(self.patches_dir, second_patch_name + ".gz"),
                )

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [
                oldsizehashes,
                difsizehashes,
                difgzsizehashes,
            ]
            self._history_order.append(second_patch_name)
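
    # Illustrative usage sketch (hypothetical file and patch names):
    #
    #     async def _demo(index):
    #         await index.generate_and_add_patch_file(
    #             "Packages",            # previous version (plain or compressed)
    #             "Packages.new",        # new version, uncompressed
    #             "2026-01-04-1618.28",  # name for the generated patch
    #         )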

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches. As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(
                self.patches_dir, old_merged_patch_name
            )
            new_merged_patch_path = os.path.join(
                self.patches_dir, new_merged_patch_name
            )
            await _merge_pdiffs(
                old_merged_patch_path, target_path, new_merged_patch_path
            )

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                new_merged_patch_path
            )

            new_merged_history[new_merged_patch_name] = [
                self._history[old_merged_patch_name][0],
                hashes_decompressed,
                hashes_compressed,
            ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])
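
    # Naming sketch (hypothetical patch names): bumping with new target "p3"
    # re-merges every existing merged patch against the new target:
    #
    #     T-p2-F-p1  ->  T-p3-F-p1
    #     T-p2-F-p2  ->  T-p3-F-p2
    #
    # The old T-p2-F-* files are left behind on purpose (no by-hash support).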

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:  # coverage: condition never true
            return
        # Converting from merged patches to unmerged patches is simple:
        # discard the merged patches. Cleanup will be handled by
        # find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)
        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:  # coverage: condition never true
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches: list[str] = []

        # We merge from newest to oldest
        #
        # Assume we got N unmerged patches (u1 - uN) where given s1 then
        # you can apply u1 to get to s2. From s2 you use u2 to move to s3
        # and so on until you reach your target T (= sN+1).
        #
        # In the merged patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N. Here, you use sX + mX-N to go to
        # T directly regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise. This will be important in a moment. For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N because u1 will take s1
        # to s2 and m2-N will take s2 to T. By the same argument, we
        # generate m2-N by combining u2 with m3-N. Rinse-and-repeat until
        # we get to the base case m(N-1)-N - which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pairwise merges without having
        # to use all patches in one go. (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history).
        #
        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(
                    oldest_patch, previous_merged_patch, merged_patch_path
                )

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(
                    merged_patch_path
                )

                self._history[merged_patch] = [
                    self._unmerged_history[patch_name][0],
                    hashes_decompressed,
                    hashes_compressed,
                ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(
                    os.path.join(self.patches_dir, patch_name + ".gz"),
                    merged_patch_path + ".gz",
                )
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True
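
    # Worked sketch (hypothetical series u1, u2, u3; target T is the state
    # after u3, so target_name is u3's patch name):
    #
    #     T-u3-F-u3 = u3                      (hard link; the base case)
    #     T-u3-F-u2 = merge(u2, T-u3-F-u3)
    #     T-u3-F-u1 = merge(u1, T-u3-F-u2)    # N-1 = 2 pairwise merges
    #
    # Every state sX now reaches T with the single patch T-u3-F-uX.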

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()  # type: ignore[attr-defined]
                section: apt_pkg.TagSection = index.section  # type: ignore[attr-defined]
                self.has_merged_pdiffs = section.get("X-Patch-Precedence") == "merged"
                self._old_merged_patches_prefix = section.get(
                    "X-DAK-Older-Patches", ""
                ).split()

                for field in section.keys():
                    value = section[field]
                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot
                            # find a common restore point.
                            continue

                        new_order = _read_hashes(
                            history, history_order, ind, hashind, value.splitlines()
                        )
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    columns = value.split()

                    if len(columns) != 2:  # coverage: condition never true
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(columns[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            columns[0],
                            self.filesizehashes.sha256,
                        )

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(
                            self.filesizehashes.size,
                            self.filesizehashes.sha1,
                            columns[0],
                        )

            # Ensure that the order lists are defined again.
            if self._history_order is None:
                self._history_order = []
            if self._unmerged_history_order is None:  # coverage: condition never true
                self._unmerged_history_order = []

            if not self.has_merged_pdiffs:
                # When X-Patch-Precedence != merged, the two histories are
                # the same.
                self._unmerged_history = {k: v for k, v in self._history.items()}
                self._unmerged_history_order = list(self._history_order)
                self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything. This causes the file to be
            # regenerated from scratch. It forces everyone that is behind
            # to download the full file, but it is self-healing provided
            # that we generate valid files from here on.
            pass
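
    # Illustrative Index stanza (hypothetical hashes/sizes) in the format
    # read above and written by dump() below:
    #
    #     SHA1-Current: 1111aaaa... 2861
    #     SHA256-Current: 2222bbbb... 2861
    #     SHA1-History:
    #      3333cccc... 2841 2026-01-03-1618.28
    #     SHA1-Patches:
    #      4444dddd... 402 2026-01-03-1618.28
    #     X-Patch-Precedence: merged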

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(
            unmerged_order, unmerged_hs, self.max
        )

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[
                prefix_cnt - 3 :
            ]

489 def find_obsolete_patches(self): 

490 if not os.path.isdir(self.patches_dir): 

491 return 

492 

493 hs = self._history 

494 unmerged_hs = self._unmerged_history 

495 

496 keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix) 

497 

498 # Scan for obsolete patches. While we could have computed these 

499 # from the history, this method has the advantage of cleaning up 

500 # old patches left that we failed to remove previously (e.g. if 

501 # we had an index corruption, which happened in fed7ada36b609 and 

502 # was later fixed in a36f867acf029) 

503 for name in os.listdir(self.patches_dir): 

504 if name in ("Index", "by-hash"): 

505 continue 

506 # We keep some old merged patches around (as neither apt nor 

507 # dak supports by-hash for pdiffs) 

508 if keep_prefixes and name.startswith(keep_prefixes): 

509 continue 

510 basename, ext = os.path.splitext(name) 

511 if ext in ("", ".gz") and (basename in hs or basename in unmerged_hs): 

512 continue 

513 path = os.path.join(self.patches_dir, name) 

514 if not os.path.isfile(path): 514 ↛ 516line 514 didn't jump to line 516 because the condition on line 514 was never true

515 # Non-files are probably not patches. 

516 continue 

517 # Unknown patch file; flag it as obsolete 

518 yield path 
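
    # Illustrative usage sketch: callers are expected to remove the yielded
    # paths themselves, e.g.
    #
    #     for path in index.find_obsolete_patches():
    #         os.unlink(path)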

    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:  # coverage: condition always true
            if self.filesizehashes.sha1:  # coverage: condition always true
                out.write(
                    "SHA1-Current: %s %7d\n"
                    % (self.filesizehashes.sha1, self.filesizehashes.size)
                )
            if self.filesizehashes.sha256:  # coverage: condition always true
                out.write(
                    "SHA256-Current: %s %7d\n"
                    % (self.filesizehashes.sha256, self.filesizehashes.size)
                )

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:

            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:  # coverage: always true
                    out.write(
                        " %s %7d %s%s\n"
                        % (hs[h][ind][hashind], hs[h][ind].size, h, ext)
                    )

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:  # coverage: always true
                out.write(
                    "X-DAK-Older-Patches: %s\n"
                    % " ".join(self._old_merged_patches_prefix)
                )

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know
            # there are none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)
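

# End-to-end usage sketch (hypothetical paths; how a caller might be
# expected to drive this class):
#
#     async def _demo():
#         index = PDiffIndex("Packages.diff", max=56, merge_pdiffs=True)
#         await index.generate_and_add_patch_file(
#             "Packages", "Packages.new", "2026-01-04-1618.28"
#         )
#         index.prune_patch_history()
#         for path in index.find_obsolete_patches():
#             os.unlink(path)
#         index.update_index()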