[FEAT] support df.to_parquet and df.read_parquet() #165
Conversation
I found that there's also a bug in checking whether the parquet file exists in `info()`, so I renamed the title.
Hi @kylebarron, I am wondering about the test here (originally Lines 47 to 59 in 428a66d). Should I try to make its output match:

```python
async def _cat(
    self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
):
    paths = await self._expand_path(path, recursive=recursive)
    # ignore directories for cat_file
    coros = [self._cat_file(path, **kwargs) for path in paths if not self._isdir(path)]
    batch_size = batch_size or self.batch_size
```

Referring to fsspec, it simply gives
@martindurant wrote that test and is obviously more familiar with fsspec than I am... @martindurant do you have any suggestions here?
Keep pinging me until I have a chance to look at this :)
I made some comments on the code as it stands.
However, the outstanding issue is how to construct these instances via fsspec.open(). It would mean:
- registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).
- writing a _get_kwargs_from_urls to create the right obstore instance for the given path(s), including the bucket. This would also be a way to stash the value of the bucket, for later asserting the paths are right.
The alternative way, annoying for the user, would be to explicitly pass a premade instance with filesystem= (sometimes fs=) to the given loading function.
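A minimal sketch of what such a hook might look like. This is illustrative only: the class name is hypothetical (the real obstore class would subclass fsspec's `AbstractFileSystem`), and the exact kwargs it should return are an assumption; `_get_kwargs_from_urls` is the fsspec hook name mentioned above.

```python
from urllib.parse import urlsplit


class ObstoreBackedFileSystem:
    """Illustration only: the real class would subclass fsspec's AbstractFileSystem."""

    @staticmethod
    def _get_kwargs_from_urls(path: str) -> dict:
        # For "s3://my-bucket/key/part.parquet", stash the bucket so the
        # filesystem can build the right obstore store, and later assert
        # that all paths passed to it belong to that bucket.
        parts = urlsplit(path)
        return {"bucket": parts.netloc}


kwargs = ObstoreBackedFileSystem._get_kwargs_from_urls("s3://my-bucket/data/file.parquet")
print(kwargs)  # {'bucket': 'my-bucket'}
```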
obstore/python/obstore/fsspec.py (Outdated)

```python
        "version": head["version"],
    }
except FileNotFoundError:
    # try ls, refer to the info implementation in fsspec
```
Why does this PR need the extra code? Are you trying open() with globs? I don't know the details on head_async, whether it might already achieve this.
For code that stores the parquet as `file.csv/00000`, `file.csv/00001`, ...etc., reading `file.csv/` from S3 makes `info()` raise `FileNotFoundError`. As far as I know, an S3 "folder" is not an object but a prefix, which is what causes this error. So I added code to handle the `FileNotFoundError` in `head_async` to solve it.
OK, the same old "is it a folder" problem - I am well familiar with this.
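The usual workaround can be sketched as a head-then-list fallback. The snippet below uses an in-memory dict as a stand-in for the object store; the real implementation would call the store's `head`/`list` operations instead.

```python
# In-memory stand-in for an object store: keys are full object paths.
OBJECTS = {
    "file.csv/00000": b"a,b\n1,2\n",
    "file.csv/00001": b"a,b\n3,4\n",
}


def head(path: str) -> dict:
    """HEAD a single object; raises if the key does not exist."""
    if path not in OBJECTS:
        raise FileNotFoundError(path)
    return {"name": path, "size": len(OBJECTS[path]), "type": "file"}


def info(path: str) -> dict:
    """HEAD the path; on FileNotFoundError, fall back to listing the prefix.

    S3 "directories" are just key prefixes, not objects, so HEAD on
    "file.csv/" fails even though keys exist underneath it.
    """
    try:
        return head(path)
    except FileNotFoundError:
        prefix = path.rstrip("/") + "/"
        if any(key.startswith(prefix) for key in OBJECTS):
            return {"name": path.rstrip("/"), "size": 0, "type": "directory"}
        raise


print(info("file.csv/00000")["type"])  # file
print(info("file.csv/")["type"])       # directory
```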
Co-authored-by: Martin Durant <[email protected]>
Thank you!
I'm in favor of this approach. I definitely wouldn't do it implicitly on import, but I'd propose we have
Hi @martindurant, I've opened a new draft PR for this to ensure consistency in how instances are constructed across methods. My goal is to align the usage with fsspec. With this PR, obstore can be registered as an fsspec storage backend using:

```python
fsspec.register_implementation("s3", S3FsspecStore)
```

The bucket is extracted from the file path and used as a cache key when creating obstore objects. Here's an example usage that I would like to achieve:

```python
fsspec.register_implementation("s3", S3FsspecStore)
fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    config={
        "endpoint": "http://localhost:30002",
        "access_key_id": "minio",
        "secret_access_key": "miniostorage",
        "virtual_hosted_style_request": True,  # path contains bucket name
    },
    client_options={"timeout": "99999s", "allow_http": "true"},
    retry_config={
        "max_retries": 2,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=2),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)
fs.cat_file("my-s3-bucket/test.txt")
```

Does this align with your expectations? Please let me know if you have any suggestions!
Hi @kylebarron, I think we can directly use
I like this because it means that our fsspec subclasses could potentially stay private. So in theory the only API exported from

But overall I think having
Exactly what I was thinking - the user can call register themselves as in the example above, but it would be useful to provide a utility function that knows what to register, so the user only needs to call one thing once.
I will continue on this once this PR is merged, so that we can use the new way to construct the obstore instance in
```diff
@@ -542,5 +737,6 @@ def _register(protocol: str, *, asynchronous: bool) -> None:
             "asynchronous": asynchronous,
         },  # Assign protocol dynamically
     ),
-    clobber=False,
+    # Override any existing implementations of the same protocol
+    clobber=True,
```
Note: I changed this because I think we want to override any of the default fsspec protocols.
I pushed a couple updates here:
Thanks for starting this!
Add a write() function for `BufferedFileSimple`, used when calling `fsspec.open()`. (obstore/python/obstore/fsspec.py, Lines 177 to 186 in b40d59b.)
Related to issue #164
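A rough sketch of what such a buffering `write()` could look like. This `BufferedFileSimple` is a toy stand-in, not the actual obstore class: writes accumulate in memory and are "uploaded" to a dict on close, where the real implementation would issue a put to the object store.

```python
class BufferedFileSimple:
    """Toy stand-in: accumulate writes in memory, upload the buffer on close."""

    def __init__(self, store: dict, path: str, mode: str = "rb") -> None:
        self.store = store          # dict standing in for the object store
        self.path = path
        self.mode = mode
        self._buffer = bytearray()
        self.closed = False

    def write(self, data: bytes) -> int:
        if "w" not in self.mode:
            raise ValueError("file not opened in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file")
        self._buffer.extend(data)
        return len(data)

    def close(self) -> None:
        if not self.closed and "w" in self.mode:
            # The real implementation would put() the buffer to the store here.
            self.store[self.path] = bytes(self._buffer)
        self.closed = True


store: dict[str, bytes] = {}
f = BufferedFileSimple(store, "bucket/test.txt", mode="wb")
f.write(b"hello ")
f.write(b"world")
f.close()
print(store["bucket/test.txt"])  # b'hello world'
```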