From b01e9c6ab3d02494edd267bb8e9a7983e62d319a Mon Sep 17 00:00:00 2001
From: John Readey
Date: Thu, 22 Aug 2024 10:12:09 -0500
Subject: [PATCH] updates for hdf5 file images datasets

---
 h5pyd/__init__.py      |   2 +-
 h5pyd/_apps/hsload.py  |  56 +++++++-----
 h5pyd/_apps/utillib.py | 199 ++++++++++++++++++++++++++++++++---------
 h5pyd/_hl/files.py     | 113 ++++++++++++++++++++++-
 4 files changed, 304 insertions(+), 66 deletions(-)

diff --git a/h5pyd/__init__.py b/h5pyd/__init__.py
index 0474e1e..732a15a 100644
--- a/h5pyd/__init__.py
+++ b/h5pyd/__init__.py
@@ -18,7 +18,7 @@
 from ._hl.h5type import vlen_dtype, string_dtype, enum_dtype
 from ._hl.h5type import check_vlen_dtype, check_string_dtype, check_enum_dtype
 from ._hl.h5type import check_opaque_dtype, check_ref_dtype, check_dtype
-from ._hl.files import File, is_hdf5
+from ._hl.files import File, H5Image, is_hdf5
 from ._hl.folders import Folder
 from ._hl.group import Group, SoftLink, ExternalLink, UserDefinedLink, HardLink
 from ._hl.dataset import Dataset, MultiManager
diff --git a/h5pyd/_apps/hsload.py b/h5pyd/_apps/hsload.py
index f56ea78..a437a80 100755
--- a/h5pyd/_apps/hsload.py
+++ b/h5pyd/_apps/hsload.py
@@ -31,10 +31,10 @@
 if __name__ == "__main__":
     from config import Config
-    from utillib import load_file
+    from utillib import load_file, load_h5image
 else:
     from .config import Config
-    from .utillib import load_file
+    from .utillib import load_file, load_h5image
 
 from urllib.parse import urlparse
 
@@ -70,7 +70,7 @@ def usage():
         if help_msg:
             print(f" {help_msg}")
     print("")
-    print("Note about --link option:")
+    print("Note about --link and --fastlink options:")
     print(" --link enables just the source HDF5 metadata to be ingested while the dataset data")
     print(" is left in the original file and fetched as needed.")
     print(" When used with files stored in AWS S3, the source file can be specified using the S3")
@@ -79,6 +79,8 @@ def usage():
     print(" For Posix or Azure deployments, the source file needs to be copied to a regular file,")
     print(" and the --linkpath option should be used to specifiy the Azure container name and path, or ")
     print(" (for HSDS deployed with POSIX) the file path relative to the server ROOT_DIR")
+    print(" --fastlink works like --link except metadata about chunk locations will be determined")
+    print(" as needed. This will lessen the time needed for hsload for files containing many chunks.")
     print("")
     print(cfg.get_see_also(cmd))
     print("")
@@ -110,6 +112,7 @@ def main():
                 help="Link to dataset data without initializing chunk locations (will be set server side)")
     cfg.setitem("linkpath", None, flags=["--linkpath",], choices=["PATH_URI",],
                 help="Use the given URI for the link references rather than the src path")
+    cfg.setitem("h5image", False, flags=["--h5image"], help="store as hdf5 file image")
     cfg.setitem("compression", None, flags=["--compression",], choices=COMPRESSION_FILTERS,
                 help="use the given compression algorithm for -z option (lz4 is default)")
     cfg.setitem("ignorefilters", False, flags=["--ignore-filters"], help="ignore any filters used by source dataset")
@@ -164,9 +167,7 @@ def main():
 
     if cfg["link"]:
         logging.info("checking libversion")
-        if (
-            h5py.version.version_tuple.major == 2 and h5py.version.version_tuple.minor < 10
-        ):
+        if (h5py.version.version_tuple.major == 2 and h5py.version.version_tuple.minor < 10):
             abort("link option requires h5py version 2.10 or higher")
 
         if h5py.version.hdf5_version_tuple < (1, 10, 6):
@@ -225,7 +226,10 @@ def main():
     else:
         s3path = None
     try:
-        fin = h5py.File(src_file, mode="r")
+        if cfg["h5image"]:
+            fin = open(src_file, "rb")  # open the file image
+        else:
+            fin = h5py.File(src_file)
     except IOError as ioe:
         abort(f"Error opening file {src_file}: {ioe}")
 
@@ -288,21 +292,29 @@ def main():
     else:
         compression_opts = None
 
-    # do the actual load
-    kwargs = {
-        "verbose": cfg["verbose"],
-        "dataload": dataload,
-        "s3path": s3path,
-        "compression": compression,
-        "compression_opts": compression_opts,
-        "ignorefilters": cfg["ignorefilters"],
-        "append": cfg["append"],
-        "extend_dim": cfg["extend_dim"],
-        "extend_offset": cfg["extend_offset"],
-        "ignore_error": cfg["ignore_error"],
-        "no_clobber": no_clobber
-    }
-    load_file(fin, fout, **kwargs)
+    if cfg["h5image"]:
+        kwargs = {
+            "verbose": cfg["verbose"],
+            "dataload": dataload,
+            "s3path": s3path,
+        }
+        load_h5image(fin, fout, **kwargs)
+    else:
+        # regular load to shared data format
+        kwargs = {
+            "verbose": cfg["verbose"],
+            "dataload": dataload,
+            "s3path": s3path,
+            "compression": compression,
+            "compression_opts": compression_opts,
+            "ignorefilters": cfg["ignorefilters"],
+            "append": cfg["append"],
+            "extend_dim": cfg["extend_dim"],
+            "extend_offset": cfg["extend_offset"],
+            "ignore_error": cfg["ignore_error"],
+            "no_clobber": no_clobber,
+        }
+        load_file(fin, fout, **kwargs)
 
     msg = f"File {src_file} uploaded to domain: {tgt}"
     logging.info(msg)
diff --git a/h5pyd/_apps/utillib.py b/h5pyd/_apps/utillib.py
index ea7262f..dd0b966 100755
--- a/h5pyd/_apps/utillib.py
+++ b/h5pyd/_apps/utillib.py
@@ -145,13 +145,38 @@ def get_fillvalue(dset):
 
 
 def is_compact(dset):
-    if isinstance(dset.id.id, str):
+    if is_h5py(dset):
+        cpl = dset.id.get_create_plist()
+        if cpl.get_layout() == h5py.h5d.COMPACT:
+            return True
+        else:
+            return False
+    else:
         return False  # compact storage not used with HSDS
-    cpl = dset.id.get_create_plist()
-    if cpl.get_layout() == h5py.h5d.COMPACT:
-        return True
+
+
+# ----------------------------------------------------------------------------------
+def get_chunk_layout(dset):
+    if is_h5py(dset):
+        msg = "get_chunk_layout called on hdf5 dataset"
+        logging.error(msg)
+        raise IOError(msg)
+    dset_json = dset.id.dcpl_json
+    if "layout" not in dset_json:
+        msg = f"expect to find layout key in dset_json: {dset_json}"
+        logging.error(msg)
+        raise IOError(msg)
+    layout = dset_json["layout"]
logging.debug(f"got chunk layout for dset id: {dset.id.id}: {layout}") + return layout + + +def get_chunk_layout_class(dset): + layout_json = get_chunk_layout(dset) + if layout_json and "class" in layout_json: + return layout_json["class"] else: - return False + return None def convert_dtype(srcdt, ctx): @@ -424,22 +449,6 @@ def resize_dataset(dset, extent, axis=0): raise -# ---------------------------------------------------------------------------------- -def get_chunk_layout(dset): - if is_h5py(dset): - msg = "get_chunk_layout called on hdf5 dataset" - logging.error(msg) - raise IOError(msg) - dset_json = dset.id.dcpl_json - if "layout" not in dset_json: - msg = f"expect to find layout key in dset_json: {dset_json}" - logging.error(msg) - raise IOError(msg) - layout = dset_json["layout"] - logging.debug(f"got chunk layout for dset id: {dset.id.id}: {layout}") - return layout - - # ---------------------------------------------------------------------------------- def get_chunk_dims(dset): if dset.chunks: @@ -789,19 +798,82 @@ def create_chunktable(dset, dset_dims, ctx): return chunks +# ---------------------------------------------------------------------------------- +def create_h5image_chunktable(num_bytes, s3path, dataload, fout): + logging.debug(f"create_h5image_chunktable({num_bytes}, {s3path}") + CHUNK_SIZE = 2 * 1024 * 1024 # 2MB + chunks = {} + num_chunks = -(num_bytes // -CHUNK_SIZE) # ceil + if dataload in ("link", "fastlink") and s3path: + chunks = {"file_uri": s3path} + + if num_chunks <= 0: + msg = "unexpected error in setting chunks for h5image" + logging.error(msg) + raise ValueError(msg) + elif num_chunks == 1: + chunks["class"] = "H5D_CONTIGUOUS_REF" + chunks["offset"] = 0 + chunks["size"] = num_bytes + logging.debug(f"using chunk layout for link option: {chunks}") + elif num_chunks <= 100: + # 2MB - 200 MB + chunks["class"] = "H5D_CHUNKED_REF" + chunks["dims"] = [CHUNK_SIZE,] + # set the chunk locations + chunk_map = {} + chunks["dims"] = [CHUNK_SIZE,] + offset = 0 + + for i in range(num_chunks): + if offset + CHUNK_SIZE > num_bytes: + chunk_size = (offset + CHUNK_SIZE) - num_bytes + else: + chunk_size = CHUNK_SIZE + chunk_map[str(i)] = (offset, chunk_size) + offset += chunk_size + + chunks["chunks"] = chunk_map + else: + # num_chunks > 100 + # create anonymous dataset to hold chunk info + chunks["class"] = "H5D_CHUNKED_REF_INDIRECT" + chunks["dims"] = [CHUNK_SIZE,] + dt = get_chunktable_dtype() + + chunktable_dims = [num_chunks,] + anon_dset = fout.create_dataset(None, chunktable_dims, dtype=dt) + msg = f"created chunk table: {anon_dset}" + logging.info(msg) + chunks["chunk_table"] = anon_dset.id.id + else: + # non-linked case + chunks["class"] = "H5D_CHUNKED" + if num_chunks <= 1: + chunk_shape = [num_bytes,] + else: + chunk_shape = [CHUNK_SIZE,] + chunks["dims"] = chunk_shape + + logging.info(f"using chunk layout: {chunks}") + return chunks + + # ---------------------------------------------------------------------------------- def update_chunktable(src, tgt, ctx): - layout = tgt.id.layout - if not layout: + tgt_layout = get_chunk_layout(tgt) + + if not tgt_layout: raise IOError("expected dataset layout to be set") - if layout["class"] != "H5D_CHUNKED_REF_INDIRECT": + tgt_layout_class = tgt_layout.get("class") + if tgt_layout_class != "H5D_CHUNKED_REF_INDIRECT": logging.info("update_chunktable not supported for this chunk class") return if ctx["dataload"] == "fastlink": logging.info("skip update_chunktable for fastload") return rank = len(tgt.shape) - chunktable_id = 
layout["chunk_table"] + chunktable_id = tgt_layout["chunk_table"] fout = ctx["fout"] logging.info(f"update_chunk_table {src.name}, id: {src.id.id}") @@ -844,16 +916,16 @@ def update_chunktable(src, tgt, ctx): raise IOError(msg) return - layout = get_chunk_layout(src) - layout_class = layout["class"] - if layout_class == "H5D_CONTIGUOUS_REF": - chunk_offset = layout["offset"] - chunk_size = layout["size"] - file_uri = layout["file_uri"] + src_layout = get_chunk_layout(src) + src_layout_class = src_layout["class"] + if src_layout_class == "H5D_CONTIGUOUS_REF": + chunk_offset = src_layout["offset"] + chunk_size = src_layout["size"] + file_uri = src_layout["file_uri"] chunk_arr = [(chunk_offset, chunk_size, file_uri),] - elif layout_class == "H5D_CHUNKED_REF": - file_uri = layout["file_uri"] - chunkmap = layout["chunks"] # e.g.{'0_2': [4016, 2000000]}} + elif src_layout_class == "H5D_CHUNKED_REF": + file_uri = src_layout["file_uri"] + chunkmap = src_layout["chunks"] # e.g.{'0_2': [4016, 2000000]}} for k in chunkmap: v = chunkmap[k] v.append(file_uri) @@ -864,9 +936,9 @@ def update_chunktable(src, tgt, ctx): index.append(int(chunk_indices[i])) index = tuple(index) chunk_arr[index] = v - elif layout_class == "H5D_CHUNKED_REF_INDIRECT": - file_uri = layout["file_uri"] - orig_chunktable_id = layout["chunk_table"] + elif src_layout_class == "H5D_CHUNKED_REF_INDIRECT": + file_uri = src_layout["file_uri"] + orig_chunktable_id = src_layout["chunk_table"] orig_chunktable = fout[f"datasets/{orig_chunktable_id}"] # iterate through contents and add file uri arr = orig_chunktable[...] @@ -884,7 +956,7 @@ def update_chunktable(src, tgt, ctx): tgt_index = tuple(tgt_index) chunk_arr[it.multi_index] = e else: - msg = f"expected chunk ref class but got: {layout_class}" + msg = f"expected chunk ref class but got: {src_layout_class}" logging.error(msg) if not ctx["ignore_error"]: raise IOError(msg) @@ -1316,9 +1388,9 @@ def write_dataset(src, tgt, ctx): print(msg) resize_dataset(tgt, new_extent, axis=0) - if not is_h5py(tgt) and tgt.id.layout["class"] != "H5D_CHUNKED": + if not is_h5py(tgt) and get_chunk_layout_class(tgt) != "H5D_CHUNKED": # this is one of the ref layouts - if tgt.id.layout["class"] == "H5D_CHUNKED_REF_INDIRECT": + if get_chunk_layout_class(tgt) == "H5D_CHUNKED_REF_INDIRECT": # don't write chunks, but update chunktable for chunk ref indirect update_chunktable(src, tgt, ctx) else: @@ -1542,7 +1614,52 @@ def create_datatype(obj, ctx): srcid_desobj_map[obj.id.__hash__()] = fout[obj.name] -# create_datatype +def get_filesize(f): + f.seek(0, 2) # seek to end of file + num_bytes = f.tell() + f.seek(0) # reset synch pointer + return num_bytes + + +def load_h5image( + fin, + fout, + verbose=False, + dataload="ingest", + s3path=None +): + num_bytes = get_filesize(fin) + msg = f"input file: {num_bytes} bytes" + logging.info(msg) + if verbose: + print(msg) + + chunks = create_h5image_chunktable(num_bytes, s3path, dataload, fout) + + dset = fout.create_dataset("h5image", (num_bytes,), chunks=chunks, dtype=np.uint8) + if dataload == "ingest": + # copy the file data by pages + page_size = dset.chunks[0] + if verbose: + print(f"page size: {page_size}") + offset = 0 + while True: + data = fin.read(page_size) + if len(data) == 0: + break + arr = np.frombuffer(data, dtype=dset.dtype) + dset[offset:(offset + len(data))] = arr + offset += len(data) + msg = f"wrote {len(data)} bytes" + logging.info(msg) + if verbose: + print(msg) + + msg = "load h5imag done" + logging.info(msg) + if verbose: + print(msg) + # 
 def load_file(
diff --git a/h5pyd/_hl/files.py b/h5pyd/_hl/files.py
index 7617eb7..5b4d3a4 100644
--- a/h5pyd/_hl/files.py
+++ b/h5pyd/_hl/files.py
@@ -12,10 +12,11 @@
 
 from __future__ import absolute_import
 
+import io
 import os
-import time
 import json
 import pathlib
+import time
 
 from .objectid import GroupID
 from .group import Group
@@ -40,6 +41,115 @@ def is_hdf5(domain, **kwargs):
     return found
 
 
+class H5Image(io.RawIOBase):
+    def __init__(self, domain_path, h5path="h5image", logger=None):
+        self._cursor = 0
+        if domain_path.startswith("hdf5://"):
+            self._domain_path = domain_path
+        else:
+            self._domain_path = "hdf5://" + domain_path
+        f = File(domain_path)
+        if h5path not in f:
+            raise IOError(f"Expected '{h5path}' dataset")
+        dset = f[h5path]
+        if len(dset.shape) != 1:
+            raise IOError("Expected one-dimensional dataset")
+        self._dset = dset
+        num_chunks = -(dset.shape[0] // -dset.chunks[0])
+        self._page_cache = [None,] * num_chunks
+        self._logger = logger
+        if self._logger:
+            self._logger.info(f"domain {self._domain_path} opened")
+
+    def __repr__(self):
+        return f'<{self._domain_path}>'
+
+    def readable(self):
+        return True
+
+    def seekable(self):
+        return True
+
+    @property
+    def size(self):
+        return self._dset.shape[0]
+
+    @property
+    def page_size(self):
+        return self._dset.chunks[0]
+
+    def tell(self):
+        return self._cursor
+
+    def seek(self, offset, whence=io.SEEK_SET):
+        if whence == io.SEEK_SET:
+            if self._logger:
+                self._logger.debug(f"SEEK_SET({offset})")
+            self._cursor = offset
+        elif whence == io.SEEK_CUR:
+            if self._logger:
+                self._logger.debug(f"SEEK_CUR({offset})")
+            self._cursor += offset
+        elif whence == io.SEEK_END:
+            if self._logger:
+                self._logger.debug(f"SEEK_END({offset})")
+            self._cursor = self.size + offset
+        else:
+            raise ValueError(f'{whence}: Unknown whence value')
+        if self._logger:
+            self._logger.debug(f"cursor: {self._cursor}")
+        return self._cursor
+
+    def _get_page(self, page_number):
+        if self._page_cache[page_number] is None:
+            if self._logger:
+                self._logger.info(f"reading page {page_number} from server")
+            offset = page_number * self.page_size
+            arr = self._dset[offset:offset + self.page_size]
+            self._page_cache[page_number] = arr.tobytes()
+        if self._logger:
+            self._logger.debug(f"fetching page {page_number} from cache")
+        return self._page_cache[page_number]
+
+    def read(self, size=-1):
+        start = self._cursor
+        if size < 0 or self._cursor + size >= self.size:
+            stop = self.size
+            self.seek(offset=0, whence=io.SEEK_END)
+        else:
+            stop = start + size
+            self.seek(offset=size, whence=io.SEEK_CUR)
+
+        if self._logger:
+            self._logger.debug(f">>GET {start}:{stop}")
+
+        buffer = bytearray(stop - start)
+        offset = start
+        while offset < stop:
+            page_number = offset // self.page_size
+            page_bytes = self._get_page(page_number)
+            page_start = offset % self.page_size
+            # number of bytes still available in this page
+            num_bytes = self.page_size - page_start
+            if offset + num_bytes > stop:
+                num_bytes = stop - offset
+            page_stop = page_start + num_bytes
+            buffer_start = offset - start
+            buffer_stop = buffer_start + num_bytes
+            buffer[buffer_start:buffer_stop] = page_bytes[page_start:page_stop]
+            offset += num_bytes
+
+        if self._logger:
+            self._logger.debug(f"returning: {len(buffer)} bytes")
+        return buffer
+
+    def readinto(self, buff):
+        if self._logger:
+            self._logger.debug(f"readinto({len(buff)})")
+        data = self.read(len(buff))
+        buff[:len(data)] = data
+        return len(data)
+
+
 
 class File(Group):
 
     """
         Represents an HDF5 file.
@@ -183,7 +293,6 @@ def __init__(
         track_order
             Whether to track dataset/group/attribute creation order within
             this file.  Objects will be iterated in ascending creation order if
             this is enabled, otherwise in ascending alphanumeric order.
-        retries
             Number of retry attempts to be used if a server request fails
         timeout
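
Usage sketch: one way the new --h5image option and the H5Image class added above might be used
together. The domain path below is hypothetical, and the example assumes h5py's support for
opening Python file-like objects (available since h5py 2.9):

    # hypothetical upload of a local HDF5 file as a single "h5image" byte dataset:
    #   hsload --h5image mydata.h5 /home/myuser/mydata.h5
    import h5py
    import h5pyd

    # H5Image is a read-only, seekable file-like object backed by the "h5image" dataset;
    # pages (2MB chunks) are fetched from the server and cached as the image is read
    img = h5pyd.H5Image("/home/myuser/mydata.h5")

    # h5py can open file-like objects, so the image can be traversed with the regular HDF5 library
    with h5py.File(img, "r") as f:
        print(list(f.keys()))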