Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async improvements #28

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ from object_store import ObjectStore, ObjectMeta, Path
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

store.put(Path("data"), b"some data")
store.put("data", b"some data")

data = store.get("data")
assert data == b"some data"
Expand All @@ -67,13 +67,13 @@ assert copied == data
#### Async api

```py
from object_store import ObjectStore, ObjectMeta, Path
from object_store import ObjectStore, ObjectMeta

# we use an in-memory store for demonstration purposes.
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

path = Path("data")
path = "data"
await store.put_async(path, b"some data")

data = await store.get_async(path)
Expand All @@ -86,8 +86,8 @@ meta = await store.head_async(path)
range = await store.get_range_async(path, start=0, length=4)
assert range == b"some"

await store.copy_async(Path("data"), Path("copied"))
copied = await store.get_async(Path("copied"))
await store.copy_async("data", "copied")
copied = await store.get_async("copied")
assert copied == data
```

Expand Down
155 changes: 154 additions & 1 deletion object-store/python/object_store/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from io import BytesIO
from typing import List, Optional, Union
from typing import List, Optional, Union, override

# NOTE aliasing the imports with 'as' makes them public in the eyes
# of static code checkers. Thus we avoid listing them with __all__ = ...
Expand Down Expand Up @@ -45,6 +45,7 @@ class ObjectStore(_ObjectStore):

backed by the Rust object_store crate."""

@override
def head(self, location: PathLike) -> ObjectMeta:
"""Return the metadata for the specified location.

Expand All @@ -56,6 +57,19 @@ def head(self, location: PathLike) -> ObjectMeta:
"""
return super().head(_as_path(location))

@override
async def head_async(self, location: PathLike) -> ObjectMeta:
"""Return the metadata for the specified location.

Args:
location (PathLike): path / key to storage location

Returns:
ObjectMeta: metadata for object at location
"""
return await super().head_async(_as_path(location))

@override
def get(self, location: PathLike) -> bytes:
"""Return the bytes that are stored at the specified location.

Expand All @@ -67,6 +81,19 @@ def get(self, location: PathLike) -> bytes:
"""
return super().get(_as_path(location))

@override
async def get_async(self, location: PathLike) -> bytes:
"""Return the bytes that are stored at the specified location.

Args:
location (PathLike): path / key to storage location

Returns:
bytes: raw data stored in location
"""
return super().get(_as_path(location))

@override
def get_range(self, location: PathLike, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range.

Expand All @@ -80,6 +107,21 @@ def get_range(self, location: PathLike, start: int, length: int) -> bytes:
"""
return super().get_range(_as_path(location), start, length)

@override
async def get_range_async(self, location: PathLike, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range.

Args:
location (PathLike): path / key to storage location
start (int): zero-based start index
length (int): length of the byte range

Returns:
bytes: raw data range stored in location
"""
return await super().get_range_async(_as_path(location), start, length)

@override
def put(self, location: PathLike, bytes: BytesLike) -> None:
"""Save the provided bytes to the specified location.

Expand All @@ -89,6 +131,17 @@ def put(self, location: PathLike, bytes: BytesLike) -> None:
"""
return super().put(_as_path(location), _as_bytes(bytes))

@override
async def put_async(self, location: PathLike, bytes: BytesLike) -> None:
"""Save the provided bytes to the specified location.

Args:
location (PathLike): path / key to storage location
bytes (BytesLike): data to be written to location
"""
return await super().put_async(_as_path(location), _as_bytes(bytes))

@override
def delete(self, location: PathLike) -> None:
"""Delete the object at the specified location.

Expand All @@ -97,6 +150,16 @@ def delete(self, location: PathLike) -> None:
"""
return super().delete(_as_path(location))

@override
async def delete_async(self, location: PathLike) -> None:
"""Delete the object at the specified location.

Args:
location (PathLike): path / key to storage location
"""
return await super().delete_async(_as_path(location))

@override
def list(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
"""List all the objects with the given prefix.

Expand All @@ -112,6 +175,23 @@ def list(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
prefix_ = _as_path(prefix) if prefix else None
return super().list(prefix_)

@override
async def list_async(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
"""List all the objects with the given prefix.

Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
of `foo/bar/x` but not of `foo/bar_baz/x`.

Args:
prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.

Returns:
list[ObjectMeta]: ObjectMeta for all objects under the listed path
"""
prefix_ = _as_path(prefix) if prefix else None
return await super().list_async(prefix_)

@override
def list_with_delimiter(self, prefix: Optional[PathLike] = None) -> ListResult:
"""List objects with the given prefix and an implementation specific
delimiter. Returns common prefixes (directories) in addition to object
Expand All @@ -129,6 +209,25 @@ def list_with_delimiter(self, prefix: Optional[PathLike] = None) -> ListResult:
prefix_ = _as_path(prefix) if prefix else None
return super().list_with_delimiter(prefix_)

@override
async def list_with_delimiter_async(self, prefix: Optional[PathLike] = None) -> ListResult:
"""List objects with the given prefix and an implementation specific
delimiter. Returns common prefixes (directories) in addition to object
metadata.

Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
of `foo/bar/x` but not of `foo/bar_baz/x`.

Args:
prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.

Returns:
list[ObjectMeta]: ObjectMeta for all objects under the listed path
"""
prefix_ = _as_path(prefix) if prefix else None
return await super().list_with_delimiter_async(prefix_)

@override
def copy(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another in the same object store.

Expand All @@ -140,6 +239,19 @@ def copy(self, src: PathLike, dst: PathLike) -> None:
"""
return super().copy(_as_path(src), _as_path(dst))

@override
async def copy_async(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another in the same object store.

If there exists an object at the destination, it will be overwritten.

Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().copy_async(_as_path(src), _as_path(dst))

@override
def copy_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another, only if destination is empty.

Expand All @@ -151,6 +263,19 @@ def copy_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""
return super().copy_if_not_exists(_as_path(src), _as_path(dst))

@override
async def copy_if_not_exists_async(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another, only if destination is empty.

Will return an error if the destination already has an object.

Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().copy_if_not_exists_async(_as_path(src), _as_path(dst))

@override
def rename(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.

Expand All @@ -165,6 +290,22 @@ def rename(self, src: PathLike, dst: PathLike) -> None:
"""
return super().rename(_as_path(src), _as_path(dst))

@override
async def rename_async(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.

By default, this is implemented as a copy and then delete source. It may not
check when deleting source that it was the same object that was originally copied.

If there exists an object at the destination, it will be overwritten.

Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().rename_async(_as_path(src), _as_path(dst))

@override
def rename_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.

Expand All @@ -175,3 +316,15 @@ def rename_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
dst (PathLike): destination path
"""
return super().rename_if_not_exists(_as_path(src), _as_path(dst))

@override
async def rename_if_not_exists_async(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.

Will return an error if the destination already has an object.

Args:
src (PathLike): source path
dst (PathLike): destination path
"""
return await super().rename_if_not_exists_async(_as_path(src), _as_path(dst))
95 changes: 92 additions & 3 deletions object-store/tests/test_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_put_get_delete_list(object_store: tuple[ObjectStore, Path]):
assert range_result == expected_data[3:7]

with pytest.raises(Exception): # noqa: B017
store.get_range(location, 200, 100)
_ = store.get_range(location, 200, 100)

head = store.head(location)
assert head.location == location
Expand All @@ -67,10 +67,71 @@ def test_put_get_delete_list(object_store: tuple[ObjectStore, Path]):
assert len(files) == 0

with pytest.raises(FileNotFoundError):
store.get(location)
_ = store.get(location)

with pytest.raises(FileNotFoundError):
store.head(location)
_ = store.head(location)


@pytest.mark.asyncio
async def test_put_get_delete_list_async(object_store: tuple[ObjectStore, Path]):
store, _ = object_store

files = await store.list_async()
assert len(files) == 0

expected_data = b"arbitrary data"
location = ObjectStorePath("test_dir/test_file.json")
store.put("test_dir/test_file.json", expected_data)

files = await store.list_async()
assert len(files) == 1
assert files[0].location == location

files = await store.list_async(ObjectStorePath("/"))
assert len(files) == 1
assert files[0].location == location

result = await store.list_with_delimiter_async()
assert len(result.objects) == 0
assert len(result.common_prefixes) == 1
assert result.common_prefixes[0] == ObjectStorePath("test_dir")

result = await store.list_with_delimiter_async(ObjectStorePath("/"))
assert len(result.objects) == 0
assert len(result.common_prefixes) == 1
assert result.common_prefixes[0] == ObjectStorePath("test_dir")

files = await store.list_async(ObjectStorePath("test_dir"))
assert len(files) == 1
assert files[0].location == location

files = await store.list_async(ObjectStorePath("something"))
assert len(files) == 0

data = await store.get_async(location)
assert data == expected_data

range_result = await store.get_range_async(location, 3, 4)
assert range_result == expected_data[3:7]

with pytest.raises(Exception): # noqa: B017
_ = await store.get_range_async(location, 200, 100)

head = await store.head_async(location)
assert head.location == location
assert head.size == len(expected_data)

await store.delete_async(location)

files = await store.list_async()
assert len(files) == 0

with pytest.raises(FileNotFoundError):
_ = await store.get_async(location)

with pytest.raises(FileNotFoundError):
_ = await store.head_async(location)


def test_rename_and_copy(object_store: tuple[ObjectStore, Path]):
Expand Down Expand Up @@ -98,3 +159,31 @@ def test_rename_and_copy(object_store: tuple[ObjectStore, Path]):
store.get(path1)

store.delete(path2)


@pytest.mark.asyncio
async def test_rename_and_copy_async(object_store: tuple[ObjectStore, Path]):
store, _ = object_store

path1 = ObjectStorePath("test1")
path2 = ObjectStorePath("test2")
contents1 = b"cats"
contents2 = b"dogs"

# copy() make both objects identical
await store.put_async(path1, contents1)
await store.put_async(path2, contents2)
await store.copy_async(path1, path2)
new_contents = await store.get_async(path2)
assert new_contents == contents1

# rename() copies contents and deletes original
await store.put_async(path1, contents1)
await store.put_async(path2, contents2)
await store.rename_async(path1, path2)
new_contents = await store.get_async(path2)
assert new_contents == contents1
with pytest.raises(FileNotFoundError):
_ = await store.get_async(path1)

await store.delete_async(path2)
Loading