Skip to content

Commit

Permalink
feat: basic fsspec writing (#1016)
Browse files Browse the repository at this point in the history
* use paramiko instead of sshfs

* use specified port

* test default handler behaviour

* default to fsspec instead of error if scheme not found

* attempt to close socket

* fix ci

* Revert "fix ci"

This reverts commit e56e337.

* broader exception

* also handle socket exception

* get user robust

* enable github test with skip if api limit is hit (so we sometimes test it, but it never fails due to api limits). TODO: update exception class

* add memory filesystem test

* add zip and tar tests

* fix memory test

* zip/tar tests

* add test for writing with fsspec without any integration (file-like object)

* update docstrings, add some type hints

* add helper method to produce sink from a path or file-like object

* add fsspec writing test for integration

* helper function to check if file-like object

* return file-like object

* explicit argument

* rename test

* create empty file

* rename test

* pass storage options to fsspec

* test ssh and memory writing

* split fsspec tests into reading and writing

* loosen the check for file-like object

* add test for file update

* file update test

* properly truncate the file

* refactor how sink is created (no changes)

* annotation to avoid warning

* close file if sink initialization fails

* sink will handle fsspec in a similar way to local path for open and close

* rename to reading

* check if test works

* missing import

* remove parent dirs

* fix zip tar tests

* skip github if api limits hit

* attempt to fix windows paths

* use more complex uri with object in zip test

* debug

* paths

* add new test case to object url split

* add new failing test case: TODO make it work

* revert debug changes

* working in new test case

* modified where file is truncated

* revert file-like check

* revert is-file-like check

* unified rb+ mode (fsspec uses "rb+" not "r+b" it can be confusing sometimes)

* add http write test (not implemented error check)

* skip test for debugging

* understand test failure

* use r+b instead of rb+

* use new sink

* writing memory test

* correctly truncate file with fsspec

* use fsspec to get parent dir

* aiohttp import skip

* isstr

* use ports instead of string

* improve path:obj split to handle chained protocols (protocol1::protocol2://)

* working url chain to some extent

* attempt to fix ci

* cleanup

* attempt to fix ci
  • Loading branch information
lobis authored Nov 15, 2023
1 parent 113c58c commit 28b0f7b
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 74 deletions.
42 changes: 27 additions & 15 deletions src/uproot/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,23 +286,26 @@ def regularize_path(path):
pass

_uri_scheme = re.compile("^(" + "|".join([re.escape(x) for x in _schemes]) + ")://")
_uri_scheme_chain = re.compile(
"^(" + "|".join([re.escape(x) for x in _schemes]) + ")::"
)


def file_object_path_split(path: str) -> tuple[str, str | None]:
def file_object_path_split(urlpath: str) -> tuple[str, str | None]:
"""
Split a path with a colon into a file path and an object-in-file path.
Args:
path: The path to split. Example: ``"https://localhost:8888/file.root:tree"``
urlpath: The path to split. Example: ``"https://localhost:8888/file.root:tree"``
Returns:
A tuple of the file path and the object-in-file path. If there is no
object-in-file path, the second element is ``None``.
Example: ``("https://localhost:8888/file.root", "tree")``
"""

path: str = regularize_path(path)
path = path.strip()
urlpath: str = regularize_path(urlpath).strip()
path = urlpath

def _split_path(path: str) -> list[str]:
parts = path.split(":")
Expand All @@ -313,16 +316,22 @@ def _split_path(path: str) -> list[str]:
return parts

if "://" not in path:
# assume it's a local file path
parts = _split_path(path)
elif _uri_scheme.match(path):
path = "file://" + path

# replace the match of _uri_scheme_chain with "" until there is no match
while _uri_scheme_chain.match(path):
path = _uri_scheme_chain.sub("", path)

if _uri_scheme.match(path):
# if not a local path, attempt to match a URI scheme
parsed_url = urlparse(path)
parsed_url_path = parsed_url.path
if path.startswith("file://"):
parsed_url_path = path[7:]
else:
parsed_url_path = urlparse(path).path

if parsed_url_path.startswith("//"):
# This can be a leftover from url chaining in fsspec
# TODO: replace this with str.removeprefix once Python 3.8 is dropped
parsed_url_path = parsed_url_path[2:]

parts = _split_path(parsed_url_path)
else:
# invalid scheme
Expand All @@ -336,12 +345,15 @@ def _split_path(path: str) -> list[str]:
elif len(parts) == 2:
obj = parts[1]
# remove the object from the path (including the colon)
path = path[: -len(obj) - 1]
obj = obj.strip()
urlpath = urlpath[: -len(obj) - 1]
# clean badly placed slashes
obj = obj.strip().lstrip("/")
while "//" in obj:
obj = obj.replace("//", "/")
else:
raise ValueError(f"could not split object from path {path}")

return path, obj
return urlpath, obj


def file_path_to_source_class(file_path, options):
Expand Down Expand Up @@ -412,7 +424,7 @@ def file_path_to_source_class(file_path, options):
windows_absolute_path = file_path

parsed_url = urlparse(file_path)
if parsed_url.scheme.upper() == "FILE":
if parsed_url.scheme.lower() == "file":
parsed_url_path = unquote(parsed_url.path)
else:
parsed_url_path = parsed_url.path
Expand Down
36 changes: 26 additions & 10 deletions src/uproot/sink/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
(although files are flushed after every object-write).
"""

from __future__ import annotations

import numbers
import os
Expand All @@ -28,7 +29,7 @@ class FileSink:
"""

@classmethod
def from_object(cls, obj):
def from_object(cls, obj) -> FileSink:
"""
Args:
obj (file-like object): An object with ``read``, ``write``, ``seek``,
Expand Down Expand Up @@ -59,23 +60,38 @@ def from_object(cls, obj):
)
return self

def __init__(self, file_path):
@classmethod
def from_fsspec(cls, open_file) -> FileSink:
import fsspec

if not isinstance(open_file, fsspec.core.OpenFile):
raise TypeError("""argument should be of type fsspec.core.OpenFile""")
self = cls(None)
self._fsspec_open_file = open_file
return self

def __init__(self, file_path: str | None):
self._file_path = file_path
self._file = None
self._fsspec_open_file = None

@property
def file_path(self):
def file_path(self) -> str | None:
"""
A path to the file, which is None if constructed with a file-like object.
"""
return self._file_path

def _ensure(self):
if self._file is None:
if self._file_path is None:
raise TypeError("FileSink created from an object cannot be reopened")
if self._file:
return

if self._fsspec_open_file:
self._file = self._fsspec_open_file.open()
else:
self._file = open(self._file_path, "r+b")
self._file.seek(0)

self._file.seek(0)

def __getstate__(self):
state = dict(self.__dict__)
Expand All @@ -101,7 +117,7 @@ def flush(self):
return self._file.flush()

@property
def closed(self):
def closed(self) -> bool:
"""
True if the file is closed; False otherwise.
"""
Expand All @@ -124,7 +140,7 @@ def __exit__(self, exception_type, exception_value, traceback):
self.close()

@property
def in_path(self):
def in_path(self) -> str:
if self._file_path is None:
return ""
else:
Expand Down Expand Up @@ -157,7 +173,7 @@ def set_file_length(self, length):
if missing > 0:
self._file.write(b"\x00" * missing)

def read(self, location, num_bytes, insist=True):
def read(self, location, num_bytes, insist=True) -> bytes:
"""
Args:
location (int): Position in the file to read.
Expand Down
5 changes: 3 additions & 2 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ def __init__(self, file_path: str, **options):
exclude_keys = set(default_options.keys())
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}

protocol = fsspec.core.split_protocol(file_path)[0]
self._async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
self._executor = FSSpecLoopExecutor()

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **storage_options)

# What should we do when there is a chain of filesystems?
self._async_impl = self._fs.async_impl

self._file = self._fs.open(self._file_path)
self._fh = None
self._num_requests = 0
Expand Down
Loading

0 comments on commit 28b0f7b

Please sign in to comment.