
[FEAT] support df.to_parquet and df.read_parquet() #165

Merged: 24 commits, Mar 3, 2025

Conversation

@machichima (Contributor) commented Jan 27, 2025

Add a write() function to BufferedFileSimple, which is used when calling fsspec.open().

def _open(self, path, mode="rb", **kwargs):
    """Return raw bytes-mode file-like from the file-system"""
    return BufferedFileSimple(self, path, mode, **kwargs)

class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
    def __init__(self, fs, path, mode="rb", **kwargs):
        if mode != "rb":
            raise ValueError("Only 'rb' mode is currently supported")
        super().__init__(fs, path, mode, **kwargs)
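
For context, basic write support in a pure-fsspec subclass would hang off AbstractBufferedFile's upload hooks, roughly like this (a sketch for illustration only; the merged PR ultimately delegates to obstore's Rust BufferedWriter instead, as noted further down):

import fsspec

class BufferedFileWrite(fsspec.spec.AbstractBufferedFile):
    # AbstractBufferedFile.write() accumulates bytes in self.buffer and
    # calls the two hooks below; overriding them is enough for basic "wb"
    def _initiate_upload(self):
        self._parts = []

    def _upload_chunk(self, final=False):
        # self.buffer holds the bytes written since the last flush
        self._parts.append(self.buffer.getvalue())
        if final:
            # single-shot upload on close(); real code would stream parts
            self.fs.pipe_file(self.path, b"".join(self._parts))
        return True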

Related to issue #164

@machichima machichima changed the title [WIP] fsspec write method for open() [WIP] support df.to_parquet and df.read_parquet() Jan 30, 2025
@machichima (Contributor, Author)

I found that there's also a bug in checking whether a parquet file exists in info(), so I renamed the title.

@machichima (Contributor, Author)

Hi @kylebarron,

I am wondering about the test here. Originally, fs.info("dir") raised FileNotFoundError for a directory, which broke df.to_parquet(). After fixing that, the line fs.cat("dir", recursive=True) raises FileNotFoundError for "dir": since fs.info("dir") no longer errors, "dir" itself is passed on to be read as a file.

def test_multi_file_ops(fs):
    data = {"dir/test1": b"test data1", "dir/test2": b"test data2"}
    fs.pipe(data)
    out = fs.cat(list(data))
    assert out == data
    out = fs.cat("dir", recursive=True)
    assert out == data
    fs.cp("dir", "dir2", recursive=True)
    out = fs.find("", detail=False)
    assert out == ["afile", "dir/test1", "dir/test2", "dir2/test1", "dir2/test2"]
    fs.rm(["dir", "dir2"], recursive=True)
    out = fs.find("", detail=False)
    assert out == ["afile"]

Should I try to make its output {"dir/test1": b"test data1", "dir/test2": b"test data2"} here? That would require overriding _cat() from fsspec as follows:

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths if not self._isdir(path)]   # ignore dir for cat_file
        batch_size = batch_size or self.batch_size

Referring to fsspec, it simply raises FileNotFoundError in this case. Maybe we can just remove this line, or make the test assert that FileNotFoundError is raised?
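
If we take the assertion route, the test could look roughly like this (a sketch that assumes the fixed fs.cat raises as described above; not code from this PR):

import pytest

def test_cat_dir_raises(fs):
    fs.pipe({"dir/test1": b"test data1", "dir/test2": b"test data2"})
    # "dir" is only a key prefix, not an object, so reading it directly fails
    with pytest.raises(FileNotFoundError):
        fs.cat("dir", recursive=True)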

@kylebarron (Member)

@martindurant wrote that test and is obviously more familiar with fsspec than I am... @martindurant do you have any suggestions here?

@martindurant (Contributor)

Keep pinging me until I have a chance to look at this :)

@martindurant (Contributor) left a comment

I made some comments on the code as it stands.

However, the outstanding issue is how to construct these instances via fsspec.open(). It would mean:

  • registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).
  • writing a _get_kwargs_from_urls to create the right obstore instance for the given path(s), including the bucket. This would also be a way to stash the value of the bucket, for later asserting the paths are right.

The alternative way, annoying for the user, would be to explicitly pass a premade instance with filesystem= (sometimes fs=) to the given loading function.
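
For the second bullet, the hook might look roughly like this (a sketch with hypothetical names, not the PR's final code):

import fsspec

class S3FsspecStore(fsspec.AbstractFileSystem):
    protocol = "s3"

    def __init__(self, bucket=None, **kwargs):
        # stash the bucket so later calls can validate paths against it
        self.bucket = bucket
        super().__init__(**kwargs)

    @staticmethod
    def _get_kwargs_from_urls(path):
        # fsspec calls this hook to derive constructor kwargs from the URL,
        # e.g. "s3://my-bucket/key" -> {"bucket": "my-bucket"}
        stripped = path.split("://", 1)[-1]
        return {"bucket": stripped.split("/", 1)[0]}

# explicit registration, overriding fsspec's default "s3" implementation
fsspec.register_implementation("s3", S3FsspecStore, clobber=True)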

"version": head["version"],
}
except FileNotFoundError:
# try ls, refer to the info implementation in fsspec
@martindurant (Contributor):

Why does this PR need the extra code? Are you trying open() with globs? I don't know the details of head_async, or whether it might already achieve this.

@machichima (Contributor, Author):

For code that stores the parquet as file.csv/00000, file.csv/00001, etc., reading file.csv/ from S3 makes info() raise FileNotFoundError. As far as I know, an S3 "folder" is not an object but merely a key prefix, which is what causes the error. So I added this code to handle the FileNotFoundError from head_async.
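
Concretely, the fallback is along these lines (a simplified sketch, not the PR's exact code):

import obstore as obs

async def _info(self, path, **kwargs):
    try:
        # self.store is assumed to be the underlying obstore store instance
        head = await obs.head_async(self.store, path)
        return {"name": path, "size": head["size"], "type": "file"}
    except FileNotFoundError:
        # an S3 "folder" is just a key prefix, so HEAD fails even when
        # objects like path/00000 exist; probe with a listing instead
        if await self._ls(path, detail=False):
            return {"name": path, "size": 0, "type": "directory"}
        raise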

@martindurant (Contributor):

OK, the same old "is it a folder" problem - I am well familiar with this.

@kylebarron (Member)

I made some comments on the code as it stands.

Thank you!

  • registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).

I'm in favor of this approach. I definitely wouldn't do it implicitly on import, but I'd propose we have obstore.fsspec.register(), which would register these protocols with fsspec's registry.

@machichima (Contributor, Author) commented Feb 6, 2025

I made some comments on the code as it stands.

However, the outstanding issue is how to construct these instances via fsspec.open(). It would mean:

  • registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).
  • writing a _get_kwargs_from_urls to create the right obstore instance for the given path(s), including the bucket. This would also be a way to stash the value of the bucket, for later asserting the paths are right.

The alternative way, annoying for the user, would be to explicitly pass a premade instance with filesystem= (sometimes fs=) to the given loading function.

Hi @martindurant,

I've opened a new draft PR for this to ensure consistency in how instances are constructed across methods. My goal is to align the usage with fsspec.

With this PR, obstore can be registered as an fsspec storage backend using:

fsspec.register_implementation("s3", S3FsspecStore)

The bucket is extracted from the file path and used as a cache key when creating obstore objects. Here's an example usage that I would like to achieve:

from datetime import timedelta

import fsspec

fsspec.register_implementation("s3", S3FsspecStore)
fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    config={
        "endpoint": "http://localhost:30002",
        "access_key_id": "minio",
        "secret_access_key": "miniostorage",
        "virtual_hosted_style_request": True,  # path contain bucket name
    },
    client_options={"timeout": "99999s", "allow_http": "true"},
    retry_config={
        "max_retries": 2,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=2),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)

fs.cat_file("my-s3-bucket/test.txt")

Does this align with your expectations? Please let me know if you have any suggestions!
Thanks!

@machichima (Contributor, Author)

I'm in favor of this approach. I definitely wouldn't do it implicitly on import, but I'd propose we have obstore.fsspec.register() which would register these protocols with fsspec's registry.

Hi @kylebarron,

I think we can directly use fsspec's registration for this, i.e. fsspec.register_implementation("s3", AsyncFsspecStore). Or do you mean something like obstore.fsspec.register("s3"), so that we do not need to create more classes that inherit from AsyncFsspecStore?

@kylebarron (Member)

Or do you mean something like obstore.fsspec.register("s3"), so that we do not need to create more classes that inherit from AsyncFsspecStore?

I like this because it means that our fsspec subclasses could potentially stay private. So in theory the only API exported from obstore.fsspec would be register(). In practice, that might not be enough for all fsspec use cases.

But overall I think having obstore.fsspec.register, even if that function is a one-liner that wraps fsspec.register, is useful for simplicity.

@martindurant (Contributor)

I think having obstore.fsspec.register, even if that function is a one-liner that wraps fsspec.register

Exactly what I was thinking - the user can call register themselves, as in the example above, but it would be useful to provide a utility function that knows what to register, so the user only needs to call one thing once.
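
Something along these lines, perhaps (a sketch only; the merged code builds a dynamic subclass per protocol, as the diff further down shows):

import fsspec
from obstore.fsspec import AsyncFsspecStore  # the subclass added in this PR

def register(protocol: str, *, asynchronous: bool = False) -> None:
    """Register obstore's fsspec implementation for one protocol."""
    fsspec.register_implementation(
        protocol,
        type(
            f"{protocol.upper()}FsspecStore",
            (AsyncFsspecStore,),
            {"protocol": protocol, "asynchronous": asynchronous},
        ),
        clobber=True,  # override fsspec's built-in implementation
    )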

@machichima (Contributor, Author)

I will continue on this once this PR is merged, so that we can use the new way to construct the obstore instance in open() too.

@machichima machichima changed the title [WIP] support df.to_parquet and df.read_parquet() [FEAT] support df.to_parquet and df.read_parquet() Mar 2, 2025
@machichima machichima requested a review from kylebarron March 2, 2025 12:42
@@ -542,5 +737,6 @@ def _register(protocol: str, *, asynchronous: bool) -> None:
             "asynchronous": asynchronous,
         },  # Assign protocol dynamically
     ),
-    clobber=False,
+    # Override any existing implementations of the same protocol
+    clobber=True,
@kylebarron (Member):

Note, I changed this, because I think we want to override any of the default fsspec protocols

@kylebarron (Member)

I pushed a couple updates here:

  • Always use the underlying Rust BufferedFile and BufferedWriter where possible, and use the provided fsspec methods as little as possible.
    • Override the provided read, tell, seek, readline, readlines, flush, write methods
    • Use the underlying Rust reader for readline and readlines
  • Added typing overloads for the BufferedFile __init__ for either readers/writers (see the sketch below).
  • Minimized what we set on self. We don't need to set fs, store, or path on self.
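
For reference, such overloads usually annotate self so that the mode literal selects the generic parameter; a sketch with stand-in Reader/Writer types (not the merged signatures):

from typing import Generic, Literal, TypeVar, overload

class Reader: ...  # stand-in for obstore's Rust-backed reader
class Writer: ...  # stand-in for obstore's Rust-backed writer

T = TypeVar("T", Reader, Writer)

class BufferedFile(Generic[T]):
    @overload
    def __init__(self: "BufferedFile[Reader]", fs, path: str, mode: Literal["rb"] = "rb") -> None: ...
    @overload
    def __init__(self: "BufferedFile[Writer]", fs, path: str, mode: Literal["wb"]) -> None: ...
    def __init__(self, fs, path, mode="rb"):
        # single runtime implementation that dispatches on mode
        self.mode = mode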

@kylebarron kylebarron enabled auto-merge (squash) March 3, 2025 18:46
@kylebarron kylebarron merged commit 31ed2e4 into developmentseed:main Mar 3, 2025
4 checks passed
@kylebarron (Member)

Thanks for starting this!
