diff --git a/ci/upstream.yml b/ci/upstream.yml
index f312de2c..e7d896c9 100644
--- a/ci/upstream.yml
+++ b/ci/upstream.yml
@@ -27,8 +27,10 @@ dependencies:
   - pytest
   - pooch
   - fsspec
+  - dask
+  - zarr>=3.0.2
   - pip
   - pip:
     - git+https://github.com/earth-mover/icechunk.git@main#subdirectory=icechunk-python # Installs zarr-python v3.0.0 as dependency
     - git+https://github.com/fsspec/kerchunk.git@main
     - imagecodecs-numcodecs==2024.6.1
diff --git a/conftest.py b/conftest.py
index 0781c37e..7926b349 100644
--- a/conftest.py
+++ b/conftest.py
@@ -25,6 +25,22 @@ def pytest_runtest_setup(item):
         )
 
 
+def _xarray_subset():
+    ds = xr.tutorial.open_dataset("air_temperature", chunks={})
+    return ds.isel(time=slice(0, 10), lat=slice(0, 9), lon=slice(0, 18)).chunk(
+        {"time": 5}
+    )
+
+
+@pytest.fixture(params=[2, 3])
+def zarr_store(tmpdir, request):
+    ds = _xarray_subset()
+    filepath = f"{tmpdir}/air.zarr"
+    ds.to_zarr(filepath, zarr_format=request.param)
+    ds.close()
+    return filepath
+
+
 @pytest.fixture
 def empty_netcdf4_file(tmp_path: Path) -> str:
     filepath = tmp_path / "empty.nc"
diff --git a/docs/api.rst b/docs/api.rst
index 60716898..951b53f7 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -29,7 +29,6 @@ Serialization
     :toctree: generated/
 
     VirtualiZarrDatasetAccessor.to_kerchunk
-    VirtualiZarrDatasetAccessor.to_zarr
    VirtualiZarrDatasetAccessor.to_icechunk
 
 Information
diff --git a/docs/releases.rst b/docs/releases.rst
index ed9e3cd0..c4df05a9 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -9,6 +9,9 @@ v1.3.1 (unreleased)
 
 New Features
 ~~~~~~~~~~~~
 
+- Adds a Zarr reader to ``open_virtual_dataset``, which allows opening Zarr V2 and V3 stores as virtual datasets.
+  (:pull:`271`) By `Raphael Hagen `_.
+
 Breaking changes
 ~~~~~~~~~~~~~~~~
diff --git a/docs/usage.md b/docs/usage.md
index 93046226..eb3d0804 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -456,31 +456,6 @@ session.commit("Appended second dataset")
 
 See the [Icechunk documentation](https://icechunk.io/icechunk-python/virtual/#creating-a-virtual-dataset-with-virtualizarr) for more details.
 
-### Writing as Zarr
-
-Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`vds.virtualize.to_zarr <virtualizarr.VirtualiZarrDatasetAccessor.to_zarr>` accessor method.
-
-```python
-combined_vds.virtualize.to_zarr('combined.zarr')
-```
-
-The result is a zarr v3 store on disk which contains the chunk manifest information written out as `manifest.json` files, so the store looks like this:
-
-```
-combined/zarr.json <- group metadata
-combined/air/zarr.json <- array metadata
-combined/air/manifest.json <- array manifest
-...
-```
-
-The advantage of this format is that any zarr v3 reader that understands the chunk manifest ZEP could read from this store, no matter what language it is written in (e.g. via `zarr-python`, `zarr-js`, or rust). This reading would also not require `fsspec`.
-
-```{note}
-Currently there are not yet any zarr v3 readers which understand the chunk manifest ZEP, so until then this feature cannot be used for data processing.
-
-This store can however be read by {py:func}`~virtualizarr.open_virtual_dataset`, by passing `filetype="zarr_v3"`.
-```
-
 ## Opening Kerchunk references as virtual datasets
 
 You can open existing Kerchunk `json` or `parquet` references as Virtualizarr virtual datasets. This may be useful for converting existing Kerchunk formatted references to storage formats like [Icechunk](https://icechunk.io/).
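As a sketch of that conversion path (illustrative only, not part of this diff; `combined.json` is a hypothetical Kerchunk reference file):

```python
from virtualizarr import open_virtual_dataset

# open existing Kerchunk references as a virtual dataset
vds = open_virtual_dataset("combined.json", filetype="kerchunk", indexes={})
# vds can then be re-serialized, e.g. via vds.virtualize.to_icechunk(...)
```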
diff --git a/virtualizarr/accessor.py b/virtualizarr/accessor.py
index f0dfb966..597a9f36 100644
--- a/virtualizarr/accessor.py
+++ b/virtualizarr/accessor.py
@@ -7,7 +7,6 @@ from virtualizarr.manifests import ManifestArray
 from virtualizarr.types.kerchunk import KerchunkStoreRefs
 from virtualizarr.writers.kerchunk import dataset_to_kerchunk_refs
-from virtualizarr.writers.zarr import dataset_to_zarr
 
 if TYPE_CHECKING:
     from icechunk import IcechunkStore  # type: ignore[import-not-found]
@@ -24,21 +23,6 @@ class VirtualiZarrDatasetAccessor:
     def __init__(self, ds: Dataset):
         self.ds: Dataset = ds
 
-    def to_zarr(self, storepath: str) -> None:
-        """
-        Serialize all virtualized arrays in this xarray dataset as a Zarr store.
-
-        Currently requires all variables to be backed by ManifestArray objects.
-
-        Not very useful until some implementation of a Zarr reader can actually read these manifest.json files.
-        See https://github.com/zarr-developers/zarr-specs/issues/287
-
-        Parameters
-        ----------
-        storepath : str
-        """
-        dataset_to_zarr(self.ds, storepath)
-
     def to_icechunk(
         self,
         store: "IcechunkStore",
diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py
index 96c13322..ef3d1ff4 100644
--- a/virtualizarr/backend.py
+++ b/virtualizarr/backend.py
@@ -17,7 +17,7 @@
     KerchunkVirtualBackend,
     NetCDF3VirtualBackend,
     TIFFVirtualBackend,
-    ZarrV3VirtualBackend,
+    ZarrVirtualBackend,
 )
 from virtualizarr.readers.common import VirtualBackend
 from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions
@@ -25,7 +25,7 @@
 # TODO add entrypoint to allow external libraries to add to this mapping
 VIRTUAL_BACKENDS = {
     "kerchunk": KerchunkVirtualBackend,
-    "zarr_v3": ZarrV3VirtualBackend,
+    "zarr": ZarrVirtualBackend,
     "dmrpp": DMRPPVirtualBackend,
     "hdf5": HDFVirtualBackend,
     "netcdf4": HDFVirtualBackend,  # note this is the same as for hdf5
@@ -72,8 +72,7 @@ def automatically_determine_filetype(
 
     # TODO how do we handle kerchunk json / parquet here?
     if Path(filepath).suffix == ".zarr":
-        # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one...
-        raise NotImplementedError()
+        return FileType.zarr
 
     # Read magic bytes from local or remote file
     fpath = _FsspecFSFromFilepath(
diff --git a/virtualizarr/codecs.py b/virtualizarr/codecs.py
index 94f6d1aa..37f77e7c 100644
--- a/virtualizarr/codecs.py
+++ b/virtualizarr/codecs.py
@@ -3,7 +3,7 @@
 from virtualizarr.zarr import Codec
 
 if TYPE_CHECKING:
-    from zarr import Array  # type: ignore
+    from zarr import Array, AsyncArray  # type: ignore
     from zarr.core.abc.codec import (  # type: ignore
         ArrayArrayCodec,
         ArrayBytesCodec,
@@ -14,14 +14,14 @@
 
 def get_codecs(
-    array: Union["ManifestArray", "Array"],
+    array: Union["ManifestArray", "Array", "AsyncArray"],
     normalize_to_zarr_v3: bool = False,
 ) -> Union[Codec, tuple["ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec", ...]]:
     """
-    Get the codecs for either a ManifestArray or a Zarr Array.
+    Get the codecs for either a ManifestArray, a Zarr Array, or a Zarr AsyncArray.
 
     Parameters:
-        array (Union[ManifestArray, ZarrArray]): The input array, either ManifestArray or Zarr Array.
+        array (Union[ManifestArray, Array, AsyncArray]): The input array.
 
     Returns:
         List[Optional[Codec]]: A list of codecs or an empty list if no codecs are found.
@@ -30,6 +30,7 @@ def get_codecs(
         ImportError: If `zarr` is required but not installed.
         ValueError: If the array type is unsupported.
     """
+
     if _is_manifest_array(array):
         return _get_manifestarray_codecs(array, normalize_to_zarr_v3)  # type: ignore[arg-type]
 
@@ -65,9 +66,9 @@ def _get_manifestarray_codecs(
 def _is_zarr_array(array: object) -> bool:
     """Check if the array is an instance of Zarr Array."""
     try:
-        from zarr import Array
+        from zarr import Array, AsyncArray
 
-        return isinstance(array, Array)
+        return isinstance(array, (Array, AsyncArray))
     except ImportError:
         return False
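For orientation, a sketch of how the broadened `get_codecs` might be exercised with zarr-python's asynchronous API (illustrative only; the store path `air.zarr` and variable name `air` are hypothetical, and the calls mirror those used by the new reader below):

```python
import asyncio

import zarr

from virtualizarr.codecs import get_codecs


async def codecs_for(filepath: str, var: str):
    zg = await zarr.api.asynchronous.open_group(filepath, mode="r")
    arr = await zg.getitem(var)  # returns a zarr.AsyncArray for array keys
    return get_codecs(arr)


codecs = asyncio.run(codecs_for("air.zarr", "air"))
```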
diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py
index 666f4854..01ac3577 100644
--- a/virtualizarr/manifests/manifest.py
+++ b/virtualizarr/manifests/manifest.py
@@ -1,4 +1,3 @@
-import json
 import re
 from collections.abc import ItemsView, Iterable, Iterator, KeysView, ValuesView
 from pathlib import PosixPath
@@ -48,7 +47,6 @@ def with_validation(
         """
         # note: we can't just use `__init__` or a dataclass' `__post_init__` because we need `fs_root` to be an optional kwarg
-
         path = validate_and_normalize_path_to_uri(path, fs_root=fs_root)
         validate_byte_range(offset=offset, length=length)
         return ChunkEntry(path=path, offset=offset, length=length)
 
@@ -84,7 +82,8 @@ def validate_and_normalize_path_to_uri(path: str, fs_root: str | None = None) ->
         return urlunparse(components)
 
     elif any(path.startswith(prefix) for prefix in VALID_URI_PREFIXES):
-        if not PosixPath(path).suffix:
+        # Question: this feels fragile; is there a better way to identify a Zarr store?
+        if not PosixPath(path).suffix and "zarr" not in path:
             raise ValueError(
                 f"entries in the manifest must be paths to files, but this path has no file suffix: {path}"
             )
@@ -96,7 +95,7 @@ def validate_and_normalize_path_to_uri(path: str, fs_root: str | None = None) ->
         # using PosixPath here ensures a clear error would be thrown on windows (whose paths and platform are not officially supported)
         _path = PosixPath(path)
 
-        if not _path.suffix:
+        if not _path.suffix and "zarr" not in path:
             raise ValueError(
                 f"entries in the manifest must be paths to files, but this path has no file suffix: {path}"
             )
@@ -436,21 +435,6 @@ def __eq__(self, other: Any) -> bool:
             lengths_equal = (self._lengths == other._lengths).all()
         return paths_equal and offsets_equal and lengths_equal
 
-    @classmethod
-    def from_zarr_json(cls, filepath: str) -> "ChunkManifest":
-        """Create a ChunkManifest from a Zarr manifest.json file."""
-
-        with open(filepath, "r") as manifest_file:
-            entries = json.load(manifest_file)
-
-        return cls(entries=entries)
-
-    def to_zarr_json(self, filepath: str) -> None:
-        """Write the manifest to a Zarr manifest.json file."""
-        entries = self.dict()
-        with open(filepath, "w") as json_file:
-            json.dump(entries, json_file, indent=4, separators=(", ", ": "))
-
     def rename_paths(
         self,
         new: str | Callable[[str], str],
diff --git a/virtualizarr/readers/__init__.py b/virtualizarr/readers/__init__.py
index c776a9ae..3d887844 100644
--- a/virtualizarr/readers/__init__.py
+++ b/virtualizarr/readers/__init__.py
@@ -5,7 +5,9 @@
 from virtualizarr.readers.kerchunk import KerchunkVirtualBackend
 from virtualizarr.readers.netcdf3 import NetCDF3VirtualBackend
 from virtualizarr.readers.tiff import TIFFVirtualBackend
-from virtualizarr.readers.zarr_v3 import ZarrV3VirtualBackend
+from virtualizarr.readers.zarr import (
+    ZarrVirtualBackend,
+)
 
 __all__ = [
     "DMRPPVirtualBackend",
@@ -15,5 +17,5 @@
     "KerchunkVirtualBackend",
     "NetCDF3VirtualBackend",
     "TIFFVirtualBackend",
-    "ZarrV3VirtualBackend",
"ZarrV3VirtualBackend", + "ZarrVirtualBackend", ] diff --git a/virtualizarr/readers/common.py b/virtualizarr/readers/common.py index 0a7ad36e..1b3c6b1f 100644 --- a/virtualizarr/readers/common.py +++ b/virtualizarr/readers/common.py @@ -45,16 +45,24 @@ def maybe_open_loadable_vars_and_indexes( # TODO Really we probably want a dedicated backend that iterates over all variables only once # TODO See issue #124 for a suggestion of how to avoid calling xarray here. - fpath = _FsspecFSFromFilepath( - filepath=filepath, reader_options=reader_options - ).open_file() + fpath = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options) + + # Updates the Xarray open_dataset kwargs if Zarr + + if fpath.upath.suffix == ".zarr": + engine = "zarr" + xr_input = fpath.upath + + else: + engine = None + xr_input = fpath.open_file() # type: ignore - # fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any. ds = open_dataset( - fpath, # type: ignore[arg-type] + xr_input, # type: ignore[arg-type] drop_variables=drop_variables, group=group, decode_times=decode_times, + engine=engine, ) if indexes is None: diff --git a/virtualizarr/readers/zarr.py b/virtualizarr/readers/zarr.py new file mode 100644 index 00000000..787d22c4 --- /dev/null +++ b/virtualizarr/readers/zarr.py @@ -0,0 +1,316 @@ +from __future__ import annotations + +import asyncio +from itertools import starmap +from pathlib import Path # noqa +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Iterable, + Mapping, + Optional, + TypeVar, +) + +import numpy as np +from xarray import Dataset, Index, Variable + +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri # noqa +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + maybe_open_loadable_vars_and_indexes, +) +from virtualizarr.utils import check_for_collisions +from virtualizarr.zarr import ZARR_DEFAULT_FILL_VALUE, ZArray + +if TYPE_CHECKING: + import zarr + +# Vendored directly from Zarr-python V3's private API +# https://github.com/zarr-developers/zarr-python/blob/458299857141a5470ba3956d8a1607f52ac33857/src/zarr/core/common.py#L53 +T = TypeVar("T", bound=tuple[Any, ...]) +V = TypeVar("V") + + +async def _concurrent_map( + items: Iterable[T], + func: Callable[..., Awaitable[V]], + limit: int | None = None, +) -> list[V]: + if limit is None: + return await asyncio.gather(*list(starmap(func, items))) + + else: + sem = asyncio.Semaphore(limit) + + async def run(item: tuple[Any]) -> V: + async with sem: + return await func(*item) + + return await asyncio.gather( + *[asyncio.ensure_future(run(item)) for item in items] + ) + + +async def build_chunk_manifest( + zarr_array: zarr.AsyncArray, prefix: str, filepath: str +) -> ChunkManifest: + """Build a ChunkManifest with the from_arrays method""" + + key_tuples = [(x,) async for x in zarr_array.store.list_prefix(prefix)] + + filepath_list = [filepath] * len(key_tuples) + + def combine_path_chunk(filepath: str, chunk_key: str): + return filepath + "/" + chunk_key + + vectorized_chunk_path_combine_func = np.vectorize( + combine_path_chunk, otypes=[np.dtypes.StringDType()] + ) + + # turn the tuples of chunks to a flattened list with :list(sum(key_tuples, ())) + _paths = vectorized_chunk_path_combine_func( + filepath_list, list(sum(key_tuples, ())) + ) + + # _offsets: np.ndarray[Any, np.dtype[np.uint64]] + _offsets = np.array([0] * len(_paths), dtype=np.uint64) + + # _lengths: 
+
+
+async def build_chunk_manifest(
+    zarr_array: zarr.AsyncArray, prefix: str, filepath: str
+) -> ChunkManifest:
+    """Build a ChunkManifest with the from_arrays method"""
+    key_tuples = [(x,) async for x in zarr_array.store.list_prefix(prefix)]
+    filepath_list = [filepath] * len(key_tuples)
+
+    def combine_path_chunk(filepath: str, chunk_key: str):
+        return filepath + "/" + chunk_key
+
+    vectorized_chunk_path_combine_func = np.vectorize(
+        combine_path_chunk, otypes=[np.dtypes.StringDType()]
+    )
+
+    # flatten the list of single-key tuples into a list of keys with list(sum(key_tuples, ()))
+    _paths = vectorized_chunk_path_combine_func(
+        filepath_list, list(sum(key_tuples, ()))
+    )
+
+    # _offsets: np.ndarray[Any, np.dtype[np.uint64]]
+    _offsets = np.array([0] * len(_paths), dtype=np.uint64)
+
+    # _lengths: np.ndarray[Any, np.dtype[np.uint64]]
+    lengths = await _concurrent_map((key_tuples), zarr_array.store.getsize)
+    _lengths = np.array(lengths, dtype=np.uint64)
+
+    return ChunkManifest.from_arrays(
+        paths=_paths,  # type: ignore
+        offsets=_offsets,
+        lengths=_lengths,
+    )
+
+
+async def build_zarray_metadata(zarr_array: zarr.AsyncArray[Any]):
+    attrs = zarr_array.metadata.attributes
+
+    fill_value = zarr_array.metadata.fill_value
+    if fill_value is not None:
+        # map the dtype kind onto ZArray's default fill value for that kind
+        fill_value = ZARR_DEFAULT_FILL_VALUE[zarr_array.metadata.fill_value.dtype.kind]
+
+    zarr_format = zarr_array.metadata.zarr_format
+    # set ZArray-specific values depending on the Zarr format
+    if zarr_format == 2:
+        compressors = zarr_array.compressors[0].get_config()  # type: ignore[union-attr]
+        array_dims = attrs["_ARRAY_DIMENSIONS"]
+    elif zarr_format == 3:
+        serializer = zarr_array.serializer.to_dict()  # type: ignore[union-attr] # noqa: F841
+        # serializer is unused in ZArray. Maybe we will need this in the ZArray refactor:
+        # https://github.com/zarr-developers/VirtualiZarr/issues/411
+        # ZArray expects a single codec config dict, not a list of dicts, so take only the first entry
+        compressors = zarr_array.compressors[0].to_dict()
+        array_dims = zarr_array.metadata.dimension_names  # type: ignore[union-attr]
+        if fill_value is None:
+            raise ValueError(
+                "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value"
+            )
+    else:
+        raise NotImplementedError("Zarr format is not recognized as v2 or v3.")
+
+    # if the filters tuple is empty, assign filters to None
+    filters = zarr_array.filters if zarr_array.filters else None
+
+    array_zarray = ZArray(
+        shape=zarr_array.shape,
+        chunks=zarr_array.chunks,  # type: ignore[attr-defined]
+        dtype=zarr_array.dtype.name,  # type: ignore
+        fill_value=fill_value,
+        order=zarr_array.order,
+        compressor=compressors,
+        filters=filters,  # type: ignore[arg-type]
+        zarr_format=zarr_format,
+    )
+
+    return {
+        "zarray_array": array_zarray,
+        "array_dims": array_dims,
+        "array_metadata": attrs,
+    }
+
+
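+# Chunk key layout differs between the two formats (illustrative, for an array "air"
+# with 2D chunks and the default "/" separator):
+#   Zarr v2: air/0.0, air/0.1, ...
+#   Zarr v3: air/c/0/0, air/c/0/1, ...
+# hence the "/c" appended to the listing prefix below for v3 stores.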
+async def virtual_variable_from_zarr_array(
+    zarr_array: zarr.AsyncArray[Any], filepath: str
+):
+    zarr_prefix = zarr_array.basename
+
+    if zarr_array.metadata.zarr_format == 3:
+        # if we have Zarr format/version 3, we add /c/ to the chunk paths
+        zarr_prefix = f"{zarr_prefix}/c"
+
+    zarray_array = await build_zarray_metadata(zarr_array=zarr_array)
+
+    chunk_manifest = await build_chunk_manifest(
+        zarr_array, prefix=zarr_prefix, filepath=filepath
+    )
+
+    manifest_array = ManifestArray(
+        zarray=zarray_array["zarray_array"], chunkmanifest=chunk_manifest
+    )
+    return Variable(
+        dims=zarray_array["array_dims"],
+        data=manifest_array,
+        attrs=zarray_array["array_metadata"],
+    )
+
+
+async def virtual_dataset_from_zarr_group(
+    zarr_group: zarr.AsyncGroup,
+    filepath: str,
+    group: str,
+    drop_variables: Iterable[str] | None = [],
+    virtual_variables: Iterable[str] | None = [],
+    loadable_variables: Iterable[str] | None = [],
+    decode_times: bool | None = None,
+    indexes: Mapping[str, Index] | None = None,
+    reader_options: dict = {},
+):
+    # appease the mypy gods
+    if virtual_variables is None:
+        virtual_variables = []
+    if drop_variables is None:
+        drop_variables = []
+
+    virtual_zarr_arrays = await asyncio.gather(
+        *[zarr_group.getitem(var) for var in virtual_variables]
+    )
+
+    virtual_variable_arrays = await asyncio.gather(
+        *[
+            virtual_variable_from_zarr_array(zarr_array=array, filepath=filepath)  # type: ignore[arg-type]
+            for array in virtual_zarr_arrays
+        ]
+    )
+
+    # build a dict mapping for use later in construct_virtual_dataset
+    virtual_variable_array_mapping = {
+        array.basename: result
+        for array, result in zip(virtual_zarr_arrays, virtual_variable_arrays)
+    }
+
+    # flatten the nested dim tuples into a deduplicated list of coordinate names
+    coord_names = list(
+        set(
+            [
+                item
+                for tup in [val.dims for val in virtual_variable_arrays]
+                for item in tup
+            ]
+        )
+    )
+
+    non_loadable_variables = list(set(virtual_variables).union(set(drop_variables)))
+
+    loadable_vars, indexes = maybe_open_loadable_vars_and_indexes(
+        filepath,
+        loadable_variables=loadable_variables,
+        reader_options=reader_options,
+        drop_variables=non_loadable_variables,
+        indexes=indexes,
+        group=group,
+        decode_times=decode_times,
+    )
+
+    return construct_virtual_dataset(
+        virtual_vars=virtual_variable_array_mapping,
+        loadable_vars=loadable_vars,
+        indexes=indexes,
+        coord_names=coord_names,
+        attrs=zarr_group.attrs,
+    )
+
+
+class ZarrVirtualBackend(VirtualBackend):
+    @staticmethod
+    def open_virtual_dataset(
+        filepath: str,
+        group: str | None = None,
+        drop_variables: Iterable[str] | None = None,
+        loadable_variables: Iterable[str] | None = None,
+        decode_times: bool | None = None,
+        indexes: Mapping[str, Index] | None = None,
+        virtual_backend_kwargs: Optional[dict] = None,
+        reader_options: Optional[dict] = None,
+    ) -> Dataset:
+        import asyncio
+
+        import zarr
+
+        # We should remove this check once zarr v3 is pinned!
+        # ---------------------------
+        from packaging import version
+
+        if version.parse(zarr.__version__).major < 3:
+            raise ImportError("zarr-python v3 or later is required")
+        # ---------------------------
+
+        async def _open_virtual_dataset(
+            filepath=filepath,
+            group=group,
+            drop_variables=drop_variables,
+            loadable_variables=loadable_variables,
+            decode_times=decode_times,
+            indexes=indexes,
+            virtual_backend_kwargs=virtual_backend_kwargs,
+            reader_options=reader_options,
+        ):
+            if virtual_backend_kwargs:
+                raise NotImplementedError(
+                    "Zarr reader does not understand any virtual_backend_kwargs"
+                )
+
+            drop_variables, loadable_variables = check_for_collisions(
+                drop_variables,
+                loadable_variables,
+            )
+
+            filepath = validate_and_normalize_path_to_uri(
+                filepath, fs_root=Path.cwd().as_uri()
+            )
+
+            if reader_options is None:
+                reader_options = {}
+
+            zg = await zarr.api.asynchronous.open_group(
+                filepath,
+                storage_options=reader_options.get("storage_options"),
+                mode="r",
+            )
+
+            zarr_array_keys = [key async for key in zg.array_keys()]
+
+            missing_vars = set(loadable_variables) - set(zarr_array_keys)
+            if missing_vars:
+                raise ValueError(
+                    f"Some loadable variables specified are not present in this zarr store: {missing_vars}"
+                )
+            virtual_vars = list(
+                set(zarr_array_keys) - set(loadable_variables) - set(drop_variables)
+            )
+
+            return await virtual_dataset_from_zarr_group(
+                zarr_group=zg,
+                filepath=filepath,
+                group=group,
+                virtual_variables=virtual_vars,
+                drop_variables=drop_variables,
+                loadable_variables=loadable_variables,
+                decode_times=decode_times,
+                indexes=indexes,
+                reader_options=reader_options,
+            )
+
+        return asyncio.run(_open_virtual_dataset())
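With these pieces in place, the intended public usage might look like the following sketch (illustrative, not part of this diff; the store path reuses the fixture name from `conftest.py` above, and `filetype` could likely be omitted since `automatically_determine_filetype` now maps a `.zarr` suffix to `FileType.zarr`):

```python
from virtualizarr import open_virtual_dataset

vds = open_virtual_dataset("air.zarr", filetype="zarr", indexes={})
print(vds["air"].data)  # a ManifestArray of chunk references into the store
```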
diff --git a/virtualizarr/readers/zarr_v3.py b/virtualizarr/readers/zarr_v3.py
deleted file mode 100644
index 70bf66e8..00000000
--- a/virtualizarr/readers/zarr_v3.py
+++ /dev/null
@@ -1,161 +0,0 @@
-import json
-from pathlib import Path
-from typing import Iterable, Mapping, Optional
-
-import numcodecs
-import numpy as np
-from xarray import Dataset, Index, Variable
-
-from virtualizarr.manifests import ChunkManifest, ManifestArray
-from virtualizarr.readers.common import VirtualBackend, separate_coords
-from virtualizarr.zarr import ZArray
-
-
-class ZarrV3VirtualBackend(VirtualBackend):
-    @staticmethod
-    def open_virtual_dataset(
-        filepath: str,
-        group: str | None = None,
-        drop_variables: Iterable[str] | None = None,
-        loadable_variables: Iterable[str] | None = None,
-        decode_times: bool | None = None,
-        indexes: Mapping[str, Index] | None = None,
-        virtual_backend_kwargs: Optional[dict] = None,
-        reader_options: Optional[dict] = None,
-    ) -> Dataset:
-        """
-        Read a Zarr v3 store containing chunk manifests and return an xarray Dataset containing virtualized arrays.
-
-        This is experimental - chunk manifests are not part of the Zarr v3 Spec.
-        """
-        if virtual_backend_kwargs:
-            raise NotImplementedError(
-                "Zarr_v3 reader does not understand any virtual_backend_kwargs"
-            )
-
-        storepath = Path(filepath)
-
-        if group:
-            raise NotImplementedError()
-
-        if loadable_variables or decode_times:
-            raise NotImplementedError()
-
-        if reader_options:
-            raise NotImplementedError()
-
-        drop_vars: list[str]
-        if drop_variables is None:
-            drop_vars = []
-        else:
-            drop_vars = list(drop_variables)
-
-        ds_attrs = attrs_from_zarr_group_json(storepath / "zarr.json")
-        coord_names = ds_attrs.pop("coordinates", [])
-
-        # TODO recursive glob to create a datatree
-        # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it
-        # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166
-        all_paths = storepath.glob("*/")
-        directory_paths = [p for p in all_paths if not p.is_file()]
-
-        vars = {}
-        for array_dir in directory_paths:
-            var_name = array_dir.name
-            if var_name in drop_vars:
-                break
-
-            zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json")
-            manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json"))
-
-            marr = ManifestArray(chunkmanifest=manifest, zarray=zarray)
-            var = Variable(data=marr, dims=dim_names, attrs=attrs)
-            vars[var_name] = var
-
-        if indexes is None:
-            raise NotImplementedError()
-        elif indexes != {}:
-            # TODO allow manual specification of index objects
-            raise NotImplementedError()
-        else:
-            indexes = dict(**indexes)  # for type hinting: to allow mutation
-
-        data_vars, coords = separate_coords(vars, indexes, coord_names)
-
-        ds = Dataset(
-            data_vars,
-            coords=coords,
-            # indexes={},  # TODO should be added in a later version of xarray
-            attrs=ds_attrs,
-        )
-
-        return ds
-
-
-def attrs_from_zarr_group_json(filepath: Path) -> dict:
-    with open(filepath) as metadata_file:
-        attrs = json.load(metadata_file)
-    return attrs["attributes"]
-
-
-def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]:
-    with open(filepath) as metadata_file:
-        metadata = json.load(metadata_file)
-
-    if {
-        "name": "chunk-manifest-json",
-        "configuration": {
-            "manifest": "./manifest.json",
-        },
-    } not in metadata.get("storage_transformers", []):
-        raise ValueError(
-            "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP."
-        )
-
-    attrs = metadata.pop("attributes")
-    dim_names = metadata.pop("dimension_names")
-
-    chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"])
-    shape = tuple(metadata["shape"])
-    zarr_format = metadata["zarr_format"]
-
-    if metadata["fill_value"] is None:
-        raise ValueError(
-            "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value"
-        )
-    else:
-        fill_value = metadata["fill_value"]
-
-    all_codecs = [
-        codec
-        for codec in metadata["codecs"]
-        if codec["name"] not in ("transpose", "bytes")
-    ]
-    compressor, *filters = [
-        _configurable_to_num_codec_config(_filter) for _filter in all_codecs
-    ]
-    zarray = ZArray(
-        chunks=chunk_shape,
-        compressor=compressor,
-        dtype=np.dtype(metadata["data_type"]),
-        fill_value=fill_value,
-        filters=filters or None,
-        order="C",
-        shape=shape,
-        zarr_format=zarr_format,
-    )
-
-    return zarray, dim_names, attrs
-
-
-def _configurable_to_num_codec_config(configurable: dict) -> dict:
-    """
-    Convert a zarr v3 configurable into a numcodecs codec.
-    """
-    configurable_copy = configurable.copy()
-    codec_id = configurable_copy.pop("name")
-    if codec_id.startswith("numcodecs."):
-        codec_id = codec_id[len("numcodecs.") :]
-    configuration = configurable_copy.pop("configuration")
-    return numcodecs.get_codec({"id": codec_id, **configuration}).get_config()
diff --git a/virtualizarr/tests/test_codecs.py b/virtualizarr/tests/test_codecs.py
index 23cb494e..6087271d 100644
--- a/virtualizarr/tests/test_codecs.py
+++ b/virtualizarr/tests/test_codecs.py
@@ -7,7 +7,6 @@
 from virtualizarr import ChunkManifest, ManifestArray
 from virtualizarr.codecs import get_codecs
 from virtualizarr.tests import (
-    requires_zarr_python,
     requires_zarr_python_v3,
 )
 from virtualizarr.zarr import Codec
@@ -149,7 +148,7 @@ def test_zarr_v3(self):
         expected_codecs = tuple([BytesCodec(endian="little")])
         assert actual_codecs == expected_codecs
 
-    @requires_zarr_python
+    @requires_zarr_python_v3
     def test_unsupported_zarr_python(self):
         zarr_array = self.create_zarr_array()
         unsupported_zarr_version = "2.18.3"
diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py
index 14cc8d3d..c713a02e 100644
--- a/virtualizarr/tests/test_integration.py
+++ b/virtualizarr/tests/test_integration.py
@@ -1,4 +1,3 @@
-from os.path import relpath
 from pathlib import Path
 
 import numpy as np
@@ -13,7 +12,9 @@
     has_kerchunk,
     parametrize_over_hdf_backends,
     requires_kerchunk,
+    requires_network,
     requires_zarr_python,
+    requires_zarr_python_v3,
 )
 from virtualizarr.translators.kerchunk import (
     dataset_from_kerchunk_refs,
@@ -80,6 +81,7 @@ def test_numpy_arrays_to_inlined_kerchunk_refs(
     vds = open_virtual_dataset(
         netcdf4_file, loadable_variables=vars_to_inline, indexes={}, backend=hdf_backend
     )
+
     refs = vds.virtualize.to_kerchunk(format="dict")
 
     # TODO I would just compare the entire dicts but kerchunk returns inconsistent results - see https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202
@@ -90,6 +92,68 @@
     assert refs["refs"]["time/0"] == expected["refs"]["time/0"]
 
 
+@requires_zarr_python_v3
+@requires_network
+@requires_kerchunk
+@pytest.mark.skip(
+    reason="WIP on kerchunk v3 round-trip: fails with RuntimeError: 'error during blosc decompression: -1' for Zarr v3 + kerchunk v3 branch"
+)
+@pytest.mark.parametrize(
+    "zarr_store",
+    [
+        pytest.param(2, id="Zarr V2"),
+        pytest.param(3, id="Zarr V3"),
+    ],
+    indirect=True,
+)
+def test_zarr_roundtrip_kerchunk(zarr_store):
+    comparison_ds = xr.open_zarr(zarr_store)
+
+    ds = open_virtual_dataset(
+        zarr_store,
+        indexes={},
+    )
+    ds_refs = ds.virtualize.to_kerchunk(format="dict")
+    roundtrip = xr.open_dataset(ds_refs, engine="kerchunk", decode_times=False)
+
+    xrt.assert_equal(comparison_ds, roundtrip)
+
+
+@requires_zarr_python_v3
+@requires_network
+@pytest.mark.skip(reason="WIP on icechunk round-trip")
+@pytest.mark.parametrize(
+    "zarr_store",
+    [
+        pytest.param(2, id="Zarr V2"),
+        pytest.param(3, id="Zarr V3"),
+    ],
+    indirect=True,
+)
+def test_zarr_roundtrip_icechunk(zarr_store):
+    import icechunk  # type: ignore[import-not-found]
+
+    # open the zarr store with xarray for comparison
+    comparison_ds = xr.open_zarr(zarr_store)
+
+    ds = open_virtual_dataset(
+        zarr_store,
+        indexes={},
+    )
+
+    # Note: this was written against icechunk 0.1.0a15 - the syntax could be incorrect
+    storage = icechunk.storage.in_memory_storage()
+    repo = icechunk.Repository.open_or_create(storage=storage)
+    session = repo.writable_session("main")
+
+    # Write the virtual dataset to icechunk
+    ds.virtualize.to_icechunk(session.store)
+
+    rtds = xr.open_zarr(session.store)
+    xrt.assert_equal(comparison_ds, rtds)
+
+
 def roundtrip_as_kerchunk_dict(vds: xr.Dataset, tmpdir, **kwargs):
     # write those references to an in-memory kerchunk-formatted references dictionary
     ds_refs = vds.virtualize.to_kerchunk(format="dict")
@@ -287,8 +351,7 @@ def test_convert_absolute_paths_to_uris(self, netcdf4_file, hdf_backend):
         assert path == expected_path
 
     def test_convert_relative_paths_to_uris(self, netcdf4_file, hdf_backend):
-        relative_path = relpath(netcdf4_file)
-        vds = open_virtual_dataset(relative_path, indexes={}, backend=hdf_backend)
+        vds = open_virtual_dataset(netcdf4_file, indexes={}, backend=hdf_backend)
 
         expected_path = Path(netcdf4_file).as_uri()
diff --git a/virtualizarr/tests/test_readers/test_zarr.py b/virtualizarr/tests/test_readers/test_zarr.py
new file mode 100644
index 00000000..cd4d2cc5
--- /dev/null
+++ b/virtualizarr/tests/test_readers/test_zarr.py
@@ -0,0 +1,121 @@
+import numpy as np
+import pytest
+
+from virtualizarr import open_virtual_dataset
+from virtualizarr.manifests import ManifestArray
+from virtualizarr.tests import requires_network, requires_zarr_python_v3
+
+
+@requires_zarr_python_v3
+@requires_network
+@pytest.mark.parametrize(
+    "zarr_store",
+    [
+        pytest.param(2, id="Zarr V2"),
+        pytest.param(3, id="Zarr V3"),
+    ],
+    indirect=True,
+)
+class TestOpenVirtualDatasetZarr:
+    def test_loadable_variables(self, zarr_store, loadable_variables=["time", "air"]):
+        # check that loadable_variables works
+        vds = open_virtual_dataset(
+            filepath=zarr_store, loadable_variables=loadable_variables, indexes={}
+        )
+        assert isinstance(vds["time"].data, np.ndarray)
+        assert isinstance(vds["air"].data, np.ndarray)
+
+    def test_drop_variables(self, zarr_store, drop_variables=["air"]):
+        # check that the variable is dropped
+        vds = open_virtual_dataset(
+            filepath=zarr_store, drop_variables=drop_variables, indexes={}
+        )
+        assert len(vds.data_vars) == 0
+
+    def test_virtual_dataset_zarr_attrs(self, zarr_store):
+        import zarr
+
+        from virtualizarr.zarr import ZARR_DEFAULT_FILL_VALUE, ZArray
+
+        zg = zarr.open_group(zarr_store)
+        vds = open_virtual_dataset(filepath=zarr_store, indexes={})
+
+        non_var_arrays = ["time", "lat", "lon"]
+        # check dims and coords are present
+        assert set(vds.coords) == set(non_var_arrays)
+        assert set(vds.dims) == set(non_var_arrays)
+        # check vars match
+        assert set(vds.keys()) == set(["air"])
+
+        # arrays are ManifestArrays
+        for array in list(vds):
+            assert isinstance(vds[array].data, ManifestArray)
+
+        # check top-level attrs
+        assert zg.attrs.asdict() == vds.attrs
+
+        # check ZArray values
+        arrays = [val for val in zg.keys()]
+
+        def _validate_attr_match(
+            array: str, zarr_array: zarr.Array | zarr.AsyncArray, zarray: ZArray
+        ):
+            zarr_array_fill_value = zarr_array.fill_value  # type: ignore[union-attr]
+
+            if zarr_array_fill_value:
+                zarr_array_fill_value = ZARR_DEFAULT_FILL_VALUE[
+                    zarr_array_fill_value.dtype.kind
+                ]
+            else:
+                zarr_array_fill_value = 0
+
+            # if the filters tuple is empty, assign filters to None to match the ZArray definition
+            zarr_array_filters = zarr_array.filters if zarr_array.filters else None
+
+            assert zarr_array.shape == zarray.shape, (
+                f"Mismatch in [shape] for {array} between Zarr Array: {zarr_array.shape} and ZArray: {zarray.shape}"
+            )
+            assert zarr_array.chunks == zarray.chunks, (
+                f"Mismatch in [chunks] for {array} between Zarr Array: {zarr_array.chunks} and ZArray: {zarray.chunks}"
+            )
+            assert zarr_array.dtype == zarray.dtype, (
+                f"Mismatch in [dtype] for {array} between Zarr Array: {zarr_array.dtype} and ZArray: {zarray.dtype}"
+            )
+            assert zarr_array_fill_value == zarray.fill_value, (
+                f"Mismatch in [fill_value] for {array} between Zarr Array: {zarr_array_fill_value} and ZArray: {zarray.fill_value}"
+            )
+            assert zarr_array.order == zarray.order, (
+                f"Mismatch in [order] for {array} between Zarr Array: {zarr_array.order} and ZArray: {zarray.order}"
+            )
+            assert zarr_array_filters == zarray.filters, (
+                f"Mismatch in [filters] for {array} between Zarr Array: {zarr_array_filters} and ZArray: {zarray.filters}"
+            )
+            assert zarr_array.metadata.zarr_format == zarray.zarr_format, (
+                f"Mismatch in [zarr_format] for {array} between Zarr Array: {zarr_array.metadata.zarr_format} and ZArray: {zarray.zarr_format}"
+            )
+
+            if zarr_array.metadata.zarr_format == 2:
+                zarr_array_compressor = zarr_array.compressor.get_config()  # type: ignore[union-attr]
+            elif zarr_array.metadata.zarr_format == 3:
+                zarr_array_compressor = zarr_array.compressors[0].to_dict()
+            else:
+                raise NotImplementedError(
+                    f"Zarr format {zarr_array.metadata.zarr_format} not in [2,3]"
+                )
+
+            assert zarr_array_compressor == zarray.compressor, (
+                f"Mismatch in [compressor] for {array} between Zarr Array: {zarr_array_compressor} and ZArray: {zarray.compressor}"
+            )
+
+        [
+            _validate_attr_match(
+                array=array, zarr_array=zg[array], zarray=vds[array].data.zarray
+            )
+            for array in arrays
+        ]
diff --git a/virtualizarr/tests/test_writers/test_zarr.py b/virtualizarr/tests/test_writers/test_zarr.py
deleted file mode 100644
index 9ca281cb..00000000
--- a/virtualizarr/tests/test_writers/test_zarr.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import json
-
-import pytest
-import xarray.testing as xrt
-from xarray import Dataset
-
-pytest.importorskip("zarr.core.metadata.v3")
-
-from virtualizarr import open_virtual_dataset
-from virtualizarr.backend import FileType
-from virtualizarr.readers.zarr_v3 import metadata_from_zarr_json
-from virtualizarr.writers.zarr import dataset_to_zarr
-
-
-def isconfigurable(value: dict) -> bool:
-    """
-    Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict
-    """
-    return "name" in value and "configuration" in value
-
-
-def test_zarr_v3_metadata_conformance(tmpdir, vds_with_manifest_arrays: Dataset):
-    """
-    Checks that the output metadata of an array variable conforms to this spec
-    for the required attributes:
-    https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#metadata
-    """
-    dataset_to_zarr(vds_with_manifest_arrays, tmpdir / "store.zarr")
-    # read the a variable's metadata
-    with open(tmpdir / "store.zarr/a/zarr.json", mode="r") as f:
-        metadata = json.loads(f.read())
-    assert metadata["zarr_format"] == 3
-    assert metadata["node_type"] == "array"
-    assert isinstance(metadata["shape"], list) and all(
-        isinstance(dim, int) for dim in metadata["shape"]
-    )
-    assert isinstance(metadata["data_type"], str) or isconfigurable(
-        metadata["data_type"]
-    )
-    assert isconfigurable(metadata["chunk_grid"])
-    assert isconfigurable(metadata["chunk_key_encoding"])
-    assert isinstance(metadata["fill_value"], (bool, int, float, str, list))
-    assert (
-        isinstance(metadata["codecs"], list)
-        and len(metadata["codecs"]) == 1
-        and all(isconfigurable(codec) for codec in metadata["codecs"])
-    )
-
-
-def test_zarr_v3_roundtrip(tmpdir, vds_with_manifest_arrays: Dataset):
-    vds_with_manifest_arrays.virtualize.to_zarr(tmpdir / "store.zarr")
-    roundtrip = open_virtual_dataset(
-        tmpdir / "store.zarr", filetype=FileType.zarr_v3, indexes={}
-    )
-
-    xrt.assert_identical(roundtrip, vds_with_manifest_arrays)
-
-
-def test_metadata_roundtrip(tmpdir, vds_with_manifest_arrays: Dataset):
-    dataset_to_zarr(vds_with_manifest_arrays, tmpdir / "store.zarr")
-    zarray, _, _ = metadata_from_zarr_json(tmpdir / "store.zarr/a/zarr.json")
-    assert zarray == vds_with_manifest_arrays.a.data.zarray
diff --git a/virtualizarr/tests/test_zarr.py b/virtualizarr/tests/test_zarr.py
deleted file mode 100644
index 95dbf55f..00000000
--- a/virtualizarr/tests/test_zarr.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import numpy as np
-
-from virtualizarr.zarr import ZArray
-
-
-def test_replace_partial():
-    arr = ZArray(shape=(2, 3), chunks=(1, 1), dtype=np.dtype("
diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py
     def open_file(self) -> OpenFileType:
         """Calls `.open` on fsspec.Filesystem instantiation using self.filepath as an input.
@@ -50,18 +52,26 @@ def read_bytes(self, bytes: int) -> bytes:
         with self.open_file() as of:
             return of.read(bytes)
 
+    def get_mapper(self):
+        """Returns a mapper for use with Zarr"""
+        return self.fs.get_mapper(self.filepath)
+
     def __post_init__(self) -> None:
         """Initialize the fsspec filesystem object"""
         import fsspec
         from upath import UPath
 
-        universal_filepath = UPath(self.filepath)
-        protocol = universal_filepath.protocol
+        if not isinstance(self.filepath, UPath):
+            upath = UPath(self.filepath)
+        else:
+            upath = self.filepath
+        self.upath = upath
+        self.protocol = upath.protocol
+        self.filepath = upath.as_uri()
 
         self.reader_options = self.reader_options or {}
         storage_options = self.reader_options.get("storage_options", {})  # type: ignore
-        self.fs = fsspec.filesystem(protocol, **storage_options)
+        self.fs = fsspec.filesystem(self.protocol, **storage_options)
 
 
 def check_for_collisions(
diff --git a/virtualizarr/writers/zarr.py b/virtualizarr/writers/zarr.py
deleted file mode 100644
index b9529ad5..00000000
--- a/virtualizarr/writers/zarr.py
+++ /dev/null
@@ -1,114 +0,0 @@
-from pathlib import Path
-
-import numpy as np
-from xarray import Dataset
-from xarray.core.variable import Variable
-
-from virtualizarr.vendor.zarr.utils import json_dumps
-from virtualizarr.zarr import ZArray
-
-
-def dataset_to_zarr(ds: Dataset, storepath: str) -> None:
-    """
-    Write an xarray dataset whose variables wrap ManifestArrays to a v3 Zarr store, writing chunk references into manifest.json files.
-
-    Currently requires all variables to be backed by ManifestArray objects.
-
-    Not very useful until some implementation of a Zarr reader can actually read these manifest.json files.
-    See https://github.com/zarr-developers/zarr-specs/issues/287
-
-    Parameters
-    ----------
-    ds: xr.Dataset
-    storepath: str
-    """
-
-    from virtualizarr.manifests import ManifestArray
-
-    _storepath = Path(storepath)
-    Path.mkdir(_storepath, exist_ok=False)
-
-    # should techically loop over groups in a tree but a dataset corresponds to only one group
-    group_metadata = {"zarr_format": 3, "node_type": "group", "attributes": ds.attrs}
-    with open(_storepath / "zarr.json", "wb") as group_metadata_file:
-        group_metadata_file.write(json_dumps(group_metadata))
-
-    for name, var in ds.variables.items():
-        array_dir = _storepath / str(name)
-        marr = var.data
-
-        # TODO move this check outside the writing loop so we don't write an incomplete store on failure?
-        # TODO at some point this should be generalized to also write in-memory arrays as normal zarr chunks, see GH isse #62.
-        if not isinstance(marr, ManifestArray):
-            raise TypeError(
-                "Only xarray objects wrapping ManifestArrays can be written to zarr using this method, "
-                f"but variable {name} wraps an array of type {type(marr)}"
-            )
-
-        Path.mkdir(array_dir, exist_ok=False)
-
-        # write the chunk references into a manifest.json file
-        # and the array metadata into a zarr.json file
-        to_zarr_json(var, array_dir)
-
-
-def to_zarr_json(var: Variable, array_dir: Path) -> None:
-    """
-    Write out both the zarr.json and manifest.json file into the given zarr array directory.
-
-    Follows the Zarr v3 manifest storage transformer ZEP (see https://github.com/zarr-developers/zarr-specs/issues/287).
-
-    Parameters
-    ----------
-    var : xr.Variable
-        Must be wrapping a ManifestArray
-    dirpath : str
-        Zarr store array directory into which to write files.
- """ - - marr = var.data - - marr.manifest.to_zarr_json(array_dir / "manifest.json") - - metadata = zarr_v3_array_metadata( - marr.zarray, [str(x) for x in var.dims], var.attrs - ) - with open(array_dir / "zarr.json", "wb") as metadata_file: - metadata_file.write(json_dumps(metadata)) - - -def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -> dict: - """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable.""" - # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us - metadata = zarray.dict() - - # adjust to match v3 spec - metadata["zarr_format"] = 3 - metadata["node_type"] = "array" - metadata["data_type"] = str(np.dtype(metadata.pop("dtype"))) - metadata["chunk_grid"] = { - "name": "regular", - "configuration": {"chunk_shape": metadata.pop("chunks")}, - } - metadata["chunk_key_encoding"] = { - "name": "default", - "configuration": {"separator": "/"}, - } - metadata["codecs"] = tuple(c.to_dict() for c in zarray._v3_codec_pipeline()) - metadata.pop("filters") - metadata.pop("compressor") - metadata.pop("order") - - # indicate that we're using the manifest storage transformer ZEP - metadata["storage_transformers"] = [ - { - "name": "chunk-manifest-json", - "configuration": {"manifest": "./manifest.json"}, - } - ] - - # add information from xarray object - metadata["dimension_names"] = dim_names - metadata["attributes"] = attrs - - return metadata diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index 4c36e7dc..57f36854 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -183,6 +183,8 @@ def _v3_codec_pipeline(self) -> Any: [_num_codec_config_to_configurable(filter) for filter in self.filters] ) + # compressor {'name': 'zstd', 'configuration': {'level': 0, 'checksum': False}} + # gives: *** KeyError: 'id' for zarr v3 if self.compressor: codec_configs.append(_num_codec_config_to_configurable(self.compressor))