
Commit

updates for hdf5 file images datasets
jreadey committed Aug 22, 2024
1 parent 566decc commit b01e9c6
Showing 4 changed files with 304 additions and 66 deletions.
2 changes: 1 addition & 1 deletion h5pyd/__init__.py
@@ -18,7 +18,7 @@
from ._hl.h5type import vlen_dtype, string_dtype, enum_dtype
from ._hl.h5type import check_vlen_dtype, check_string_dtype, check_enum_dtype
from ._hl.h5type import check_opaque_dtype, check_ref_dtype, check_dtype
from ._hl.files import File, is_hdf5
from ._hl.files import File, H5Image, is_hdf5
from ._hl.folders import Folder
from ._hl.group import Group, SoftLink, ExternalLink, UserDefinedLink, HardLink
from ._hl.dataset import Dataset, MultiManager
56 changes: 34 additions & 22 deletions h5pyd/_apps/hsload.py
@@ -31,10 +31,10 @@

if __name__ == "__main__":
from config import Config
from utillib import load_file
from utillib import load_file, load_h5image
else:
from .config import Config
from .utillib import load_file
from .utillib import load_file, load_h5image

from urllib.parse import urlparse

@@ -70,7 +70,7 @@ def usage():
if help_msg:
print(f" {help_msg}")
print("")
print("Note about --link option:")
print("Note about --link and --fastlink option:")
print(" --link enables just the source HDF5 metadata to be ingested while the dataset data")
print(" is left in the original file and fetched as needed.")
print(" When used with files stored in AWS S3, the source file can be specified using the S3")
@@ -79,6 +79,8 @@ def usage():
print(" For Posix or Azure deployments, the source file needs to be copied to a regular file,")
print(" and the --linkpath option should be used to specifiy the Azure container name and path, or ")
print(" (for HSDS deployed with POSIX) the file path relative to the server ROOT_DIR")
print(" --fastlink works like --link except metadata about chunk locations will be determined as")
print(" as needed. This will lesen the time needed for hsload for files containing many chunks.")
print("")
print(cfg.get_see_also(cmd))
print("")
@@ -110,6 +112,7 @@ def main():
help="Link to dataset data without initializing chunk locations (will be set server side)")
cfg.setitem("linkpath", None, flags=["--linkpath",], choices=["PATH_URI",],
help="Use the given URI for the link references rather than the src path")
cfg.setitem("h5image", False, flags=["--h5image"], help="store as hdf5 file image")
cfg.setitem("compression", None, flags=["--compression",], choices=COMPRESSION_FILTERS,
help="use the given compression algorithm for -z option (lz4 is default)")
cfg.setitem("ignorefilters", False, flags=["--ignore-filters"], help="ignore any filters used by source dataset")
@@ -164,9 +167,7 @@ def main():
if cfg["link"]:
logging.info("checking libversion")

if (
h5py.version.version_tuple.major == 2 and h5py.version.version_tuple.minor < 10
):
if (h5py.version.version_tuple.major == 2 and h5py.version.version_tuple.minor < 10):
abort("link option requires h5py version 2.10 or higher")

if h5py.version.hdf5_version_tuple < (1, 10, 6):
@@ -225,7 +226,10 @@ def main():
else:
s3path = None
try:
fin = h5py.File(src_file, mode="r")
if cfg["h5image"]:
fin = open(src_file, "rb") # open the file image
else:
fin = h5py.File(src_file)
except IOError as ioe:
abort(f"Error opening file {src_file}: {ioe}")

@@ -288,21 +292,29 @@
else:
compression_opts = None

# do the actual load
kwargs = {
"verbose": cfg["verbose"],
"dataload": dataload,
"s3path": s3path,
"compression": compression,
"compression_opts": compression_opts,
"ignorefilters": cfg["ignorefilters"],
"append": cfg["append"],
"extend_dim": cfg["extend_dim"],
"extend_offset": cfg["extend_offset"],
"ignore_error": cfg["ignore_error"],
"no_clobber": no_clobber
}
load_file(fin, fout, **kwargs)
if cfg["h5image"]:
kwargs = {
"verbose": cfg["verbose"],
"dataload": dataload,
"s3path": s3path,
}
load_h5image(fin, fout, **kwargs)
else:
# regular load to shared data format
kwargs = {
"verbose": cfg["verbose"],
"dataload": dataload,
"s3path": s3path,
"compression": compression,
"compression_opts": compression_opts,
"ignorefilters": cfg["ignorefilters"],
"append": cfg["append"],
"extend_dim": cfg["extend_dim"],
"extend_offset": cfg["extend_offset"],
"ignore_error": cfg["ignore_error"],
"no_clobber": no_clobber,
}
load_file(fin, fout, **kwargs)

msg = f"File {src_file} uploaded to domain: {tgt}"
logging.info(msg)
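
For orientation, a minimal sketch of what the new --h5image path amounts to when driven from Python rather than the hsload command line; the domain, endpoint, and file names are placeholders, and the import assumes the package layout shown in this diff:

```python
# Hedged sketch of the --h5image ingest path (placeholder paths and endpoint).
import h5pyd
from h5pyd._apps.utillib import load_h5image

src_file = "data.h5"                  # local HDF5 file to store as an image
tgt = "/home/myuser/data.h5"          # target HSDS domain (hypothetical)

fin = open(src_file, "rb")            # the image is read as raw bytes, not via h5py
fout = h5pyd.File(tgt, mode="w", endpoint="http://localhost:5101")
load_h5image(fin, fout, verbose=True, dataload="ingest", s3path=None)
fin.close()
fout.close()
```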
199 changes: 158 additions & 41 deletions h5pyd/_apps/utillib.py
@@ -145,13 +145,38 @@ def get_fillvalue(dset):


def is_compact(dset):
if isinstance(dset.id.id, str):
if is_h5py(dset):
cpl = dset.id.get_create_plist()
if cpl.get_layout() == h5py.h5d.COMPACT:
return True
else:
return False
else:
return False # compact storage not used with HSDS
cpl = dset.id.get_create_plist()
if cpl.get_layout() == h5py.h5d.COMPACT:
return True


# ----------------------------------------------------------------------------------
def get_chunk_layout(dset):
if is_h5py(dset):
msg = "get_chunk_layout called on hdf5 dataset"
logging.error(msg)
raise IOError(msg)
dset_json = dset.id.dcpl_json
if "layout" not in dset_json:
msg = f"expect to find layout key in dset_json: {dset_json}"
logging.error(msg)
raise IOError(msg)
layout = dset_json["layout"]
logging.debug(f"got chunk layout for dset id: {dset.id.id}: {layout}")
return layout


def get_chunk_layout_class(dset):
layout_json = get_chunk_layout(dset)
if layout_json and "class" in layout_json:
return layout_json["class"]
else:
return False
return None
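
Below, a short hypothetical caller showing how get_chunk_layout_class is meant to be used to branch on an HSDS dataset's layout class (the class names match those used in the hunks further down):

```python
# Hypothetical helper: summarize the chunk layout class of an HSDS dataset.
def describe_layout(dset):
    layout_class = get_chunk_layout_class(dset)
    if layout_class == "H5D_CHUNKED":
        return "regular chunked storage"
    elif layout_class == "H5D_CHUNKED_REF_INDIRECT":
        return "chunk locations kept in an anonymous chunk-table dataset"
    elif layout_class in ("H5D_CONTIGUOUS_REF", "H5D_CHUNKED_REF"):
        return "chunk locations stored inline in the layout"
    else:
        return f"unrecognized layout class: {layout_class}"
```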


def convert_dtype(srcdt, ctx):
@@ -424,22 +449,6 @@ def resize_dataset(dset, extent, axis=0):
raise


# ----------------------------------------------------------------------------------
def get_chunk_layout(dset):
if is_h5py(dset):
msg = "get_chunk_layout called on hdf5 dataset"
logging.error(msg)
raise IOError(msg)
dset_json = dset.id.dcpl_json
if "layout" not in dset_json:
msg = f"expect to find layout key in dset_json: {dset_json}"
logging.error(msg)
raise IOError(msg)
layout = dset_json["layout"]
logging.debug(f"got chunk layout for dset id: {dset.id.id}: {layout}")
return layout


# ----------------------------------------------------------------------------------
def get_chunk_dims(dset):
if dset.chunks:
@@ -789,19 +798,82 @@ def create_chunktable(dset, dset_dims, ctx):
return chunks


# ----------------------------------------------------------------------------------
def create_h5image_chunktable(num_bytes, s3path, dataload, fout):
logging.debug(f"create_h5image_chunktable({num_bytes}, {s3path}")
CHUNK_SIZE = 2 * 1024 * 1024 # 2MB
chunks = {}
num_chunks = -(num_bytes // -CHUNK_SIZE) # ceil
if dataload in ("link", "fastlink") and s3path:
chunks = {"file_uri": s3path}

if num_chunks <= 0:
msg = "unexpected error in setting chunks for h5image"
logging.error(msg)
raise ValueError(msg)
elif num_chunks == 1:
chunks["class"] = "H5D_CONTIGUOUS_REF"
chunks["offset"] = 0
chunks["size"] = num_bytes
logging.debug(f"using chunk layout for link option: {chunks}")
elif num_chunks <= 100:
# 2MB - 200 MB
chunks["class"] = "H5D_CHUNKED_REF"
chunks["dims"] = [CHUNK_SIZE,]
# set the chunk locations
chunk_map = {}
chunks["dims"] = [CHUNK_SIZE,]
offset = 0

for i in range(num_chunks):
if offset + CHUNK_SIZE > num_bytes:
chunk_size = num_bytes - offset  # size of the final, partial chunk
else:
chunk_size = CHUNK_SIZE
chunk_map[str(i)] = (offset, chunk_size)
offset += chunk_size

chunks["chunks"] = chunk_map
else:
# num_chunks > 100
# create anonymous dataset to hold chunk info
chunks["class"] = "H5D_CHUNKED_REF_INDIRECT"
chunks["dims"] = [CHUNK_SIZE,]
dt = get_chunktable_dtype()

chunktable_dims = [num_chunks,]
anon_dset = fout.create_dataset(None, chunktable_dims, dtype=dt)
msg = f"created chunk table: {anon_dset}"
logging.info(msg)
chunks["chunk_table"] = anon_dset.id.id
else:
# non-linked case
chunks["class"] = "H5D_CHUNKED"
if num_chunks <= 1:
chunk_shape = [num_bytes,]
else:
chunk_shape = [CHUNK_SIZE,]
chunks["dims"] = chunk_shape

logging.info(f"using chunk layout: {chunks}")
return chunks
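
To make the size thresholds above concrete, a standalone sketch of the same mapping (2 MB chunks, ceiling division); it reproduces only the class selection for the link/fastlink case, not the chunk map or chunk-table creation:

```python
# Standalone illustration of the h5image layout class selection (link/fastlink case).
CHUNK_SIZE = 2 * 1024 * 1024  # 2 MB, matching create_h5image_chunktable

def h5image_layout_class(num_bytes):
    num_chunks = -(num_bytes // -CHUNK_SIZE)  # ceiling division
    if num_chunks == 1:
        return "H5D_CONTIGUOUS_REF"        # whole image referenced as a single block
    elif num_chunks <= 100:
        return "H5D_CHUNKED_REF"           # chunk offsets/sizes listed inline in the layout
    else:
        return "H5D_CHUNKED_REF_INDIRECT"  # offsets/sizes tracked in an anonymous dataset

print(h5image_layout_class(1_000_000))      # 1 MB   -> H5D_CONTIGUOUS_REF
print(h5image_layout_class(150_000_000))    # 150 MB -> H5D_CHUNKED_REF
print(h5image_layout_class(1_000_000_000))  # 1 GB   -> H5D_CHUNKED_REF_INDIRECT
```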


# ----------------------------------------------------------------------------------
def update_chunktable(src, tgt, ctx):
layout = tgt.id.layout
if not layout:
tgt_layout = get_chunk_layout(tgt)

if not tgt_layout:
raise IOError("expected dataset layout to be set")
if layout["class"] != "H5D_CHUNKED_REF_INDIRECT":
tgt_layout_class = tgt_layout.get("class")
if tgt_layout_class != "H5D_CHUNKED_REF_INDIRECT":
logging.info("update_chunktable not supported for this chunk class")
return
if ctx["dataload"] == "fastlink":
logging.info("skip update_chunktable for fastload")
return
rank = len(tgt.shape)
chunktable_id = layout["chunk_table"]
chunktable_id = tgt_layout["chunk_table"]

fout = ctx["fout"]
logging.info(f"update_chunk_table {src.name}, id: {src.id.id}")
@@ -844,16 +916,16 @@ def update_chunktable(src, tgt, ctx):
raise IOError(msg)
return

layout = get_chunk_layout(src)
layout_class = layout["class"]
if layout_class == "H5D_CONTIGUOUS_REF":
chunk_offset = layout["offset"]
chunk_size = layout["size"]
file_uri = layout["file_uri"]
src_layout = get_chunk_layout(src)
src_layout_class = src_layout["class"]
if src_layout_class == "H5D_CONTIGUOUS_REF":
chunk_offset = src_layout["offset"]
chunk_size = src_layout["size"]
file_uri = src_layout["file_uri"]
chunk_arr = [(chunk_offset, chunk_size, file_uri),]
elif layout_class == "H5D_CHUNKED_REF":
file_uri = layout["file_uri"]
chunkmap = layout["chunks"] # e.g.{'0_2': [4016, 2000000]}}
elif src_layout_class == "H5D_CHUNKED_REF":
file_uri = src_layout["file_uri"]
chunkmap = src_layout["chunks"] # e.g.{'0_2': [4016, 2000000]}}
for k in chunkmap:
v = chunkmap[k]
v.append(file_uri)
@@ -864,9 +936,9 @@
index.append(int(chunk_indices[i]))
index = tuple(index)
chunk_arr[index] = v
elif layout_class == "H5D_CHUNKED_REF_INDIRECT":
file_uri = layout["file_uri"]
orig_chunktable_id = layout["chunk_table"]
elif src_layout_class == "H5D_CHUNKED_REF_INDIRECT":
file_uri = src_layout["file_uri"]
orig_chunktable_id = src_layout["chunk_table"]
orig_chunktable = fout[f"datasets/{orig_chunktable_id}"]
# iterate through contents and add file uri
arr = orig_chunktable[...]
Expand All @@ -884,7 +956,7 @@ def update_chunktable(src, tgt, ctx):
tgt_index = tuple(tgt_index)
chunk_arr[it.multi_index] = e
else:
msg = f"expected chunk ref class but got: {layout_class}"
msg = f"expected chunk ref class but got: {src_layout_class}"
logging.error(msg)
if not ctx["ignore_error"]:
raise IOError(msg)
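
For orientation, hedged examples of what the three chunk-reference layouts handled above might look like as layout dicts; all offsets, sizes, dims, and URIs are made-up placeholders:

```python
# Illustrative layout dicts for the three chunk-reference classes (all values made up).
contiguous_ref = {
    "class": "H5D_CONTIGUOUS_REF",
    "file_uri": "s3://mybucket/data.h5",   # placeholder source file
    "offset": 4016,
    "size": 2_000_000,
}
chunked_ref = {
    "class": "H5D_CHUNKED_REF",
    "file_uri": "s3://mybucket/data.h5",
    "dims": [1024],
    "chunks": {"0_2": [4016, 2_000_000]},  # chunk index -> (offset, size), as in the comment above
}
chunked_ref_indirect = {
    "class": "H5D_CHUNKED_REF_INDIRECT",
    "file_uri": "s3://mybucket/data.h5",
    "dims": [1024],
    "chunk_table": "d-0123abcd",           # id of an anonymous chunk-table dataset (placeholder)
}
```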
@@ -1316,9 +1388,9 @@ def write_dataset(src, tgt, ctx):
print(msg)
resize_dataset(tgt, new_extent, axis=0)

if not is_h5py(tgt) and tgt.id.layout["class"] != "H5D_CHUNKED":
if not is_h5py(tgt) and get_chunk_layout_class(tgt) != "H5D_CHUNKED":
# this is one of the ref layouts
if tgt.id.layout["class"] == "H5D_CHUNKED_REF_INDIRECT":
if get_chunk_layout_class(tgt) == "H5D_CHUNKED_REF_INDIRECT":
# don't write chunks, but update chunktable for chunk ref indirect
update_chunktable(src, tgt, ctx)
else:
@@ -1542,7 +1614,52 @@ def create_datatype(obj, ctx):
srcid_desobj_map[obj.id.__hash__()] = fout[obj.name]


# create_datatype
def get_filesize(f):
f.seek(0, 2) # seek to end of file
num_bytes = f.tell()
f.seek(0)  # reset file pointer to the start
return num_bytes


def load_h5image(
fin,
fout,
verbose=False,
dataload="ingest",
s3path=None
):
num_bytes = get_filesize(fin)
msg = f"input file: {num_bytes} bytes"
logging.info(msg)
if verbose:
print(msg)

chunks = create_h5image_chunktable(num_bytes, s3path, dataload, fout)

dset = fout.create_dataset("h5image", (num_bytes,), chunks=chunks, dtype=np.uint8)
if dataload == "ingest":
# copy the file data by pages
page_size = dset.chunks[0]
if verbose:
print(f"page size: {page_size}")
offset = 0
while True:
data = fin.read(page_size)
if len(data) == 0:
break
arr = np.frombuffer(data, dtype=dset.dtype)
dset[offset:(offset + len(data))] = arr
offset += len(data)
msg = f"wrote {len(data)} bytes"
logging.info(msg)
if verbose:
print(msg)

msg = "load h5imag done"
logging.info(msg)
if verbose:
print(msg)
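
And a hedged sketch of recovering the original file bytes from the h5image dataset with plain slicing; the domain and output paths are placeholders, and this deliberately avoids the new H5Image class since its interface isn't shown in this diff:

```python
# Read the stored file image back into a local HDF5 file (placeholder paths).
import h5pyd

with h5pyd.File("/home/myuser/data.h5", mode="r") as f:
    dset = f["h5image"]                            # 1-D uint8 dataset written by load_h5image
    num_bytes = dset.shape[0]
    page = dset.chunks[0] if dset.chunks else num_bytes
    with open("restored.h5", "wb") as out:
        for offset in range(0, num_bytes, page):
            out.write(dset[offset:offset + page].tobytes())
```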


# ---------------------------------------------------------------------------------
def load_file(