import asyncio
import collections
import os
import subprocess
import sys
import tempfile

import apt_pkg

from daklib.dakapt import DakHashes
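
# Each entry in HASH_FIELDS below is:
#   (Index field name,
#    index into the per-patch history entry: 0 = old (pre-patch) file,
#        1 = uncompressed patch, 2 = downloadable .gz patch,
#    index into PDiffHashes: 1 = sha1, 2 = sha256,
#    filename extension emitted in the Index,
#    True for the primary (possibly merged) history,
#    False for the X-Unmerged-* variants)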

HASH_FIELDS = [
    ('SHA1-History', 0, 1, "", True),
    ('SHA256-History', 0, 2, "", True),
    ('SHA1-Patches', 1, 1, "", True),
    ('SHA256-Patches', 1, 2, "", True),
    ('SHA1-Download', 2, 1, ".gz", True),
    ('SHA256-Download', 2, 2, ".gz", True),
    ('X-Unmerged-SHA1-History', 0, 1, "", False),
    ('X-Unmerged-SHA256-History', 0, 2, "", False),
    ('X-Unmerged-SHA1-Patches', 1, 1, "", False),
    ('X-Unmerged-SHA256-Patches', 1, 2, "", False),
    ('X-Unmerged-SHA1-Download', 2, 1, ".gz", False),
    ('X-Unmerged-SHA256-Download', 2, 2, ".gz", False),
]

HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}

_PDiffHashes = collections.namedtuple('_PDiffHashes', ['size', 'sha1', 'sha256'])


async def asyncio_check_call(*args, **kwargs):
    """async variant of subprocess.check_call

    Parameters reflect those of asyncio.create_subprocess_exec or,
    with "shell=True", those of asyncio.create_subprocess_shell,
    with restore_signals=True being the default.
    """
    kwargs.setdefault('restore_signals', True)
    shell = kwargs.pop('shell', False)
    if shell:
        proc = await asyncio.create_subprocess_shell(*args, **kwargs)
    else:
        proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    retcode = await proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(retcode, args[0])
    return 0
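
# Illustrative usage of asyncio_check_call() (a sketch from an async context;
# the file names are hypothetical):
#
#     await asyncio_check_call('gzip', '-9n', 'some-file')          # exec form
#     await asyncio_check_call('zcat a.gz b.gz > out', shell=True)  # shell form
#
# Both forms raise subprocess.CalledProcessError on a non-zero exit status.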

async def open_decompressed(file, named_temp_file=False):
    async def call_decompressor(cmd, inpath):
        fh = tempfile.NamedTemporaryFile("w+") if named_temp_file \
            else tempfile.TemporaryFile("w+")
        with open(inpath, "rb") as rfd:
            await asyncio_check_call(
                *cmd,
                stdin=rfd,
                stdout=fh,
            )
        fh.seek(0)
        return fh

    if os.path.isfile(file):
        return open(file, "r")
    elif os.path.isfile("%s.gz" % file):
        return await call_decompressor(['zcat'], '{}.gz'.format(file))
    elif os.path.isfile("%s.bz2" % file):
        return await call_decompressor(['bzcat'], '{}.bz2'.format(file))
    elif os.path.isfile("%s.xz" % file):
        return await call_decompressor(['xzcat'], '{}.xz'.format(file))
    elif os.path.isfile(f"{file}.zst"):
        return await call_decompressor(['zstdcat'], f'{file}.zst')
    else:
        return None
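
# Illustrative usage of open_decompressed() (a sketch; the path is hypothetical):
#
#     fd = await open_decompressed("dists/unstable/main/Contents-amd64")
#     if fd is not None:
#         with fd:
#             data = fd.read()
#
# It returns an open file object with the decompressed content (trying the
# plain file first, then the .gz, .bz2, .xz and .zst variants), or None if
# no variant exists.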

async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
    """Merge two pdiffs into a single merged pdiff

    While rred supports merging more than two patches, we only need support
    for merging two.  In the steady state, we will have N merged patches
    plus 1 new patch.  Here we need to do N pairwise merges (i.e. merge two
    patches N times).  Therefore, supporting merging of 3+ patches does not
    help at all.

    The setup state looks like it could do with a bulk merge.  However, if
    you merge from "latest to earliest" then you will be building in optimal
    order and still only need N-1 pairwise merges (rather than N-1 merges
    between N, N-1, N-2, ... 3, 2 patches).

    Combined, supporting pairwise merges is sufficient for our use case.
    """
    with await open_decompressed(patch_a, named_temp_file=True) as fd_a, \
            await open_decompressed(patch_b, named_temp_file=True) as fd_b:
        await asyncio_check_call(
            '/usr/lib/apt/methods/rred %s %s | gzip -9n > %s' % (fd_a.name, fd_b.name,
                                                                 resulting_patch_without_extension + ".gz"),
            shell=True,
        )
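
# Sketch of the pairwise merging this enables (patch names are illustrative;
# the arguments are paths without the .gz extension, and the result is
# written as <name>.gz):
#
#     await _merge_pdiffs("T-p3-F-p1", "p4", "T-p4-F-p1")
#     await _merge_pdiffs("T-p3-F-p2", "p4", "T-p4-F-p2")
#     ...
#
# i.e. each existing merged patch is merged with the newest unmerged patch,
# which is what PDiffIndex._bump_merged_patches() below does.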

class PDiffHashes(_PDiffHashes):

    @classmethod
    def from_file(cls, fd):
        size = os.fstat(fd.fileno())[6]  # st_size
        hashes = DakHashes(fd)
        return cls(size, hashes.sha1, hashes.sha256)


async def _pdiff_hashes_from_patch(path_without_extension):
    with await open_decompressed(path_without_extension) as difff:
        hashes_decompressed = PDiffHashes.from_file(difff)

    with open(path_without_extension + ".gz", "r") as difffgz:
        hashes_compressed = PDiffHashes.from_file(difffgz)

    return hashes_decompressed, hashes_compressed


def _prune_history(order, history, maximum):
    cnt = len(order)
    if cnt <= maximum:
        return order
    for h in order[:cnt - maximum]:
        del history[h]
    return order[cnt - maximum:]


def _read_hashes(history, history_order, ind, hashind, lines):
    current_order = []
    for line in lines:
        parts = line.split()
        fname = parts[2]
        if fname.endswith('.gz'):
            fname = fname[:-3]
        current_order.append(fname)
        if fname not in history:
            history[fname] = [None, None, None]
        if not history[fname][ind]:
            history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
        if hashind == 1:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              parts[0],
                                              history[fname][ind].sha256,
                                              )
        else:
            history[fname][ind] = PDiffHashes(history[fname][ind].size,
                                              history[fname][ind].sha1,
                                              parts[0],
                                              )

    # Common case: this is the first sequence we read, so we simply
    # adopt it.
    if not history_order:
        return current_order
    # Common case: the current order perfectly matches the existing one,
    # so we just stop here.
    if current_order == history_order:
        return history_order

    # Special case: the histories are not aligned.  This "should not happen"
    # but has done so in the past due to bugs.  Depending on which field is
    # out of sync, dak would either self-heal or be stuck forever.  We
    # realign the history to ensure we always end up self-healing.
    #
    # Typically, the patches are aligned from the end, as we always add a
    # patch at the end of the series.
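    #
    # Worked example (hypothetical names): with history_order = [p1, p2, p3]
    # and current_order = [px, p2, p3], the two sequences share the suffix
    # [p2, p3], so we realign to that suffix.  With no common suffix at all,
    # we return None and read_index_file() skips further updates to that
    # history (the order list is later reset to an empty list).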

    patches_from_the_end = 0
    for p1, p2 in zip(reversed(current_order), reversed(history_order)):
        if p1 == p2:
            patches_from_the_end += 1
        else:
            break

    if not patches_from_the_end:
        return None

    return current_order[-patches_from_the_end:]


class PDiffIndex:
    def __init__(self, patches_dir, max=56, merge_pdiffs=False):
        self.can_path = None
        self._history = {}
        self._history_order = []
        self._unmerged_history = {}
        self._unmerged_history_order = []
        self._old_merged_patches_prefix = []
        self.max = max
        self.patches_dir = patches_dir
        self.filesizehashes = None
        self.wants_merged_pdiffs = merge_pdiffs
        self.has_merged_pdiffs = False
        self.index_path = os.path.join(patches_dir, 'Index')
        self.read_index_file(self.index_path)

    async def generate_and_add_patch_file(self, original_file, new_file_uncompressed, patch_name):

        with await open_decompressed(original_file) as oldf:
            oldsizehashes = PDiffHashes.from_file(oldf)

            with open(new_file_uncompressed, "r") as newf:
                newsizehashes = PDiffHashes.from_file(newf)

            if newsizehashes == oldsizehashes:
                return

            if not os.path.isdir(self.patches_dir):
                os.mkdir(self.patches_dir)

            oldf.seek(0)
            patch_path = os.path.join(self.patches_dir, patch_name)
            with open("{}.gz".format(patch_path), "wb") as fh:
                await asyncio_check_call(
                    "diff --ed - {} | gzip --rsyncable --no-name -c -9".format(new_file_uncompressed),
                    shell=True,
                    stdin=oldf,
                    stdout=fh
                )

        difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)

        self.filesizehashes = newsizehashes
        self._unmerged_history[patch_name] = [oldsizehashes,
                                              difsizehashes,
                                              difgzsizehashes,
                                              ]
        self._unmerged_history_order.append(patch_name)

        if self.has_merged_pdiffs != self.wants_merged_pdiffs:
            # Convert patches
            if self.wants_merged_pdiffs:
                await self._convert_to_merged_patches()
            else:
                self._convert_to_unmerged()
            # The conversion also covers the newly added patch; accordingly,
            # the per-patch handling below only runs in the else branch.
        else:
            second_patch_name = patch_name
            if self.wants_merged_pdiffs:
                await self._bump_merged_patches()
                second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"),
                        os.path.join(self.patches_dir, second_patch_name + ".gz"))

            # Without merged PDiffs, keep _history and _unmerged_history aligned
            self._history[second_patch_name] = [oldsizehashes,
                                                difsizehashes,
                                                difgzsizehashes,
                                                ]
            self._history_order.append(second_patch_name)

    async def _bump_merged_patches(self):
        # When bumping patches, we need to "rewrite" all merged patches.  As
        # neither apt nor dak supports by-hash for pdiffs, we leave the old
        # versions of merged pdiffs behind.
        target_name = self._unmerged_history_order[-1]
        target_path = os.path.join(self.patches_dir, target_name)

        new_merged_order = []
        new_merged_history = {}
        for old_merged_patch_name in self._history_order:
            try:
                old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
            except IndexError:
                old_orig_name = old_merged_patch_name
            new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
            old_merged_patch_path = os.path.join(self.patches_dir, old_merged_patch_name)
            new_merged_patch_path = os.path.join(self.patches_dir, new_merged_patch_name)
            await _merge_pdiffs(old_merged_patch_path, target_path, new_merged_patch_path)

            hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(new_merged_patch_path)

            new_merged_history[new_merged_patch_name] = [self._history[old_merged_patch_name][0],
                                                         hashes_decompressed,
                                                         hashes_compressed,
                                                         ]
            new_merged_order.append(new_merged_patch_name)

        self._history_order = new_merged_order
        self._history = new_merged_history

        self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])

    def _convert_to_unmerged(self):
        if not self.has_merged_pdiffs:
            return
        # Converting from merged patches to unmerged patches is simple:
        # discard the merged patches.  Cleanup of the files is handled by
        # find_obsolete_patches.
        self._history = {k: v for k, v in self._unmerged_history.items()}
        self._history_order = list(self._unmerged_history_order)
        self._old_merged_patches_prefix = []
        self.has_merged_pdiffs = False

    async def _convert_to_merged_patches(self):
        if self.has_merged_pdiffs:
            return

        target_name = self._unmerged_history_order[-1]

        self._history = {}
        self._history_order = []

        new_patches = []

        # We merge from newest to oldest.
        #
        # Assume we have N unmerged patches (u1 - uN) where, given s1, you
        # can apply u1 to get to s2.  From s2 you use u2 to move to s3,
        # and so on until you reach your target T (= sN+1).
        #
        # In the merged patch world, we want N merged patches called m1-N,
        # m2-N, m3-N ... m(N-1)-N.  Here, you use sX + mX-N to go to
        # T directly, regardless of where you start.
        #
        # A noteworthy special case is that m(N-1)-N is identical to uN
        # content-wise.  This will be important in a moment.  For now,
        # let's start with looking at creating merged patches.
        #
        # We can get m1-N by merging u1 with m2-N, because u1 will take s1
        # to s2 and m2-N will take s2 to T.  By the same argument, we
        # generate m2-N by combining u2 with m3-N.  Rinse and repeat until
        # we get to the base case m(N-1)-N - which is uN.
        #
        # From this, we can conclude that generating the patches in
        # reverse order (i.e. m2-N is generated before m1-N) will get
        # us the desired result in N-1 pairwise merges without having
        # to use all patches in one go.  (This is also optimal in the
        # sense that we need to update N-1 patches to preserve the
        # entire history.)
        #
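        # Illustration (hypothetical names) with three unmerged patches
        # p1, p2, p3 (p3 being the newest, so target T = p3):
        #
        #   T-p3-F-p3 := hard link of p3 (the special case below)
        #   T-p3-F-p2 := merge(p2, T-p3-F-p3)
        #   T-p3-F-p1 := merge(p1, T-p3-F-p2)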

        for patch_name in reversed(self._unmerged_history_order):
            merged_patch = "T-%s-F-%s" % (target_name, patch_name)
            merged_patch_path = os.path.join(self.patches_dir, merged_patch)

            if new_patches:
                oldest_patch = os.path.join(self.patches_dir, patch_name)
                previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
                await _merge_pdiffs(oldest_patch, previous_merged_patch, merged_patch_path)

                hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(merged_patch_path)

                self._history[merged_patch] = [self._unmerged_history[patch_name][0],
                                               hashes_decompressed,
                                               hashes_compressed,
                                               ]
            else:
                # Special case: the latest patch is its own "merged" variant.
                os.link(os.path.join(self.patches_dir, patch_name + ".gz"), merged_patch_path + ".gz")
                self._history[merged_patch] = self._unmerged_history[patch_name]

            new_patches.append(merged_patch)

        self._history_order = list(reversed(new_patches))
        self._old_merged_patches_prefix.append(target_name)
        self.has_merged_pdiffs = True
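
    # For reference, the Index stanza read here and produced by dump() looks
    # roughly like this (hash values, sizes and patch names are illustrative
    # only):
    #
    #   Canonical-Path: Packages
    #   SHA1-Current: <sha1> <size>
    #   SHA1-History:
    #    <sha1> <size> <patch-name>
    #   SHA1-Patches:
    #    <sha1> <size> <patch-name>
    #   SHA1-Download:
    #    <sha1> <size> <patch-name>.gz
    #   X-Patch-Precedence: merged
    #   X-DAK-Older-Patches: <old-target-name> ...
    #
    # plus the SHA256-* fields and, with merged pdiffs, the X-Unmerged-*
    # variants of the hash lists.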

    def read_index_file(self, index_file_path):
        try:
            with apt_pkg.TagFile(index_file_path) as index:
                index.step()
                section = index.section
                self.has_merged_pdiffs = section.get('X-Patch-Precedence') == 'merged'
                self._old_merged_patches_prefix = section.get('X-DAK-Older-Patches', '').split()

                for field in section.keys():
                    value = section[field]
                    if field in HASH_FIELDS_TABLE:
                        ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
                        if primary_history:
                            history = self._history
                            history_order = self._history_order
                        else:
                            history = self._unmerged_history
                            history_order = self._unmerged_history_order

                        if history_order is None:
                            # History is already misaligned and we cannot find a common restore point.
                            continue

                        new_order = _read_hashes(history, history_order, ind, hashind, value.splitlines())
                        if primary_history:
                            self._history_order = new_order
                        else:
                            self._unmerged_history_order = new_order
                        continue

                    if field in ("Canonical-Name", "Canonical-Path"):
                        self.can_path = value
                        continue

                    if field not in ("SHA1-Current", "SHA256-Current"):
                        continue

                    l = value.split()

                    if len(l) != 2:
                        continue

                    if not self.filesizehashes:
                        self.filesizehashes = PDiffHashes(int(l[1]), None, None)

                    if field == "SHA1-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, l[0], self.filesizehashes.sha256)

                    if field == "SHA256-Current":
                        self.filesizehashes = PDiffHashes(self.filesizehashes.size, self.filesizehashes.sha1, l[0])

                # Ensure that the order lists are defined again.
                if self._history_order is None:
                    self._history_order = []
                if self._unmerged_history_order is None:
                    self._unmerged_history_order = []

                if not self.has_merged_pdiffs:
                    # When X-Patch-Precedence != merged, the two histories are the same.
                    self._unmerged_history = {k: v for k, v in self._history.items()}
                    self._unmerged_history_order = list(self._history_order)
                    self._old_merged_patches_prefix = []

        except (OSError, apt_pkg.Error):
            # On error, we ignore everything.  This causes the file to be
            # regenerated from scratch.  It forces clients that are behind to
            # download the full file, but it is self-healing provided that we
            # generate valid files from here on.
            pass

    def prune_patch_history(self):
        # Truncate our history if necessary
        hs = self._history
        order = self._history_order
        unmerged_hs = self._unmerged_history
        unmerged_order = self._unmerged_history_order
        self._history_order = _prune_history(order, hs, self.max)
        self._unmerged_history_order = _prune_history(unmerged_order, unmerged_hs, self.max)

        prefix_cnt = len(self._old_merged_patches_prefix)
        if prefix_cnt > 3:
            self._old_merged_patches_prefix = self._old_merged_patches_prefix[prefix_cnt - 3:]

    def find_obsolete_patches(self):
        if not os.path.isdir(self.patches_dir):
            return

        hs = self._history
        unmerged_hs = self._unmerged_history

        keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)

        # Scan for obsolete patches.  While we could have computed these
        # from the history, this method has the advantage of cleaning up
        # old patches left behind that we failed to remove previously (e.g.
        # if we had an index corruption, which happened in fed7ada36b609 and
        # was later fixed in a36f867acf029).
        for name in os.listdir(self.patches_dir):
            if name in ('Index', 'by-hash'):
                continue
            # We keep some old merged patches around (as neither apt nor
            # dak supports by-hash for pdiffs)
            if keep_prefixes and name.startswith(keep_prefixes):
                continue
            basename, ext = os.path.splitext(name)
            if ext in ('', '.gz') and (basename in hs or basename in unmerged_hs):
                continue
            path = os.path.join(self.patches_dir, name)
            if not os.path.isfile(path):
                # Non-files are probably not patches.
                continue
            # Unknown patch file; flag it as obsolete
            yield path

    def dump(self, out=sys.stdout):
        if self.can_path:
            out.write("Canonical-Path: %s\n" % self.can_path)

        if self.filesizehashes:
            if self.filesizehashes.sha1:
                out.write("SHA1-Current: %s %7d\n" % (self.filesizehashes.sha1, self.filesizehashes.size))
            if self.filesizehashes.sha256:
                out.write("SHA256-Current: %s %7d\n" % (self.filesizehashes.sha256, self.filesizehashes.size))

        for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:

            if primary_history:
                hs = self._history
                order = self._history_order
            elif self.has_merged_pdiffs:
                hs = self._unmerged_history
                order = self._unmerged_history_order
            else:
                continue

            out.write("%s:\n" % fieldname)
            for h in order:
                if hs[h][ind] and hs[h][ind][hashind]:
                    out.write(" %s %7d %s%s\n" % (hs[h][ind][hashind], hs[h][ind].size, h, ext))

        if self.has_merged_pdiffs:
            out.write("X-Patch-Precedence: merged\n")
            if self._old_merged_patches_prefix:
                out.write("X-DAK-Older-Patches: %s\n" % " ".join(self._old_merged_patches_prefix))

    def update_index(self, tmp_suffix=".new"):
        if not os.path.isdir(self.patches_dir):
            # If there is no patch directory, then we have no patches.
            # It seems weird to have an Index of patches when we know there
            # are none.
            return
        tmp_path = self.index_path + tmp_suffix
        with open(tmp_path, "w") as f:
            self.dump(f)
        os.rename(tmp_path, self.index_path)