diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index e5ce819..e49f458 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -34,6 +34,10 @@ jobs: - { os: linux, manylinux: musllinux_1_2, target: armv7 } # windows - { os: windows, target: i686, python-architecture: x86 } + exclude: + # https://github.com/rust-cross/cargo-xwin/issues/76 + - os: windows + target: aarch64 runs-on: ${{ (matrix.os == 'linux' && 'ubuntu') || matrix.os }}-latest steps: - uses: actions/checkout@v4 diff --git a/Cargo.toml b/Cargo.toml index 7403bf3..2524bf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,9 @@ numpy = "0.23.0" unsafe_cell_slice = "0.2.0" serde_json = "1.0.128" pyo3-stub-gen = "0.6.1" +opendal = { version = "0.50.2", features = ["services-http"] } +tokio = { version = "1.41.1", features = ["rt-multi-thread"] } +zarrs_opendal = "0.4.0" [profile.release] lto = true diff --git a/README.md b/README.md index e43df23..c48bebe 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,14 @@ You can then use your `zarr` as normal (with some caveats)! We export a `ZarrsCodecPipeline` class so that `zarr-python` can use the class but it is not meant to be instantiated and we do not guarantee the stability of its API beyond what is required so that `zarr-python` can use it. Therefore, it is not documented here. We also export two errors, `DiscontiguousArrayError` and `CollapsedDimensionError` that can be thrown in the process of converting to indexers that `zarrs` can understand (see below for more details). -At the moment, we only support local filesystems but intend to support more in the future: https://github.com/ilan-gold/zarrs-python/issues/44 +At the moment, we only support a subset of the `zarr-python` stores: + +- [x] [LocalStore](https://zarr.readthedocs.io/en/main/_autoapi/zarr/storage/local/index.html) (FileSystem) +- [RemoteStore](https://zarr.readthedocs.io/en/main/_autoapi/zarr/storage/remote/index.html) + - [x] [HTTPFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.http.HTTPFileSystem) + +A `NotImplementedError` will be raised if a store is not supported. +We intend to support more stores in the future: https://github.com/ilan-gold/zarrs-python/issues/44. ### Configuration diff --git a/pyproject.toml b/pyproject.toml index 77bf9c4..87acc4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ 'donfig', 'pytest', 'universal_pathlib>=0.2.0', - 'zarr>=3.0.0b2', + 'zarr>=3.0.0b2,<=3.0.0b3', ] [project.optional-dependencies] diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 6c8ce36..4223e3d 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -2,10 +2,15 @@ # ruff: noqa: E501, F401 import typing +from enum import Enum, auto import numpy import numpy.typing +class Basic: + def __new__(cls, byte_interface: typing.Any, chunk_spec: typing.Any): ... + ... + class CodecPipelineImpl: def __new__( cls, @@ -19,29 +24,34 @@ class CodecPipelineImpl: ): ... def retrieve_chunks_and_apply_index( self, - chunk_descriptions: typing.Sequence[ - tuple[ - tuple[str, typing.Sequence[int], str, typing.Sequence[int]], - typing.Sequence[slice], - typing.Sequence[slice], - ] - ], + chunk_descriptions: typing.Sequence[WithSubset], value: numpy.NDArray[typing.Any], ) -> None: ... def retrieve_chunks( - self, - chunk_descriptions: typing.Sequence[ - tuple[str, typing.Sequence[int], str, typing.Sequence[int]] - ], + self, chunk_descriptions: typing.Sequence[Basic] ) -> list[numpy.typing.NDArray[numpy.uint8]]: ... def store_chunks_with_indices( self, - chunk_descriptions: typing.Sequence[ - tuple[ - tuple[str, typing.Sequence[int], str, typing.Sequence[int]], - typing.Sequence[slice], - typing.Sequence[slice], - ] - ], + chunk_descriptions: typing.Sequence[WithSubset], value: numpy.NDArray[typing.Any], ) -> None: ... + +class FilesystemStoreConfig: + root: str + +class HttpStoreConfig: + endpoint: str + +class WithSubset: + def __new__( + cls, + item: Basic, + chunk_subset: typing.Sequence[slice], + subset: typing.Sequence[slice], + shape: typing.Sequence[int], + ): ... + ... + +class StoreConfig(Enum): + Filesystem = auto() + Http = auto() diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index f6552ba..86846d2 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -6,10 +6,7 @@ from typing import TYPE_CHECKING, TypedDict import numpy as np -from zarr.abc.codec import ( - Codec, - CodecPipeline, -) +from zarr.abc.codec import Codec, CodecPipeline from zarr.core.config import config if TYPE_CHECKING: @@ -18,7 +15,7 @@ from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec - from zarr.core.buffer import Buffer, NDBuffer + from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer from zarr.core.chunk_grids import ChunkGrid from zarr.core.common import ChunkCoords from zarr.core.indexing import SelectorTuple @@ -120,19 +117,20 @@ async def read( batch_info: Iterable[ tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple] ], - out: NDBuffer, + out: NDBuffer, # type: ignore drop_axes: tuple[int, ...] = (), # FIXME: unused ) -> None: - out = out.as_ndarray_like() # FIXME: Error if array is not in host memory + # FIXME: Error if array is not in host memory + out: NDArrayLike = out.as_ndarray_like() if not out.dtype.isnative: raise RuntimeError("Non-native byte order not supported") try: - chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes) - index_in_rust = True + chunks_desc = make_chunk_info_for_rust_with_indices( + batch_info, drop_axes, out.shape + ) except (DiscontiguousArrayError, CollapsedDimensionError): chunks_desc = make_chunk_info_for_rust(batch_info) - index_in_rust = False - if index_in_rust: + else: await asyncio.to_thread( self.impl.retrieve_chunks_and_apply_index, chunks_desc, @@ -140,10 +138,7 @@ async def read( ) return None chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc) - for chunk, chunk_info in zip(chunks, batch_info): - out_selection = chunk_info[3] - selection = chunk_info[2] - spec = chunk_info[1] + for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info): chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape) chunk_selected = chunk_reshaped[selection] if drop_axes: @@ -155,18 +150,17 @@ async def write( batch_info: Iterable[ tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], - value: NDBuffer, + value: NDBuffer, # type: ignore drop_axes: tuple[int, ...] = (), ) -> None: - value = value.as_ndarray_like() # FIXME: Error if array is not in host memory + # FIXME: Error if array is not in host memory + value: NDArrayLike | np.ndarray = value.as_ndarray_like() if not value.dtype.isnative: value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("=")) elif not value.flags.c_contiguous: value = np.ascontiguousarray(value) - chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes) - await asyncio.to_thread( - self.impl.store_chunks_with_indices, - chunks_desc, - value, + chunks_desc = make_chunk_info_for_rust_with_indices( + batch_info, drop_axes, value.shape ) + await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value) return None diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 43e8b73..0aa0d0d 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -3,18 +3,19 @@ import operator import os from functools import reduce -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numpy as np from zarr.core.indexing import SelectorTuple, is_integer +from zarrs._internal import Basic, WithSubset + if TYPE_CHECKING: from collections.abc import Iterable from types import EllipsisType from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec - from zarr.core.common import ChunkCoords # adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor @@ -62,17 +63,6 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli return make_slice_selection(selector_tuple) -def convert_chunk_to_primitive( - byte_getter: ByteGetter | ByteSetter, chunk_spec: ArraySpec -) -> tuple[str, ChunkCoords, str, Any]: - return ( - str(byte_getter), - chunk_spec.shape, - str(chunk_spec.dtype), - chunk_spec.fill_value.tobytes(), - ) - - def resulting_shape_from_index( array_shape: tuple[int, ...], index_tuple: tuple[int | slice | EllipsisType | np.ndarray], @@ -149,10 +139,12 @@ def make_chunk_info_for_rust_with_indices( tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], drop_axes: tuple[int, ...], -) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: - chunk_info_with_indices = [] + shape: tuple[int, ...], +) -> list[WithSubset]: + shape = shape if shape else (1,) # constant array + chunk_info_with_indices: list[WithSubset] = [] for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info: - chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec) + chunk_info = Basic(byte_getter, chunk_spec) out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) shape_chunk_selection_slices = get_shape_for_selector( @@ -169,7 +161,12 @@ def make_chunk_info_for_rust_with_indices( f"{shape_chunk_selection} != {shape_chunk_selection_slices}" ) chunk_info_with_indices.append( - (chunk_info, out_selection_as_slices, chunk_selection_as_slices) + WithSubset( + chunk_info, + chunk_subset=chunk_selection_as_slices, + subset=out_selection_as_slices, + shape=shape, + ) ) return chunk_info_with_indices @@ -178,8 +175,8 @@ def make_chunk_info_for_rust( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], -) -> list[tuple[str, ChunkCoords, str, Any]]: - return list( - convert_chunk_to_primitive(byte_getter, chunk_spec) - for (byte_getter, chunk_spec, _, _) in batch_info - ) +) -> list[Basic]: + return [ + Basic(byte_interface, chunk_spec) + for (byte_interface, chunk_spec, _, _) in batch_info + ] diff --git a/src/chunk_item.rs b/src/chunk_item.rs index c381f9b..785bed0 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -1,72 +1,94 @@ -use std::{num::NonZeroU64, sync::Arc}; +use std::num::NonZeroU64; use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, - types::{PySlice, PySliceMethods}, - Bound, PyErr, PyResult, + pyclass, pymethods, + types::{PyAnyMethods as _, PySlice, PySliceMethods as _}, + Bound, PyAny, PyErr, PyResult, }; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{ array::{ChunkRepresentation, DataType, FillValue}, array_subset::ArraySubset, metadata::v3::{array::data_type::DataTypeMetadataV3, MetadataV3}, - storage::{MaybeBytes, ReadableWritableListableStorageTraits, StorageError, StoreKey}, + storage::StoreKey, }; -use crate::utils::PyErrExt; - -pub(crate) type Raw<'a> = ( - // store path - String, - // shape - Vec, - // data type - String, - // fill value bytes - Vec, -); - -pub(crate) type RawWithIndices<'a> = ( - Raw<'a>, - // out selection - Vec>, - // chunk selection - Vec>, -); - -pub(crate) trait IntoItem: std::marker::Sized { - fn store_path(&self) -> &str; - fn into_item( - self, - store: Arc, - key: StoreKey, - shape: S, - ) -> PyResult; -} +use crate::{store::StoreConfig, utils::PyErrExt}; pub(crate) trait ChunksItem { - fn store(&self) -> Arc; + fn store_config(&self) -> StoreConfig; fn key(&self) -> &StoreKey; fn representation(&self) -> &ChunkRepresentation; - - fn get(&self) -> Result { - self.store().get(self.key()) - } } +#[derive(Clone)] +#[gen_stub_pyclass] +#[pyclass] pub(crate) struct Basic { - store: Arc, + store: StoreConfig, key: StoreKey, representation: ChunkRepresentation, } +#[gen_stub_pymethods] +#[pymethods] +impl Basic { + #[new] + fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult { + let store: StoreConfig = byte_interface.getattr("store")?.extract()?; + let path: String = byte_interface.getattr("path")?.extract()?; + + let chunk_shape = chunk_spec.getattr("shape")?.extract()?; + let dtype: String = chunk_spec + .getattr("dtype")? + .call_method0("__str__")? + .extract()?; + let fill_value = chunk_spec + .getattr("fill_value")? + .call_method0("tobytes")? + .extract()?; + Ok(Self { + store, + key: StoreKey::new(path).map_py_err::()?, + representation: get_chunk_representation(chunk_shape, &dtype, fill_value)?, + }) + } +} + +#[derive(Clone)] +#[gen_stub_pyclass] +#[pyclass] pub(crate) struct WithSubset { pub item: Basic, pub chunk_subset: ArraySubset, pub subset: ArraySubset, } +#[gen_stub_pymethods] +#[pymethods] +impl WithSubset { + #[new] + #[allow(clippy::needless_pass_by_value)] + fn new( + item: Basic, + chunk_subset: Vec>, + subset: Vec>, + shape: Vec, + ) -> PyResult { + let chunk_subset = + selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?; + let subset = selection_to_array_subset(&subset, &shape)?; + Ok(Self { + item, + chunk_subset, + subset, + }) + } +} + impl ChunksItem for Basic { - fn store(&self) -> Arc { + fn store_config(&self) -> StoreConfig { self.store.clone() } fn key(&self) -> &StoreKey { @@ -78,7 +100,7 @@ impl ChunksItem for Basic { } impl ChunksItem for WithSubset { - fn store(&self) -> Arc { + fn store_config(&self) -> StoreConfig { self.item.store.clone() } fn key(&self) -> &StoreKey { @@ -89,47 +111,6 @@ impl ChunksItem for WithSubset { } } -impl<'a> IntoItem for Raw<'a> { - fn store_path(&self) -> &str { - &self.0 - } - fn into_item( - self, - store: Arc, - key: StoreKey, - (): (), - ) -> PyResult { - let (_, chunk_shape, dtype, fill_value) = self; - let representation = get_chunk_representation(chunk_shape, &dtype, fill_value)?; - Ok(Basic { - store, - key, - representation, - }) - } -} - -impl IntoItem for RawWithIndices<'_> { - fn store_path(&self) -> &str { - &self.0 .0 - } - fn into_item( - self, - store: Arc, - key: StoreKey, - shape: &[u64], - ) -> PyResult { - let (raw, selection, chunk_selection) = self; - let chunk_shape = raw.1.clone(); - let item = raw.into_item(store.clone(), key, ())?; - Ok(WithSubset { - item, - chunk_subset: selection_to_array_subset(&chunk_selection, &chunk_shape)?, - subset: selection_to_array_subset(&selection, shape)?, - }) - } -} - fn get_chunk_representation( chunk_shape: Vec, dtype: &str, diff --git a/src/codec_pipeline_store_filesystem.rs b/src/codec_pipeline_store_filesystem.rs deleted file mode 100644 index ea573ee..0000000 --- a/src/codec_pipeline_store_filesystem.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::sync::Arc; - -use pyo3::{ - exceptions::{PyRuntimeError, PyValueError}, - PyErr, PyResult, -}; -use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorageTraits}; - -use crate::{utils::PyErrExt, CodecPipelineStore}; - -pub struct CodecPipelineStoreFilesystem { - store: Arc, - cwd: String, -} - -impl CodecPipelineStoreFilesystem { - pub fn new() -> PyResult { - let store = Arc::new(FilesystemStore::new("/").map_py_err::()?); - let cwd = std::env::current_dir()? - .to_string_lossy() - .replace('\\', "/"); // TODO: Check zarr-python path handling on windows - - // Remove the leading / from the cwd if preset, so cwd is a valid Zarr store path - let cwd = cwd.strip_prefix("/").unwrap_or(&cwd).to_string(); - Ok(Self { store, cwd }) - } -} - -impl CodecPipelineStore for CodecPipelineStoreFilesystem { - fn store(&self) -> Arc { - self.store.clone() - } - - fn chunk_path(&self, store_path: &str) -> PyResult { - if let Some(chunk_path) = store_path.strip_prefix("file://") { - if let Some(chunk_path) = chunk_path.strip_prefix("/") { - Ok(chunk_path.to_string()) - } else { - Ok(format!("{}/{}", self.cwd, chunk_path)) - } - } else { - Err(PyErr::new::(format!( - "a filesystem store was initialised, but received a store path without a file:// prefix: {store_path}" - ))) - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 577ada6..cbf7739 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,9 @@ #![warn(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] + +use std::borrow::Cow; +use std::sync::Arc; -use chunk_item::{ChunksItem, IntoItem}; -use concurrency::ChunkConcurrentLimitAndCodecOptions; use numpy::npyffi::PyArrayObject; use numpy::{IntoPyArray, PyArray1, PyUntypedArray, PyUntypedArrayMethods}; use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; @@ -10,40 +12,33 @@ use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; -use std::borrow::Cow; -use std::sync::{Arc, Mutex}; use unsafe_cell_slice::UnsafeCellSlice; -use zarrs::array::codec::{ - ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder, -}; +use zarrs::array::codec::{ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder}; use zarrs::array::{ copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, CodecChain, FillValue, }; use zarrs::array_subset::ArraySubset; use zarrs::metadata::v3::MetadataV3; -use zarrs::storage::{ReadableWritableListableStorageTraits, StorageHandle, StoreKey}; mod chunk_item; -mod codec_pipeline_store_filesystem; mod concurrency; +mod runtime; +mod store; #[cfg(test)] mod tests; mod utils; -use codec_pipeline_store_filesystem::CodecPipelineStoreFilesystem; -use utils::{PyErrExt, PyUntypedArrayExt}; - -trait CodecPipelineStore: Send + Sync { - fn store(&self) -> Arc; - fn chunk_path(&self, store_path: &str) -> PyResult; -} +use crate::chunk_item::ChunksItem; +use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; +use crate::store::StoreManager; +use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _}; // TODO: Use a OnceLock for store with get_or_try_init when stabilised? #[gen_stub_pyclass] #[pyclass] pub struct CodecPipelineImpl { + pub(crate) stores: StoreManager, pub(crate) codec_chain: Arc, - pub(crate) store: Mutex>>, pub(crate) codec_options: CodecOptions, pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, @@ -51,52 +46,13 @@ pub struct CodecPipelineImpl { } impl CodecPipelineImpl { - fn get_store_and_path( - &self, - store_path: &str, - ) -> PyResult<(Arc, String)> { - let mut gstore = self.store.lock().map_err(|_| { - PyErr::new::("failed to lock the store mutex".to_string()) - })?; - - #[allow(clippy::collapsible_if)] - if gstore.is_none() { - if store_path.starts_with("file://") { - *gstore = Some(Arc::new(CodecPipelineStoreFilesystem::new()?)); - } - // TODO: Add support for more stores - } - - if let Some(gstore) = gstore.as_ref() { - Ok((gstore.store(), gstore.chunk_path(store_path)?)) - } else { - Err(PyErr::new::(format!( - "unsupported store for {store_path}" - ))) - } - } - - fn collect_chunk_descriptions, I, S: Copy>( - &self, - chunk_descriptions: Vec, - shape: S, - ) -> PyResult> { - chunk_descriptions - .into_iter() - .map(|raw| { - let (store, path) = self.get_store_and_path(raw.store_path())?; - let key = StoreKey::new(path).map_py_err::()?; - raw.into_item(store, key, shape) - }) - .collect() - } - fn retrieve_chunk_bytes<'a, I: ChunksItem>( + &self, item: &I, codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = item.get().map_py_err::()?; + let value_encoded = self.stores.get(item)?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain @@ -113,6 +69,7 @@ impl CodecPipelineImpl { } fn store_chunk_bytes( + &self, item: &I, codec_chain: &CodecChain, value_decoded: ArrayBytes, @@ -126,7 +83,7 @@ impl CodecPipelineImpl { .map_py_err::()?; if value_decoded.is_fill_value(item.representation().fill_value()) { - item.store().erase(item.key()) + self.stores.erase(item) } else { let value_encoded = codec_chain .encode(value_decoded, item.representation(), codec_options) @@ -134,40 +91,37 @@ impl CodecPipelineImpl { .map_py_err::()?; // Store the encoded chunk - item.store().set(item.key(), value_encoded.into()) + self.stores.set(item, value_encoded.into()) } - .map_py_err::() } fn store_chunk_subset_bytes( + &self, item: &I, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - if !chunk_subset.inbounds(&item.representation().shape_u64()) { - return Err(PyErr::new::( - "chunk subset is out of bounds".to_string(), - )); + let array_shape = item.representation().shape_u64(); + if !chunk_subset.inbounds(&array_shape) { + return Err(PyErr::new::(format!( + "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" + ))); } + let data_type_size = item.representation().data_type().size(); - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == item.representation().shape_u64() - { + if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == array_shape { // Fast path if the chunk subset spans the entire chunk, no read required - Self::store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) + self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes - .validate( - chunk_subset.num_elements(), - item.representation().data_type().size(), - ) + .validate(chunk_subset.num_elements(), data_type_size) .map_py_err::()?; // Retrieve the chunk - let chunk_bytes_old = Self::retrieve_chunk_bytes(item, codec_chain, codec_options)?; + let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; // Update the chunk let chunk_bytes_new = unsafe { @@ -178,15 +132,15 @@ impl CodecPipelineImpl { // - output bytes and output subset bytes are compatible (same data type) update_array_bytes( chunk_bytes_old, - &item.representation().shape_u64(), + &array_shape, chunk_subset, &chunk_subset_bytes, - item.representation().data_type().size(), + data_type_size, ) }; // Store the updated chunk - Self::store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) + self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) } } @@ -278,8 +232,8 @@ impl CodecPipelineImpl { let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); Ok(Self { + stores: StoreManager::default(), codec_chain, - store: Mutex::new(None), codec_options, chunk_concurrent_minimum, chunk_concurrent_maximum, @@ -290,7 +244,7 @@ impl CodecPipelineImpl { fn retrieve_chunks_and_apply_index( &self, py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { // Get input array @@ -301,8 +255,6 @@ impl CodecPipelineImpl { } let output = Self::nparray_to_unsafe_cell_slice(value); let output_shape: Vec = value.shape_zarr()?; - let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &output_shape)?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -318,8 +270,7 @@ impl CodecPipelineImpl { && item.chunk_subset.shape() == item.representation().shape_u64() { // See zarrs::array::Array::retrieve_chunk_into - let chunk_encoded = item.get().map_py_err::()?; - if let Some(chunk_encoded) = chunk_encoded { + if let Some(chunk_encoded) = self.stores.get(&item)? { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); unsafe { @@ -352,14 +303,7 @@ impl CodecPipelineImpl { } } } else { - // Partially decode the chunk into the output buffer - let storage_handle = Arc::new(StorageHandle::new(item.store().clone())); - // NOTE: Normally a storage transformer would exist between the storage handle and the input handle - // but zarr-python does not support them nor forward them to the codec pipeline - let input_handle = Arc::new(StoragePartialDecoder::new( - storage_handle, - item.key().clone(), - )); + let input_handle = Arc::new(self.stores.decoder(&item)?); let partial_decoder = self .codec_chain .clone() @@ -396,10 +340,8 @@ impl CodecPipelineImpl { fn retrieve_chunks<'py>( &self, py: Python<'py>, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? ) -> PyResult>>> { - let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions, ())?; - // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? @@ -409,8 +351,7 @@ impl CodecPipelineImpl { let chunk_bytes = py.allow_threads(move || { let get_chunk_subset = |item: chunk_item::Basic| { - let chunk_encoded = item.get().map_py_err::()?; - Ok(if let Some(chunk_encoded) = chunk_encoded { + Ok(if let Some(chunk_encoded) = self.stores.get(&item)? { let chunk_encoded: Vec = chunk_encoded.into(); self.codec_chain .decode( @@ -447,7 +388,7 @@ impl CodecPipelineImpl { fn store_chunks_with_indices( &self, py: Python, - chunk_descriptions: Vec, + chunk_descriptions: Vec, value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { enum InputValue<'a> { @@ -468,10 +409,7 @@ impl CodecPipelineImpl { } else { InputValue::Constant(FillValue::new(input_slice.to_vec())) }; - let input_shape: Vec = value.shape_zarr()?; - let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &input_shape)?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -490,7 +428,7 @@ impl CodecPipelineImpl { item.item.representation().data_type(), ) .map_py_err::()?; - Self::store_chunk_subset_bytes( + self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, @@ -507,7 +445,7 @@ impl CodecPipelineImpl { constant_value, ); - Self::store_chunk_subset_bytes( + self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, @@ -534,6 +472,8 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 0000000..161db08 --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,18 @@ +use std::sync::OnceLock; +use tokio::runtime::Runtime; +use zarrs::storage::storage_adapter::async_to_sync::AsyncToSyncBlockOn; + +static RUNTIME: OnceLock = OnceLock::new(); + +pub struct TokioBlockOn(tokio::runtime::Handle); + +impl AsyncToSyncBlockOn for TokioBlockOn { + fn block_on(&self, future: F) -> F::Output { + self.0.block_on(future) + } +} + +pub fn tokio_block_on() -> TokioBlockOn { + let runtime = RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")); + TokioBlockOn(runtime.handle().clone()) +} diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..7bc8abb --- /dev/null +++ b/src/store.rs @@ -0,0 +1,85 @@ +use std::{collections::HashMap, sync::Arc}; + +use opendal::Builder; +use pyo3::{ + exceptions::{PyNotImplementedError, PyValueError}, + types::{PyAnyMethods, PyStringMethods, PyTypeMethods}, + Bound, FromPyObject, PyAny, PyErr, PyResult, +}; +use pyo3_stub_gen::derive::gen_stub_pyclass_enum; +use zarrs::storage::{ + storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, ReadableWritableListableStorage, +}; + +use crate::{runtime::tokio_block_on, utils::PyErrExt}; + +mod filesystem; +mod http; +mod manager; + +pub use self::filesystem::FilesystemStoreConfig; +pub use self::http::HttpStoreConfig; +pub(crate) use self::manager::StoreManager; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[gen_stub_pyclass_enum] +pub enum StoreConfig { + Filesystem(FilesystemStoreConfig), + Http(HttpStoreConfig), + // TODO: Add support for more stores +} + +impl<'py> FromPyObject<'py> for StoreConfig { + fn extract_bound(store: &Bound<'py, PyAny>) -> PyResult { + let name = store.get_type().name()?; + let name = name.to_str()?; + match name { + "LocalStore" => { + let root: String = store.getattr("root")?.call_method0("__str__")?.extract()?; + Ok(StoreConfig::Filesystem(FilesystemStoreConfig::new(root))) + } + "RemoteStore" => { + let fs = store.getattr("fs")?; + let fs_name = fs.get_type().name()?; + let fs_name = fs_name.to_str()?; + let path: String = store.getattr("path")?.extract()?; + let storage_options: HashMap> = + fs.getattr("storage_options")?.extract()?; + match fs_name { + "HTTPFileSystem" => Ok(StoreConfig::Http(HttpStoreConfig::new( + &path, + &storage_options, + )?)), + _ => Err(PyErr::new::(format!( + "zarrs-python does not support {fs_name} (RemoteStore) stores" + ))), + } + } + _ => Err(PyErr::new::(format!( + "zarrs-python does not support {name} stores" + ))), + } + } +} + +impl TryFrom<&StoreConfig> for ReadableWritableListableStorage { + type Error = PyErr; + + fn try_from(value: &StoreConfig) -> Result { + match value { + StoreConfig::Filesystem(config) => config.try_into(), + StoreConfig::Http(config) => config.try_into(), + } + } +} + +fn opendal_builder_to_sync_store( + builder: B, +) -> PyResult { + let operator = opendal::Operator::new(builder) + .map_py_err::()? + .finish(); + let store = Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator)); + let store = Arc::new(AsyncToSyncStorageAdapter::new(store, tokio_block_on())); + Ok(store) +} diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs new file mode 100644 index 0000000..8ee865b --- /dev/null +++ b/src/store/filesystem.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; + +use pyo3::{exceptions::PyRuntimeError, pyclass, PyErr}; +use pyo3_stub_gen::derive::gen_stub_pyclass; +use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorage}; + +use crate::utils::PyErrExt; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[gen_stub_pyclass] +#[pyclass] +pub struct FilesystemStoreConfig { + #[pyo3(get, set)] + pub root: String, +} + +impl FilesystemStoreConfig { + pub fn new(root: String) -> Self { + Self { root } + } +} + +impl TryInto for &FilesystemStoreConfig { + type Error = PyErr; + + fn try_into(self) -> Result { + let store = + Arc::new(FilesystemStore::new(self.root.clone()).map_py_err::()?); + Ok(store) + } +} diff --git a/src/store/http.rs b/src/store/http.rs new file mode 100644 index 0000000..0c7820b --- /dev/null +++ b/src/store/http.rs @@ -0,0 +1,46 @@ +use std::collections::HashMap; + +use pyo3::{exceptions::PyValueError, pyclass, Bound, PyAny, PyErr, PyResult}; +use pyo3_stub_gen::derive::gen_stub_pyclass; +use zarrs::storage::ReadableWritableListableStorage; + +use super::opendal_builder_to_sync_store; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[gen_stub_pyclass] +#[pyclass] +pub struct HttpStoreConfig { + #[pyo3(get, set)] + pub endpoint: String, +} + +impl HttpStoreConfig { + pub fn new(path: &str, storage_options: &HashMap>) -> PyResult { + if !storage_options.is_empty() { + for storage_option in storage_options.keys() { + match storage_option.as_str() { + // TODO: Add support for other storage options + "asynchronous" => {} + _ => { + return Err(PyValueError::new_err(format!( + "Unsupported storage option for HTTPFileSystem: {storage_option}" + ))); + } + } + } + } + + Ok(Self { + endpoint: path.to_string(), + }) + } +} + +impl TryInto for &HttpStoreConfig { + type Error = PyErr; + + fn try_into(self) -> Result { + let builder = opendal::services::Http::default().endpoint(&self.endpoint); + opendal_builder_to_sync_store(builder) + } +} diff --git a/src/store/manager.rs b/src/store/manager.rs new file mode 100644 index 0000000..d32a68d --- /dev/null +++ b/src/store/manager.rs @@ -0,0 +1,61 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, Mutex}, +}; + +use pyo3::{exceptions::PyRuntimeError, PyResult}; +use zarrs::{ + array::codec::StoragePartialDecoder, + storage::{Bytes, MaybeBytes, ReadableWritableListableStorage, StorageHandle}, +}; + +use crate::{chunk_item::ChunksItem, store::PyErrExt as _}; + +use super::StoreConfig; + +#[derive(Default)] +pub(crate) struct StoreManager(Mutex>); + +impl StoreManager { + fn store(&self, item: &I) -> PyResult { + use std::collections::btree_map::Entry::{Occupied, Vacant}; + match self + .0 + .lock() + .map_py_err::()? + .entry(item.store_config()) + { + Occupied(e) => Ok(e.get().clone()), + Vacant(e) => Ok(e.insert((&item.store_config()).try_into()?).clone()), + } + } + + pub(crate) fn get(&self, item: &I) -> PyResult { + self.store(item)? + .get(item.key()) + .map_py_err::() + } + + pub(crate) fn set(&self, item: &I, value: Bytes) -> PyResult<()> { + self.store(item)? + .set(item.key(), value) + .map_py_err::() + } + + pub(crate) fn erase(&self, item: &I) -> PyResult<()> { + self.store(item)? + .erase(item.key()) + .map_py_err::() + } + + pub(crate) fn decoder(&self, item: &I) -> PyResult { + // Partially decode the chunk into the output buffer + let storage_handle = Arc::new(StorageHandle::new(self.store(item)?)); + // NOTE: Normally a storage transformer would exist between the storage handle and the input handle + // but zarr-python does not support them nor forward them to the codec pipeline + Ok(StoragePartialDecoder::new( + storage_handle, + item.key().clone(), + )) + } +} diff --git a/src/tests.rs b/src/tests.rs index 355e8ec..2cf4570 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,3 +1,5 @@ +use pyo3::ffi::c_str; + use numpy::PyUntypedArray; use pyo3::{ types::{PyAnyMethods, PyModule}, @@ -10,13 +12,15 @@ use crate::CodecPipelineImpl; fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> { pyo3::prepare_freethreaded_python(); Python::with_gil(|py| { - let arr: Bound<'_, PyUntypedArray> = PyModule::from_code_bound( + let arr: Bound<'_, PyUntypedArray> = PyModule::from_code( py, - "def empty_array(): + c_str!( + "def empty_array(): import numpy as np - return np.empty(0, dtype=np.uint8)", - "", - "", + return np.empty(0, dtype=np.uint8)" + ), + c_str!(""), + c_str!(""), )? .getattr("empty_array")? .call0()? diff --git a/tests/test_zarrs_http.py b/tests/test_zarrs_http.py new file mode 100644 index 0000000..7b23db1 --- /dev/null +++ b/tests/test_zarrs_http.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import aiohttp +import numpy as np +import pytest +import zarr +from zarr.storage.remote import RemoteStore + +ARR_REF = np.array( + [ + [np.nan, np.nan, np.nan, np.nan, 0.1, 0.1, -0.6, 0.1], + [np.nan, np.nan, np.nan, np.nan, 0.1, 0.1, -1.6, 0.1], + [np.nan, np.nan, np.nan, np.nan, 0.1, 0.1, -2.6, 0.1], + [np.nan, np.nan, np.nan, np.nan, -3.4, -3.5, -3.6, 0.1], + [1.0, 1.0, 1.0, -4.3, -4.4, -4.5, -4.6, 1.1], + [1.0, 1.0, 1.0, -5.3, -5.4, -5.5, -5.6, 1.1], + [1.0, 1.0, 1.0, 1.0, 1.1, 1.1, -6.6, 1.1], + [1.0, 1.0, 1.0, 1.0, -7.4, -7.5, -7.6, -7.7], + ] +) + +URL = "https://raw.githubusercontent.com/LDeakin/zarrs/main/zarrs/tests/data/array_write_read.zarr/group/array" + + +def test_zarrs_http(): + arr = zarr.open(URL) + assert arr.shape == (8, 8) + assert np.allclose(arr[:], ARR_REF, equal_nan=True) + + +@pytest.mark.xfail(reason="Storage options are not supported for HTTP store") +def test_zarrs_http_kwargs(): + store = RemoteStore.from_url( + URL, storage_options={"auth": aiohttp.BasicAuth("user", "pass")} + ) + arr = zarr.open(store) + assert arr.shape == (8, 8) + assert np.allclose(arr[:], ARR_REF, equal_nan=True)