Stream RO-Crate Zip #212

Merged: 25 commits, Feb 4, 2025
Changes from 17 commits

Commits (25)
721e43c
add abstract method for streaming data entities
dnlbauer Jan 22, 2025
939abf4
implement streaming for file
dnlbauer Jan 22, 2025
a23da9c
implement streaming for metadata
dnlbauer Jan 22, 2025
f34dad8
implement streaming for preview
dnlbauer Jan 22, 2025
f909fbb
implement streaming for dataset
dnlbauer Jan 22, 2025
883c439
fix: dataset should not stream if root entity
dnlbauer Jan 22, 2025
77eee32
feat: add method to stream zip
dnlbauer Jan 22, 2025
08eeb02
chore: remove memory buffer class in favor of BytesIO
dnlbauer Jan 22, 2025
c034c66
fix: files from datasets should also be streamed in chunks
dnlbauer Jan 24, 2025
27457d8
fix: zip stream repeats initial bytes
dnlbauer Jan 24, 2025
0a1c4a1
fix: make sure zip stream buffer is always properly closed
dnlbauer Jan 24, 2025
e229ccd
feat: zip stream now yields predictable chunk sizes
dnlbauer Jan 24, 2025
37f2430
chore: remove type hints for consistency
dnlbauer Jan 24, 2025
bf1b990
feat: include unlisted files in zip stream
dnlbauer Jan 26, 2025
3634e4b
feat: write_zip now uses zip streaming
dnlbauer Jan 24, 2025
1a4253f
feat: add streaming example
dnlbauer Jan 27, 2025
2fc8fc8
fix: flake8
dnlbauer Jan 27, 2025
781052f
fix: NPE when no out_path is given for streaming
dnlbauer Jan 27, 2025
0d81d70
feat: hide out_path parameter of streaming api in an internal wrapper
dnlbauer Jan 27, 2025
c636220
feat: add test for streaming without write_zip
dnlbauer Jan 27, 2025
6b33a90
test for unlisted file presence when writing zip
simleo Jan 29, 2025
5d49dab
fix+refactor: streaming should not ignore unlisted files
dnlbauer Jan 29, 2025
2178b42
adjustments for plain DataEntity and large files
simleo Feb 3, 2025
09bff64
add author Daniel Bauer
dnlbauer Feb 3, 2025
04128fa
add copyright for Senckenberg, SGN
dnlbauer Feb 3, 2025
43 changes: 43 additions & 0 deletions examples/fastapi/main.py
@@ -0,0 +1,43 @@
"""
Streaming RO-Crates from a web server

This example demonstrates how to create an RO-Crate on-the-fly
and stream the result to the client.
By using `stream_zip`, the RO-Crate is not written to disk, and remote
data is fetched only while the stream is being consumed.

To run: `fastapi dev main.py`, then visit http://localhost:8000/crate
"""

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from rocrate.rocrate import ROCrate
from io import StringIO

app = FastAPI()


@app.get("/crate")
async def get():
crate = ROCrate()

# Add a remote file
crate.add_file(
"https://raw.githubusercontent.com/ResearchObject/ro-crate-py/refs/heads/master/test/test-data/sample_file.txt",
fetch_remote=True
)

# Add a file containing a string to the crate
crate.add_file(
source=StringIO("Hello, World!"),
dest_path="test-data/hello.txt"
)

# Stream crate to client as a zip file
return StreamingResponse(
crate.stream_zip(),
media_type="application/rocrate+zip",
headers={
"Content-Disposition": "attachment; filename=crate.zip",
}
)
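For reference, here is a minimal client-side sketch of consuming the streamed crate without buffering the whole zip in memory. It assumes the example server above is running locally and uses the `requests` library, which is not among this example's dependencies:

```python
import requests

# Stream the response to disk chunk by chunk instead of loading it all at once.
with requests.get("http://localhost:8000/crate", stream=True) as response:
    response.raise_for_status()
    with open("crate.zip", "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
```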
3 changes: 3 additions & 0 deletions examples/fastapi/requirements.txt
@@ -0,0 +1,3 @@
../../
fastapi
fastapi-cli
31 changes: 31 additions & 0 deletions rocrate/memory_buffer.py
@@ -0,0 +1,31 @@
from io import RawIOBase


class MemoryBuffer(RawIOBase):
"""
A buffer class that supports reading and writing binary data.
The buffer automatically resets upon reading to make sure all data is read only once.
"""

def __init__(self):
self._buffer = b''

def write(self, data):
if self.closed:
raise ValueError('write to closed file')
self._buffer += data
return len(data)

def read(self, size=-1):
if self.closed:
raise ValueError('read from closed file')
if size < 0:
data = self._buffer
self._buffer = b''
else:
data = self._buffer[:size]
self._buffer = self._buffer[size:]
return data

def __len__(self):
return len(self._buffer)
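A quick illustration of the read-once semantics described in the docstring (hypothetical usage, not part of the diff):

```python
buf = MemoryBuffer()
buf.write(b"hello ")
buf.write(b"world")

assert buf.read() == b"hello world"  # reading drains the buffer
assert buf.read() == b""             # nothing is returned twice
assert len(buf) == 0
```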
11 changes: 11 additions & 0 deletions rocrate/model/data_entity.py
@@ -28,3 +28,14 @@ class DataEntity(Entity):

def write(self, base_path):
pass

def stream(self, chunk_size=8192):
""" Stream the data from the source. Each chunk of the content is yielded as a tuple
containing the name of the destination file relative to the crate and the chunk of data.
The destination file name is required because a DataEntity can be a file or a
collection of files (Dataset) and the caller need to know to which file a chunk belongs.
For collection of files, the caller can assume that files are streamed one after another,
meaning once the destination name changes, a file can be closed and the next one can be
openend.
"""
raise NotImplementedError
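To make the contract concrete, here is a sketch of how a caller might demultiplex the `(name, chunk)` tuples into files, relying on the guarantee that all chunks of one file arrive contiguously (`entity` and `out_dir` are hypothetical names, not part of the diff):

```python
import os

def write_streamed_entity(entity, out_dir):
    current_name, out_file = None, None
    try:
        for name, chunk in entity.stream():
            if name != current_name:
                # A new destination name means the previous file is complete.
                if out_file:
                    out_file.close()
                path = os.path.join(out_dir, name)
                os.makedirs(os.path.dirname(path), exist_ok=True)
                out_file = open(path, "wb")
                current_name = name
            out_file.write(chunk)
    finally:
        if out_file:
            out_file.close()
```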
105 changes: 76 additions & 29 deletions rocrate/model/dataset.py
@@ -22,7 +22,7 @@

import errno
import os
import shutil
import warnings
from pathlib import Path
from urllib.request import urlopen

@@ -43,37 +43,84 @@ def _empty(self):
def format_id(self, identifier):
return identifier.rstrip("/") + "/"

def _write_from_url(self, base_path):
if self.validate_url and not self.fetch_remote:
with urlopen(self.source) as _:
self._jsonld['sdDatePublished'] = iso_now()
if self.fetch_remote:
out_file_path, out_file = None, None
for rel_path, chunk in self._stream_folder_from_url():
path = base_path / rel_path
if path != out_file_path:
if out_file:
out_file.close()
out_file_path = Path(path)
out_file_path.parent.mkdir(parents=True, exist_ok=True)
out_file = open(out_file_path, 'wb')
out_file.write(chunk)
if out_file:
out_file.close()

def _copy_folder(self, base_path):
abs_out_path = base_path / self.id
if self.source is None:
abs_out_path.mkdir(parents=True, exist_ok=True)
else:
if not Path(self.source).exists():
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), str(self.source)
)
abs_out_path.mkdir(parents=True, exist_ok=True)
if not self.crate.source:
self.crate._copy_unlisted(self.source, abs_out_path)

    def write(self, base_path):
        base_path = Path(base_path)
        if is_url(str(self.source)):
            self._write_from_url(base_path)
        else:
            self._copy_folder(base_path)

def stream(self, chunk_size=8192):
if self.source is None:
return
elif is_url(str(self.source)):
yield from self._stream_folder_from_url(chunk_size)
else:
yield from self._stream_folder_from_path(chunk_size)

def _stream_folder_from_path(self, chunk_size=8192):
if not Path(str(self.source)).exists():
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), str(self.source)
)
if not self.crate.source:
for root, _, files in os.walk(self.source):
root = Path(root)
for name in files:
source = root / name
dest = source.relative_to(Path(self.source).parent)
with open(source, 'rb') as f:
while chunk := f.read(chunk_size):
yield str(dest), chunk

def _stream_folder_from_url(self, chunk_size=8192):
if not self.fetch_remote:
if self.validate_url:
with urlopen(self.source) as _:
self._jsonld['sdDatePublished'] = iso_now()
if self.fetch_remote:
            base = self.source.rstrip("/")
            for entry in self._jsonld.get("hasPart", []):
                try:
                    part = entry["@id"]
                    if is_url(part) or part.startswith("/"):
                        raise RuntimeError(f"'{self.source}': part '{part}' is not a relative path")
                    part_uri = f"{base}/{part}"
                    rel_out_path = Path(self.id) / part
                    with urlopen(part_uri) as response:
                        while chunk := response.read(chunk_size):
                            yield str(rel_out_path), chunk
                except KeyError:
                    warnings.warn(f"'hasPart' entry in {self.id} is missing '@id'. Skipping.")
116 changes: 86 additions & 30 deletions rocrate/model/file.py
@@ -40,41 +40,97 @@ def _empty(self):
}
return val

def _has_writeable_stream(self):
if isinstance(self.source, (BytesIO, StringIO)):
return True
elif is_url(str(self.source)):
return self.fetch_remote
else:
return self.source is not None

def _write_from_stream(self, out_file_path):
if not self._has_writeable_stream():
            # if this does not correspond to a writeable stream (i.e. it is a URL but fetch_remote is False),
            # we still want to consume the stream to read the file headers, run the size calculation, etc.
all(self.stream())
return

out_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_file_path, 'wb') as out_file:
for _, chunk in self.stream():
out_file.write(chunk)

def _copy_file(self, path, out_file_path):
out_file_path.parent.mkdir(parents=True, exist_ok=True)
if not out_file_path.exists() or not out_file_path.samefile(path):
shutil.copy(path, out_file_path)
if self.record_size:
self._jsonld['contentSize'] = str(out_file_path.stat().st_size)

    def write(self, base_path):
        out_file_path = Path(base_path) / self.id
        if isinstance(self.source, (BytesIO, StringIO)) or is_url(str(self.source)):
            self._write_from_stream(out_file_path)
        elif self.source is None:
            # Allows to record a File entity whose @id does not exist, see #73
            warnings.warn(f"No source for {self.id}")
        else:
            self._copy_file(self.source, out_file_path)

def _stream_from_stream(self, stream):
size = 0
read = stream.read()
if isinstance(self.source, StringIO):
read = read.encode('utf-8')
while len(read) > 0:
yield self.id, read
size += len(read)
read = stream.read()
if isinstance(self.source, StringIO):
read = read.encode('utf-8')

if self.record_size:
self._jsonld['contentSize'] = str(size)

def _stream_from_url(self, url, chunk_size=8192):
if self.fetch_remote or self.validate_url:
if self.validate_url:
if url.startswith("http"):
with requests.head(url) as response:
self._jsonld.update({
'contentSize': response.headers.get('Content-Length'),
'encodingFormat': response.headers.get('Content-Type')
})
if not self.fetch_remote:
date_published = response.headers.get("Last-Modified", iso_now())
self._jsonld['sdDatePublished'] = date_published
if self.fetch_remote:
size = 0
self._jsonld['contentUrl'] = str(url)
with urllib.request.urlopen(url) as response:
while chunk := response.read(chunk_size):
yield self.id, chunk
size += len(chunk)

            if self.record_size:
                self._jsonld['contentSize'] = str(size)

def _stream_from_file(self, path, chunk_size=8192):
size = 0
with open(path, 'rb') as f:
while chunk := f.read(chunk_size):
yield self.id, chunk
size += len(chunk)
if self.record_size:
self._jsonld['contentSize'] = str(size)

def stream(self, chunk_size=8192):
if isinstance(self.source, (BytesIO, StringIO)):
yield from self._stream_from_stream(self.source)
        elif is_url(str(self.source)):
            yield from self._stream_from_url(self.source, chunk_size)
elif self.source is None:
# Allows to record a File entity whose @id does not exist, see #73
warnings.warn(f"No source for {self.id}")
        else:
            yield from self._stream_from_file(self.source, chunk_size)
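As the diff shows, an in-memory `StringIO` source is encoded to UTF-8 on the way out, so consumers always receive bytes. A small sketch of consuming `File.stream()` directly (hypothetical usage):

```python
from io import StringIO
from rocrate.rocrate import ROCrate

crate = ROCrate()
f = crate.add_file(source=StringIO("Hello, World!"), dest_path="hello.txt")

# Every chunk is a (destination name, bytes) tuple.
data = b"".join(chunk for _, chunk in f.stream())
assert data == b"Hello, World!"
```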
15 changes: 10 additions & 5 deletions rocrate/model/metadata.py
@@ -74,11 +74,16 @@ def generate(self):
context = context[0]
return {'@context': context, '@graph': graph}

    def stream(self, chunk_size=8192):
        content = self.generate()
        yield self.id, str.encode(json.dumps(content, indent=4, sort_keys=True), encoding='utf-8')

def _has_writeable_stream(self):
return True

def write(self, dest_base):
write_path = Path(dest_base) / self.id
super()._write_from_stream(write_path)

@property
def root(self) -> Dataset:
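Since `generate()` builds the complete JSON-LD document in memory, the metadata entity is emitted as a single chunk regardless of `chunk_size`. A quick check (hypothetical usage):

```python
from rocrate.rocrate import ROCrate

crate = ROCrate()
chunks = list(crate.metadata.stream())

assert len(chunks) == 1
name, data = chunks[0]
assert name == "ro-crate-metadata.json"  # the crate's metadata file name
```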
16 changes: 10 additions & 6 deletions rocrate/model/preview.py
@@ -90,11 +90,15 @@ def is_object_list(a):
out_html = src.render(crate=self.crate, context=context_entities, data=data_entities)
return out_html

    def stream(self, chunk_size=8192):
        if self.source:
            yield from super().stream()
        else:
            yield self.id, str.encode(self.generate_html(), encoding='utf-8')

def _has_writeable_stream(self):
return True

def write(self, dest_base):
write_path = Path(dest_base) / self.id
super()._write_from_stream(write_path)