From 2be1effef00ff905710143357b02608b1dbeb23d Mon Sep 17 00:00:00 2001 From: John Readey Date: Wed, 6 Nov 2024 20:20:06 -0600 Subject: [PATCH] hsload fix for large contiguous datasets (#231) * hsload fix for large contiguous datasets * remove commented out code block --- h5pyd/_apps/hsload.py | 8 +- h5pyd/_apps/utillib.py | 161 ++++++++++++++++++++++++++++++++++++----- h5pyd/_hl/dataset.py | 2 +- h5pyd/_hl/filters.py | 13 ++-- 4 files changed, 155 insertions(+), 29 deletions(-) diff --git a/h5pyd/_apps/hsload.py b/h5pyd/_apps/hsload.py index 0abf718..20f3e88 100755 --- a/h5pyd/_apps/hsload.py +++ b/h5pyd/_apps/hsload.py @@ -23,12 +23,18 @@ sys.exit(1) try: + # this package is used when reading HDF5 files on S3 import s3fs - S3FS_IMPORT = True except ImportError: S3FS_IMPORT = False +try: + # this package is useful for HDF5 compresssed with blosc, etc. + import hdf5plugin +except ImportError: + pass + if __name__ == "__main__": from config import Config from utillib import load_file, load_h5image diff --git a/h5pyd/_apps/utillib.py b/h5pyd/_apps/utillib.py index 8ccc098..7acd3f1 100755 --- a/h5pyd/_apps/utillib.py +++ b/h5pyd/_apps/utillib.py @@ -26,7 +26,8 @@ # adjust chunk shape to fit between min and max chunk sizes when possible MIN_CHUNK_SIZE = 1 * 1024 * 1024 -MAC_CHUNK_SIZE = 8 * 1024 * 1024 +MAX_CHUNK_SIZE = 8 * 1024 * 1024 +CHUNK_BASE = 64 * 1024 # Multiplier by which chunks are adjusted H5Z_FILTER_MAP = { 32001: "blosclz", @@ -235,6 +236,139 @@ def convert_dtype(srcdt, ctx): return tgt_dt +def guess_chunk(shape, typesize): + """ Guess an appropriate chunk layout for a dataset, given its shape and + the size of each element in bytes. Will allocate chunks only as large + as MAX_SIZE. Chunks are generally close to some power-of-2 fraction of + each axis, slightly favoring bigger values for the last index. + + Undocumented and subject to change without warning. + """ + + # For unlimited dimensions we have to guess 1024 + shape = tuple((x if x != 0 else 1024) for i, x in enumerate(shape)) + + ndims = len(shape) + if ndims == 0: + raise ValueError("Chunks not allowed for scalar datasets.") + + chunks = np.array(shape, dtype='=f8') + if not np.all(np.isfinite(chunks)): + raise ValueError("Illegal value in chunk tuple") + + # Determine the optimal chunk size in bytes using a PyTables expression. + # This is kept as a float. + dset_size = np.prod(chunks) * typesize + target_size = CHUNK_BASE * (2 ** np.log10(dset_size / (1024. * 1024))) + + if target_size > MIN_CHUNK_SIZE: + target_size = MAX_CHUNK_SIZE + elif target_size < MIN_CHUNK_SIZE: + target_size = MIN_CHUNK_SIZE + + idx = 0 + while True: + # Repeatedly loop over the axes, dividing them by 2. Stop when: + # 1a. We're smaller than the target chunk size, OR + # 1b. We're within 50% of the target chunk size, AND + # 2. The chunk is smaller than the maximum chunk size + + chunk_bytes = np.prod(chunks) * typesize + + if (chunk_bytes < target_size or abs(chunk_bytes - target_size) / target_size < 0.5) and \ + chunk_bytes < MAX_CHUNK_SIZE: + break + + if np.prod(chunks) == 1: + break # Element size larger than CHUNK_MAX + chunks[idx % ndims] = np.ceil(chunks[idx % ndims] / 2.0) + idx += 1 + + return tuple(int(x) for x in chunks) + + +class ChunkIterator(object): + """ + Class to iterate through list of chunks of a given dataset + """ + + def __init__(self, dset, source_sel=None): + self._shape = dset.shape + rank = len(dset.shape) + + if not dset.chunks: + # coniguous layout - create some psuedo-chunks so we do't + # try to read to much data in one selection + self._layout = guess_chunk(dset.shape, dset.dtype.itemsize) + elif isinstance(dset.chunks, dict): + self._layout = dset.chunks["dims"] + else: + self._layout = dset.chunks + + if source_sel is None: + # select over entire dataset + slices = [] + for dim in range(rank): + slices.append(slice(0, self._shape[dim])) + self._sel = tuple(slices) + else: + if isinstance(source_sel, slice): + self._sel = (source_sel,) + else: + self._sel = source_sel + if len(self._sel) != rank: + raise ValueError( + "Invalid selection - selection region must have same rank as dataset" + ) + self._chunk_index = [] + for dim in range(rank): + s = self._sel[dim] + if s.start < 0 or s.stop > self._shape[dim] or s.stop <= s.start: + msg = "Invalid selection - selection region must be within dataset space" + raise ValueError(msg) + index = s.start // self._layout[dim] + self._chunk_index.append(index) + + def __iter__(self): + return self + + def __next__(self): + rank = len(self._shape) + slices = [] + if rank == 0 or self._chunk_index[0] * self._layout[0] >= self._sel[0].stop: + # ran past the last chunk, end iteration + raise StopIteration() + + for dim in range(rank): + s = self._sel[dim] + start = self._chunk_index[dim] * self._layout[dim] + stop = (self._chunk_index[dim] + 1) * self._layout[dim] + # adjust the start if this is an edge chunk + if start < s.start: + start = s.start + if stop > s.stop: + stop = s.stop # trim to end of the selection + s = slice(start, stop, 1) + slices.append(s) + + # bump up the last index and carry forward if we run outside the selection + dim = rank - 1 + while dim >= 0: + s = self._sel[dim] + self._chunk_index[dim] += 1 + + chunk_end = self._chunk_index[dim] * self._layout[dim] + if chunk_end < s.stop: + # we still have room to extend along this dimensions + return tuple(slices) + + if dim > 0: + # reset to the start and continue iterating with higher dimension + self._chunk_index[dim] = 0 + dim -= 1 + return tuple(slices) + + # ---------------------------------------------------------------------------------- def copy_element(val, src_dt, tgt_dt, ctx): msg = f"copy_element, val: {val} " @@ -1262,8 +1396,12 @@ def create_dataset(dobj, ctx): # supported non-standard compressor kwargs["compression"] = filter_name logging.info(f"using compressor: {filter_name} for {dobj.name}") - kwargs["compression_opts"] = filter_opts - logging.info(f"compression_opts: {filter_opts}") + if isinstance(filter_opts, int) or isinstance(filter_opts, dict): + kwargs["compression_opts"] = filter_opts + logging.info(f"compression_opts: {filter_opts}") + else: + msg = f"ignoring compression_opts for filter: {filter_name}" + logging.warn(msg) else: logging.warning(f"filter id {filter_id} for {dobj.name} not supported") @@ -1402,26 +1540,11 @@ def write_dataset(src, tgt, ctx): if ctx["verbose"]: print(msg) try: - if src.chunks is None: - # contiguous dataset, fake an iterator by creating a list - # with one slice - slices = [] - for dim in range(rank): - extent = src.shape[dim] - s = slice(0, extent, 1) - slices.append(s) - slices = tuple(slices) - if rank == 1: - it = [slices[0],] - else: - it = [slices,] - else: - it = src.iter_chunks() - logging.debug(f"src dtype: {src.dtype}") logging.debug(f"des dtype: {tgt.dtype}") empty_arr = None + it = ChunkIterator(src) for src_s in it: logging.debug(f"src selection: {src_s}") if rank == 1 and isinstance(src_s, slice): diff --git a/h5pyd/_hl/dataset.py b/h5pyd/_hl/dataset.py index 7958fa6..517ea8b 100644 --- a/h5pyd/_hl/dataset.py +++ b/h5pyd/_hl/dataset.py @@ -265,7 +265,7 @@ def make_new_dset( maxshape = tuple(m if m is not None else 0 for m in maxshape) body["maxdims"] = maxshape else: - print("Warning: maxshape provided but no shape") + print("maxshape provided but no shape") req = "/datasets" diff --git a/h5pyd/_hl/filters.py b/h5pyd/_hl/filters.py index 413bc4f..bcfb6b7 100644 --- a/h5pyd/_hl/filters.py +++ b/h5pyd/_hl/filters.py @@ -255,14 +255,11 @@ def rq_tuple(tpl, name): for k in compression_opts: filter[k] = compression_opts[k] else: - if compression_opts in range(10): - level = compression_opts - else: - raise ValueError( - "compression setting must be a dict or integer from 0-9, not {}".format( - compression_opts - ) - ) + if compression_opts not in range(10): + msg = "compression setting must be a dict or integer from 0-9, " + msg += f"not {compression_opts}" + raise ValueError(msg) + level = compression_opts filter["level"] = level filters.append(filter)