import asyncio
import collections
import os
import subprocess
import sys
import tempfile
import apt_pkg
from daklib.dakapt import DakHashes
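# Each HASH_FIELDS entry is (field name, index into a patch's
# [old target file, uncompressed patch, gzipped patch] triple, hash index in
# PDiffHashes (1 = sha1, 2 = sha256), file name extension used when dumping,
# and whether the field belongs to the primary (possibly merged) history
# rather than the unmerged one).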
HASH_FIELDS = [
('SHA1-History', 0, 1, "", True),
('SHA256-History', 0, 2, "", True),
('SHA1-Patches', 1, 1, "", True),
('SHA256-Patches', 1, 2, "", True),
('SHA1-Download', 2, 1, ".gz", True),
('SHA256-Download', 2, 2, ".gz", True),
('X-Unmerged-SHA1-History', 0, 1, "", False),
('X-Unmerged-SHA256-History', 0, 2, "", False),
('X-Unmerged-SHA1-Patches', 1, 1, "", False),
('X-Unmerged-SHA256-Patches', 1, 2, "", False),
('X-Unmerged-SHA1-Download', 2, 1, ".gz", False),
('X-Unmerged-SHA256-Download', 2, 2, ".gz", False),
]
HASH_FIELDS_TABLE = {x[0]: (x[1], x[2], x[4]) for x in HASH_FIELDS}
_PDiffHashes = collections.namedtuple('_PDiffHashes', ['size', 'sha1', 'sha256'])
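# One file's size plus its SHA1 and SHA256 checksums; the field positions
# match the hash index used in HASH_FIELDS above.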
async def asyncio_check_call(*args, **kwargs):
"""async variant of subprocess.check_call
Parameters reflect those of asyncio.create_subprocess_exec or,
if "shell=True", those of asyncio.create_subprocess_shell,
with restore_signals=True being the default.
"""
kwargs.setdefault('restore_signals', True)
shell = kwargs.pop('shell', False)
if shell:
proc = await asyncio.create_subprocess_shell(*args, **kwargs)
else:
proc = await asyncio.create_subprocess_exec(*args, **kwargs)
retcode = await proc.wait()
if retcode != 0:
raise subprocess.CalledProcessError(retcode, args[0])
return 0
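# Illustrative calls (file names are hypothetical):
#   await asyncio_check_call('gzip', '-9n', 'some-file')          # exec form
#   await asyncio_check_call('zcat some-file.gz | wc -l', shell=True)  # shell form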
async def open_decompressed(file, named_temp_file=False):
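"""Open a file, transparently decompressing a .gz/.bz2/.xz/.zst variant
Returns a readable file object positioned at the start (a NamedTemporaryFile
when named_temp_file=True), or None if neither the plain file nor any
compressed variant exists.
"""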
async def call_decompressor(cmd, inpath):
fh = tempfile.NamedTemporaryFile("w+") if named_temp_file \
else tempfile.TemporaryFile("w+")
with open(inpath, "rb") as rfd:
await asyncio_check_call(
*cmd,
stdin=rfd,
stdout=fh,
)
fh.seek(0)
return fh
if os.path.isfile(file):
return open(file, "r")
elif os.path.isfile("%s.gz" % file):
return await call_decompressor(['zcat'], '{}.gz'.format(file))
elif os.path.isfile("%s.bz2" % file):
return await call_decompressor(['bzcat'], '{}.bz2'.format(file))
elif os.path.isfile("%s.xz" % file):
return await call_decompressor(['xzcat'], '{}.xz'.format(file))
elif os.path.isfile(f"{file}.zst"):
return await call_decompressor(['zstdcat'], f'{file}.zst')
else:
return None
async def _merge_pdiffs(patch_a, patch_b, resulting_patch_without_extension):
"""Merge two pdiff in to a merged pdiff
While rred support merging more than 2, we only need support for merging two.
In the steady state, we will have N merged patches plus 1 new patch. Here
we need to do N pairwise merges (i.e. merge two patches N times).
Therefore, supporting merging of 3+ patches does not help at all.
The setup state looks like it could benefit from a bulk merge. However, if you
merge from "latest to earliest" then you build in the optimal order
and still only need N-1 pairwise merges (rather than N-1 merges
involving N, N-1, N-2, ... 3, 2 patches).
Combined, supporting pairwise merges is sufficient for our use case.
"""
with await open_decompressed(patch_a, named_temp_file=True) as fd_a, \
await open_decompressed(patch_b, named_temp_file=True) as fd_b:
await asyncio_check_call(
'/usr/lib/apt/methods/rred %s %s | gzip -9n > %s' % (fd_a.name, fd_b.name,
resulting_patch_without_extension + ".gz"),
shell=True,
)
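# Illustrative call (patch names are hypothetical); all paths are given
# without their ".gz" extension and patch_a is applied before patch_b:
#   await _merge_pdiffs("patches/2024-01-01-0000", "patches/2024-01-02-0000",
#                       "patches/T-2024-01-02-0000-F-2024-01-01-0000")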
class PDiffHashes(_PDiffHashes):
@classmethod
def from_file(cls, fd):
size = os.fstat(fd.fileno())[6]  # index 6 of os.fstat() is st_size
hashes = DakHashes(fd)
return cls(size, hashes.sha1, hashes.sha256)
async def _pdiff_hashes_from_patch(path_without_extension):
with await open_decompressed(path_without_extension) as difff:
hashes_decompressed = PDiffHashes.from_file(difff)
with open(path_without_extension + ".gz", "r") as difffgz:
hashes_compressed = PDiffHashes.from_file(difffgz)
return hashes_decompressed, hashes_compressed
def _prune_history(order, history, maximum):
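"""Truncate order to at most maximum entries, deleting pruned names from history
Returns the (possibly shortened) order list.
"""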
cnt = len(order)
if cnt <= maximum:
return order
for h in order[:cnt - maximum]:
del history[h]
return order[cnt - maximum:]
def _read_hashes(history, history_order, ind, hashind, lines):
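"""Parse one hash field of the Index file into the in-memory history
ind selects which member of the per-patch [old target file, uncompressed
patch, gzipped patch] triple is updated; hashind selects the hash
(1 = SHA1, 2 = SHA256, matching the PDiffHashes field positions).
Returns the resulting patch order, or None if the field cannot be aligned
with the existing history_order.
"""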
current_order = []
for line in lines:
parts = line.split()
fname = parts[2]
if fname.endswith('.gz'):
fname = fname[:-3]
current_order.append(fname)
if fname not in history:
history[fname] = [None, None, None]
if not history[fname][ind]:
history[fname][ind] = PDiffHashes(int(parts[1]), None, None)
if hashind == 1:
history[fname][ind] = PDiffHashes(history[fname][ind].size,
parts[0],
history[fname][ind].sha256,
)
else:
history[fname][ind] = PDiffHashes(history[fname][ind].size,
history[fname][ind].sha1,
parts[0],
)
# Common case: this is the first sequence we read, so we
# simply adopt it.
if not history_order:
return current_order
# Common case: the current order perfectly matches the existing one, so
# we just stop here.
if current_order == history_order:
return history_order
# Special case: the histories are not aligned. This "should not happen"
# but has done so in the past due to bugs. Depending on which field was
# out of sync, dak would either self-heal or be stuck forever. We
# realign the history to ensure we always end up self-healing.
#
# Typically, the patches are aligned from the end, as we always add a
# patch at the end of the series.
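# For example, with current_order == [a, b, c] and history_order == [x, b, c],
# the common suffix is [b, c], so we keep only those two patches.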
patches_from_the_end = 0
for p1, p2 in zip(reversed(current_order), reversed(history_order)):
if p1 == p2:
patches_from_the_end += 1
else:
break
if not patches_from_the_end:
return None
return current_order[-patches_from_the_end:]
class PDiffIndex:
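"""In-memory representation of a pdiff Index file and its patch directory
Tracks the merged and unmerged patch histories, generates new patches,
converts between merged and unmerged layouts, prunes old history, finds
obsolete patch files and writes the Index file back out.
"""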
def __init__(self, patches_dir, max=56, merge_pdiffs=False):
self.can_path = None
self._history = {}
self._history_order = []
self._unmerged_history = {}
self._unmerged_history_order = []
self._old_merged_patches_prefix = []
self.max = max
self.patches_dir = patches_dir
self.filesizehashes = None
self.wants_merged_pdiffs = merge_pdiffs
self.has_merged_pdiffs = False
self.index_path = os.path.join(patches_dir, 'Index')
self.read_index_file(self.index_path)
async def generate_and_add_patch_file(self, original_file, new_file_uncompressed, patch_name):
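"""Diff the previous version of a file against the new one and record the pdiff
original_file may be stored compressed (it is opened via open_decompressed);
new_file_uncompressed must be a plain file. The gzipped ed-style patch is
written as <patch_name>.gz in the patch directory and the in-memory
histories are updated. Nothing is done when the two files are identical.
"""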
with await open_decompressed(original_file) as oldf:
oldsizehashes = PDiffHashes.from_file(oldf)
with open(new_file_uncompressed, "r") as newf:
newsizehashes = PDiffHashes.from_file(newf)
if newsizehashes == oldsizehashes:
return
if not os.path.isdir(self.patches_dir):
os.mkdir(self.patches_dir)
oldf.seek(0)
patch_path = os.path.join(self.patches_dir, patch_name)
with open("{}.gz".format(patch_path), "wb") as fh:
await asyncio_check_call(
"diff --ed - {} | gzip --rsyncable --no-name -c -9".format(new_file_uncompressed),
shell=True,
stdin=oldf,
stdout=fh
)
difsizehashes, difgzsizehashes = await _pdiff_hashes_from_patch(patch_path)
self.filesizehashes = newsizehashes
self._unmerged_history[patch_name] = [oldsizehashes,
difsizehashes,
difgzsizehashes,
]
self._unmerged_history_order.append(patch_name)
if self.has_merged_pdiffs != self.wants_merged_pdiffs:
# Convert patches
if self.wants_merged_pdiffs:
await self._convert_to_merged_patches()
else:
self._convert_to_unmerged()
# Conversion also covers the newly added patch; hence the
# plain else below.
else:
second_patch_name = patch_name
if self.wants_merged_pdiffs:
await self._bump_merged_patches()
second_patch_name = "T-%s-F-%s" % (patch_name, patch_name)
os.link(os.path.join(self.patches_dir, patch_name + ".gz"),
os.path.join(self.patches_dir, second_patch_name + ".gz"))
# Without merged PDiffs, keep _history and _unmerged_history aligned
self._history[second_patch_name] = [oldsizehashes,
difsizehashes,
difgzsizehashes,
]
self._history_order.append(second_patch_name)
async def _bump_merged_patches(self):
# When bumping patches, we need to "rewrite" all merged patches. As
# neither apt nor dak supports by-hash for pdiffs, we leave the old
# versions of merged pdiffs behind.
target_name = self._unmerged_history_order[-1]
target_path = os.path.join(self.patches_dir, target_name)
new_merged_order = []
new_merged_history = {}
for old_merged_patch_name in self._history_order:
try:
old_orig_name = old_merged_patch_name.split("-F-", 1)[1]
except IndexError:
old_orig_name = old_merged_patch_name
new_merged_patch_name = "T-%s-F-%s" % (target_name, old_orig_name)
old_merged_patch_path = os.path.join(self.patches_dir, old_merged_patch_name)
new_merged_patch_path = os.path.join(self.patches_dir, new_merged_patch_name)
await _merge_pdiffs(old_merged_patch_path, target_path, new_merged_patch_path)
hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(new_merged_patch_path)
new_merged_history[new_merged_patch_name] = [self._history[old_merged_patch_name][0],
hashes_decompressed,
hashes_compressed,
]
new_merged_order.append(new_merged_patch_name)
self._history_order = new_merged_order
self._history = new_merged_history
self._old_merged_patches_prefix.append(self._unmerged_history_order[-1])
def _convert_to_unmerged(self):
if not self.has_merged_pdiffs:
return
# Converting from merged patches to unmerged patches is simple: discard the merged
# patches. Cleanup will be handled by find_obsolete_patches.
self._history = {k: v for k, v in self._unmerged_history.items()}
self._history_order = list(self._unmerged_history_order)
self._old_merged_patches_prefix = []
self.has_merged_pdiffs = False
async def _convert_to_merged_patches(self):
if self.has_merged_pdiffs:
return
target_name = self._unmerged_history_order[-1]
self._history = {}
self._history_order = []
new_patches = []
# We merge from newest to oldest
#
# Assume we got N unmerged patches (u1 - uN) where given s1 then
# you can apply u1 to get to s2. From s2 you use u2 to move to s3
# and so on until you reach your target T (= sN+1).
#
# In the merged patch world, we want N merged patches called m1-N,
# m2-N, m3-N ... mN-N. Here, you use sX + mX-N to go to
# T directly regardless of where you start.
#
# A noteworthy special case is that mN-N is identical to uN
# content-wise. This will be important in a moment. For now,
# let's start by looking at creating merged patches.
#
# We can get m1-N by merging u1 with m2-N because u1 will take s1
# to s2 and m2-N will take s2 to T. By the same argument, we
# generate m2-N by combining u2 with m3-N. Rinse and repeat until
# we get to the base case mN-N - which is uN.
#
# From this, we can conclude that generating the patches in
# reverse order (i.e. m2-N is generated before m1-N) will get
# us the desired result in N-1 pair-wise merges without having
# to use all patches in one go. (This is also optimal in the
# sense that we need to update N-1 patches to preserve the
# entire history).
#
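# As a concrete example (N = 3, names shortened): mN-N = m3-3 is a hard
# link to u3; m2-3 = merge(u2, m3-3); m1-3 = merge(u1, m2-3). On disk the
# merged patches are named "T-<target>-F-<original patch name>".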
for patch_name in reversed(self._unmerged_history_order):
merged_patch = "T-%s-F-%s" % (target_name, patch_name)
merged_patch_path = os.path.join(self.patches_dir, merged_patch)
if new_patches:
oldest_patch = os.path.join(self.patches_dir, patch_name)
previous_merged_patch = os.path.join(self.patches_dir, new_patches[-1])
await _merge_pdiffs(oldest_patch, previous_merged_patch, merged_patch_path)
hashes_decompressed, hashes_compressed = await _pdiff_hashes_from_patch(merged_patch_path)
self._history[merged_patch] = [self._unmerged_history[patch_name][0],
hashes_decompressed,
hashes_compressed,
]
else:
# Special case: the latest patch is its own "merged" variant.
os.link(os.path.join(self.patches_dir, patch_name + ".gz"), merged_patch_path + ".gz")
self._history[merged_patch] = self._unmerged_history[patch_name]
new_patches.append(merged_patch)
self._history_order = list(reversed(new_patches))
self._old_merged_patches_prefix.append(target_name)
self.has_merged_pdiffs = True
def read_index_file(self, index_file_path):
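"""Populate this index from an existing Index file
A missing file and parse errors are deliberately ignored; the index then
starts out empty and the pdiff series is regenerated from scratch.
"""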
try:
with apt_pkg.TagFile(index_file_path) as index:
index.step()
section = index.section
self.has_merged_pdiffs = section.get('X-Patch-Precedence') == 'merged'
self._old_merged_patches_prefix = section.get('X-DAK-Older-Patches', '').split()
for field in section.keys():
value = section[field]
if field in HASH_FIELDS_TABLE:
ind, hashind, primary_history = HASH_FIELDS_TABLE[field]
if primary_history:
history = self._history
history_order = self._history_order
else:
history = self._unmerged_history
history_order = self._unmerged_history_order
if history_order is None:
# History is already misaligned and we cannot find a common restore point.
continue
new_order = _read_hashes(history, history_order, ind, hashind, value.splitlines())
if primary_history:
self._history_order = new_order
else:
self._unmerged_history_order = new_order
continue
if field in ("Canonical-Name", "Canonical-Path"):
self.can_path = value
continue
if field not in ("SHA1-Current", "SHA256-Current"):
continue
l = value.split()
if len(l) != 2:
continue
if not self.filesizehashes:
self.filesizehashes = PDiffHashes(int(l[1]), None, None)
if field == "SHA1-Current":
self.filesizehashes = PDiffHashes(self.filesizehashes.size, l[0], self.filesizehashes.sha256)
if field == "SHA256-Current":
self.filesizehashes = PDiffHashes(self.filesizehashes.size, self.filesizehashes.sha1, l[0])
# Ensure that the order lists are defined again.
if self._history_order is None:
self._history_order = []
if self._unmerged_history_order is None:
self._unmerged_history_order = []
if not self.has_merged_pdiffs:
# When X-Patch-Precedence != merged, then the two histories are the same.
self._unmerged_history = {k: v for k, v in self._history.items()}
self._unmerged_history_order = list(self._history_order)
self._old_merged_patches_prefix = []
except (OSError, apt_pkg.Error):
# On error, we ignore everything. This causes the file to be regenerated from scratch.
# It forces everyone who is behind to download the full file.
# But it is self-healing provided that we generate valid files from here on.
pass
def prune_patch_history(self):
# Truncate our history if necessary
hs = self._history
order = self._history_order
unmerged_hs = self._unmerged_history
unmerged_order = self._unmerged_history_order
self._history_order = _prune_history(order, hs, self.max)
self._unmerged_history_order = _prune_history(unmerged_order, unmerged_hs, self.max)
prefix_cnt = len(self._old_merged_patches_prefix)
if prefix_cnt > 3:
self._old_merged_patches_prefix = self._old_merged_patches_prefix[prefix_cnt - 3:]
def find_obsolete_patches(self):
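"""Yield paths of files in the patch directory that are no longer needed
Everything except the Index file, the by-hash directory, patches still
referenced by the histories and recently superseded merged patches is
flagged as obsolete.
"""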
if not os.path.isdir(self.patches_dir):
return
hs = self._history
unmerged_hs = self._unmerged_history
keep_prefixes = tuple("T-%s-F-" % x for x in self._old_merged_patches_prefix)
# Scan for obsolete patches. While we could have computed these
# from the history, this method has the advantage of cleaning up
# old patches left behind that we previously failed to remove (e.g. if
# we had an index corruption, which happened in fed7ada36b609 and
# was later fixed in a36f867acf029)
for name in os.listdir(self.patches_dir):
if name in ('Index', 'by-hash'):
continue
# We keep some old merged patches around (as neither apt nor
# dak supports by-hash for pdiffs)
if keep_prefixes and name.startswith(keep_prefixes):
continue
basename, ext = os.path.splitext(name)
if ext in ('', '.gz') and (basename in hs or basename in unmerged_hs):
continue
path = os.path.join(self.patches_dir, name)
if not os.path.isfile(path):
# Non-files are probably not patches.
continue
# Unknown patch file; flag it as obsolete
yield path
def dump(self, out=sys.stdout):
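"""Write the Index file contents for this pdiff series to out"""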
if self.can_path:
out.write("Canonical-Path: %s\n" % self.can_path)
if self.filesizehashes:
if self.filesizehashes.sha1:
out.write("SHA1-Current: %s %7d\n" % (self.filesizehashes.sha1, self.filesizehashes.size))
if self.filesizehashes.sha256:
out.write("SHA256-Current: %s %7d\n" % (self.filesizehashes.sha256, self.filesizehashes.size))
for fieldname, ind, hashind, ext, primary_history in HASH_FIELDS:
if primary_history:
hs = self._history
order = self._history_order
elif self.has_merged_pdiffs:
hs = self._unmerged_history
order = self._unmerged_history_order
else:
continue
out.write("%s:\n" % fieldname)
for h in order:
if hs[h][ind] and hs[h][ind][hashind]:
out.write(" %s %7d %s%s\n" % (hs[h][ind][hashind], hs[h][ind].size, h, ext))
if self.has_merged_pdiffs:
out.write("X-Patch-Precedence: merged\n")
if self._old_merged_patches_prefix:
out.write("X-DAK-Older-Patches: %s\n" % " ".join(self._old_merged_patches_prefix))
def update_index(self, tmp_suffix=".new"):
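"""Atomically rewrite the Index file (write to a temporary name, then rename)"""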
if not os.path.isdir(self.patches_dir):
# If there is no patch directory, then we have no patches.
# It seems weird to have an Index of patches when we know there are
# none.
return
tmp_path = self.index_path + tmp_suffix
with open(tmp_path, "w") as f:
self.dump(f)
os.rename(tmp_path, self.index_path)
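# Illustrative use of this module from within a coroutine (directory and
# file names are hypothetical):
#
#   index = PDiffIndex("dists/unstable/main/binary-amd64/Packages.diff",
#                      max=56, merge_pdiffs=True)
#   await index.generate_and_add_patch_file("Packages.old", "Packages.new",
#                                           "2024-01-02-1200.00")
#   index.prune_patch_history()
#   for obsolete in index.find_obsolete_patches():
#       os.unlink(obsolete)
#   index.update_index()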