Stream RO-Crate Zip #212

Merged: 25 commits, Feb 4, 2025 (diff below shows changes from 11 commits)
721e43c  add abstract method for streaming data entities (dnlbauer, Jan 22, 2025)
939abf4  implement streaming for file (dnlbauer, Jan 22, 2025)
a23da9c  implement streaming for metadata (dnlbauer, Jan 22, 2025)
f34dad8  implement streaming for preview (dnlbauer, Jan 22, 2025)
f909fbb  implement streaming for dataset (dnlbauer, Jan 22, 2025)
883c439  fix: dataset should not stream if root entity (dnlbauer, Jan 22, 2025)
77eee32  feat: add method to stream zip (dnlbauer, Jan 22, 2025)
08eeb02  chore: remove memory buffer class in favor of BytesIO (dnlbauer, Jan 22, 2025)
c034c66  fix: files from datasets should also be streamed in chunks (dnlbauer, Jan 24, 2025)
27457d8  fix: zip stream repeats initial bytes (dnlbauer, Jan 24, 2025)
0a1c4a1  fix: make sure zip stream buffer is always properly closed (dnlbauer, Jan 24, 2025)
e229ccd  feat: zip stream now yields predictable chunk sizes (dnlbauer, Jan 24, 2025)
37f2430  chore: remove type hints for consistency (dnlbauer, Jan 24, 2025)
bf1b990  feat: include unlisted files in zip stream (dnlbauer, Jan 26, 2025)
3634e4b  feat: write_zip now uses zip streaming (dnlbauer, Jan 24, 2025)
1a4253f  feat: add streaming example (dnlbauer, Jan 27, 2025)
2fc8fc8  fix: flake8 (dnlbauer, Jan 27, 2025)
781052f  fix: NPE when no out_path is given for streaming (dnlbauer, Jan 27, 2025)
0d81d70  feat: hide out_path parameter of streaming api in an internal wrapper (dnlbauer, Jan 27, 2025)
c636220  feat: add test for streaming without write_zip (dnlbauer, Jan 27, 2025)
6b33a90  test for unlisted file presence when writing zip (simleo, Jan 29, 2025)
5d49dab  fix+refactor: streaming should not ignore unlisted files (dnlbauer, Jan 29, 2025)
2178b42  adjustments for plain DataEntity and large files (simleo, Feb 3, 2025)
09bff64  add author Daniel Bauer (dnlbauer, Feb 3, 2025)
04128fa  add copyright for Senckenberg, SGN (dnlbauer, Feb 3, 2025)
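
For orientation, here is a minimal sketch of how the feature added by this PR might be consumed. The method name stream_zip is an assumption taken from commit 77eee32 ("feat: add method to stream zip") and should be checked against the merged code, as is the fixed-size chunking from e229ccd.

from rocrate.rocrate import ROCrate

crate = ROCrate("path/to/crate_dir")  # load an existing crate

# Stream the zipped crate to disk without materializing the whole
# archive in memory; per commit 3634e4b, write_zip delegates to the
# same streaming code path internally.
with open("crate.zip", "wb") as out:
    for chunk in crate.stream_zip():  # assumed method name (commit 77eee32)
        out.write(chunk)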
28 changes: 28 additions & 0 deletions rocrate/memory_buffer.py
@@ -0,0 +1,28 @@
from io import RawIOBase


class MemoryBuffer(RawIOBase):
    """
    A buffer class that supports reading and writing binary data.
    The buffer automatically resets upon reading to make sure all data is read only once.
    """

    def __init__(self):
        self._buffer = b''

    def write(self, data):
        if self.closed:
            raise ValueError('write to closed file')
        self._buffer += data
        return len(data)

    def read(self, size=-1):
        if self.closed:
            raise ValueError('read from closed file')
        if size < 0:
            data = self._buffer
            self._buffer = b''
        else:
            data = self._buffer[:size]
            self._buffer = self._buffer[size:]
        return data
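
As a quick illustration (a sketch, not part of the diff): the read-once contract means bytes written to the buffer are returned exactly once, which is what lets a zip writer's output be drained in chunks.

buf = MemoryBuffer()
buf.write(b'hello ')
buf.write(b'world')

assert buf.read(5) == b'hello'  # a partial read consumes the first 5 bytes
assert buf.read() == b' world'  # the remainder is returned once...
assert buf.read() == b''        # ...and the buffer is empty afterwards
buf.close()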
12 changes: 12 additions & 0 deletions rocrate/model/data_entity.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python
from typing import Generator

# Copyright 2019-2024 The University of Manchester, UK
# Copyright 2020-2024 Vlaams Instituut voor Biotechnologie (VIB), BE
Expand Down Expand Up @@ -28,3 +29,14 @@ class DataEntity(Entity):

def write(self, base_path):
pass

def stream(self) -> Generator[tuple[str, bytes], None, None]:
""" Stream the data from the source. Each chunk of the content is yielded as a tuple
containing the name of the destination file relative to the crate and the chunk of data.
The destination file name is required because a DataEntity can be a file or a
collection of files (Dataset) and the caller need to know to which file a chunk belongs.
For collection of files, the caller can assume that files are streamed one after another,
meaning once the destination name changes, a file can be closed and the next one can be
openend.
"""
raise NotImplementedError
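
To make that contract concrete, a caller can demultiplex the stream by watching for destination-name changes. This hypothetical helper (replay_stream is not part of the PR) mirrors what _write_from_url in dataset.py below does:

from pathlib import Path

def replay_stream(entity, base_path):
    """Hypothetical helper: write a DataEntity stream back to disk."""
    base_path = Path(base_path)
    current_name, out_file = None, None
    for name, chunk in entity.stream():
        if name != current_name:  # destination changed: close old file, open next
            if out_file:
                out_file.close()
            current_name = name
            dest = base_path / name
            dest.parent.mkdir(parents=True, exist_ok=True)
            out_file = open(dest, 'wb')
        out_file.write(chunk)
    if out_file:
        out_file.close()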
107 changes: 78 additions & 29 deletions rocrate/model/dataset.py
@@ -22,8 +22,9 @@

import errno
import os
import warnings
from pathlib import Path
from typing import Generator
from urllib.request import urlopen

from .file_or_dir import FileOrDir
Expand All @@ -43,37 +44,85 @@ def _empty(self):
def format_id(self, identifier):
return identifier.rstrip("/") + "/"

    def _write_from_url(self, base_path):
        if self.validate_url and not self.fetch_remote:
            with urlopen(self.source) as _:
                self._jsonld['sdDatePublished'] = iso_now()
        if self.fetch_remote:
            out_file_path, out_file = None, None
            for rel_path, chunk in self._stream_folder_from_url():
                path = base_path / rel_path
                if path != out_file_path:
                    if out_file:
                        out_file.close()
                    out_file_path = Path(path)
                    out_file_path.parent.mkdir(parents=True, exist_ok=True)
                    out_file = open(out_file_path, 'wb')
                out_file.write(chunk)
            if out_file:
                out_file.close()

    def _copy_folder(self, base_path):
        abs_out_path = base_path / self.id
        if self.source is None:
            abs_out_path.mkdir(parents=True, exist_ok=True)
        else:
            if not Path(self.source).exists():
                raise FileNotFoundError(
                    errno.ENOENT, os.strerror(errno.ENOENT), str(self.source)
                )
            abs_out_path.mkdir(parents=True, exist_ok=True)
            if not self.crate.source:
                self.crate._copy_unlisted(self.source, abs_out_path)

    def write(self, base_path):
        base_path = Path(base_path)
        if is_url(str(self.source)):
            self._write_from_url(base_path)
        else:
            self._copy_folder(base_path)

    def stream(self) -> Generator[tuple[str, bytes], None, None]:
        if self.source is None:
            return
        elif is_url(str(self.source)):
            yield from self._stream_folder_from_url()
        else:
            yield from self._stream_folder_from_path()

    def _stream_folder_from_path(self) -> Generator[tuple[str, bytes], None, None]:
        if not Path(str(self.source)).exists():
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), str(self.source)
            )
        if not self.crate.source:
            for root, _, files in os.walk(self.source):
                root = Path(root)
                for name in files:
                    source = root / name
                    dest = source.relative_to(Path(self.source).parent)
                    with open(source, 'rb') as f:
                        for chunk in f:
                            yield str(dest), chunk

    def _stream_folder_from_url(self) -> Generator[tuple[str, bytes], None, None]:
        if not self.fetch_remote:
            if self.validate_url:
                with urlopen(self.source) as _:
                    self._jsonld['sdDatePublished'] = iso_now()
        else:
            base = self.source.rstrip("/")
            for entry in self._jsonld.get("hasPart", []):
                try:
                    part = entry["@id"]
                    if is_url(part) or part.startswith("/"):
                        raise RuntimeError(f"'{self.source}': part '{part}' is not a relative path")
                    part_uri = f"{base}/{part}"
                    rel_out_path = Path(self.id) / part

                    with urlopen(part_uri) as response:
                        chunk_size = 8192
                        while chunk := response.read(chunk_size):
                            yield str(rel_out_path), chunk
                except KeyError:
                    warnings.warn(f"'hasPart' entry in {self.id} is missing '@id'. Skipping.")
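
Since the point of streaming a Dataset is to feed an archive without staging files on disk, here is a sketch of piping the yielded chunks straight into zip entries. zip_dataset is a hypothetical helper, not code from this PR:

import zipfile

def zip_dataset(dataset, zip_path):
    """Hypothetical helper: pack a Dataset stream into a zip archive,
    opening one entry per destination name (names arrive grouped)."""
    with zipfile.ZipFile(zip_path, 'w') as zf:
        current_name, handle = None, None
        for name, chunk in dataset.stream():
            if name != current_name:
                if handle:
                    handle.close()
                current_name = name
                info = zipfile.ZipInfo(name)
                info.compress_type = zipfile.ZIP_DEFLATED
                handle = zf.open(info, 'w')  # streaming write into the entry
            handle.write(chunk)
        if handle:
            handle.close()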
120 changes: 89 additions & 31 deletions rocrate/model/file.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python

# Copyright 2019-2024 The University of Manchester, UK
# Copyright 2020-2024 Vlaams Instituut voor Biotechnologie (VIB), BE
# Copyright 2020-2024 Barcelona Supercomputing Center (BSC), ES
Expand All @@ -22,6 +21,8 @@

from pathlib import Path
import requests
from typing import Generator

import shutil
import urllib.request
import warnings
Expand All @@ -40,41 +41,98 @@ def _empty(self):
}
return val

    def _has_writeable_stream(self):
        if isinstance(self.source, (BytesIO, StringIO)):
            return True
        elif is_url(str(self.source)):
            return self.fetch_remote
        else:
            return self.source is not None

    def _write_from_stream(self, out_file_path):
        if not self._has_writeable_stream():
            # If this entity does not correspond to a writeable stream (i.e. it is a URL
            # but fetch_remote is False), we still consume the stream so that file headers
            # are read, the size calculation runs, etc.
            all(self.stream())
            return

        out_file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_file_path, 'wb') as out_file:
            for _, chunk in self.stream():
                out_file.write(chunk)

    def _copy_file(self, path, out_file_path):
        out_file_path.parent.mkdir(parents=True, exist_ok=True)
        if not out_file_path.exists() or not out_file_path.samefile(path):
            shutil.copy(path, out_file_path)
        if self.record_size:
            self._jsonld['contentSize'] = str(out_file_path.stat().st_size)

    def write(self, base_path):
        out_file_path = Path(base_path) / self.id
        if isinstance(self.source, (BytesIO, StringIO)) or is_url(str(self.source)):
            self._write_from_stream(out_file_path)
        elif self.source is None:
            # Allows to record a File entity whose @id does not exist, see #73
            warnings.warn(f"No source for {self.id}")
        else:
            self._copy_file(self.source, out_file_path)

    def _stream_from_stream(self, stream):
        size = 0
        read = stream.read()
        if isinstance(self.source, StringIO):
            read = read.encode('utf-8')
        while len(read) > 0:
            yield self.id, read
            size += len(read)
            read = stream.read()
            if isinstance(self.source, StringIO):
                read = read.encode('utf-8')

        if self.record_size:
            self._jsonld['contentSize'] = str(size)

    def _stream_from_url(self, url) -> Generator[tuple[str, bytes], None, None]:
        if self.fetch_remote or self.validate_url:
            if self.validate_url:
                if url.startswith("http"):
                    with requests.head(url) as response:
                        self._jsonld.update({
                            'contentSize': response.headers.get('Content-Length'),
                            'encodingFormat': response.headers.get('Content-Type')
                        })
                        if not self.fetch_remote:
                            date_published = response.headers.get("Last-Modified", iso_now())
                            self._jsonld['sdDatePublished'] = date_published
            if self.fetch_remote:
                size = 0
                self._jsonld['contentUrl'] = str(url)
                with urllib.request.urlopen(url) as response:
                    chunk_size = 8192
                    while chunk := response.read(chunk_size):
                        yield self.id, chunk
                        size += len(chunk)

                if self.record_size:
                    self._jsonld['contentSize'] = str(size)

    def _stream_from_file(self, path):
        size = 0
        with open(path, 'rb') as f:
            for chunk in f:
                yield self.id, chunk
                size += len(chunk)
        if self.record_size:
            self._jsonld['contentSize'] = str(size)

    def stream(self) -> Generator[tuple[str, bytes], None, None]:
        if isinstance(self.source, (BytesIO, StringIO)):
            yield from self._stream_from_stream(self.source)
        elif is_url(str(self.source)):
            yield from self._stream_from_url(self.source)
        elif self.source is None:
            # Allows to record a File entity whose @id does not exist, see #73
            warnings.warn(f"No source for {self.id}")
        else:
            yield from self._stream_from_file(self.source)
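
A small check of the in-memory branch, as a sketch (assuming ROCrate() and add_file(source, dest_path) from the library's public API): a StringIO source is re-encoded to UTF-8 bytes as it is streamed.

from io import StringIO
from rocrate.rocrate import ROCrate

crate = ROCrate()
entity = crate.add_file(StringIO("hello, stream"), "hello.txt")

for dest_name, chunk in entity.stream():
    assert dest_name == "hello.txt"
    assert isinstance(chunk, bytes)  # StringIO content arrives UTF-8 encoded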
16 changes: 11 additions & 5 deletions rocrate/model/metadata.py
@@ -22,6 +22,7 @@

import json
from pathlib import Path
from typing import Generator

from .file import File
from .dataset import Dataset
Expand Down Expand Up @@ -74,11 +75,16 @@ def generate(self):
            context = context[0]
        return {'@context': context, '@graph': graph}

    def stream(self) -> Generator[tuple[str, bytes], None, None]:
        content = self.generate()
        yield self.id, str.encode(json.dumps(content, indent=4, sort_keys=True), encoding='utf-8')

    def _has_writeable_stream(self):
        return True

    def write(self, dest_base):
        write_path = Path(dest_base) / self.id
        super()._write_from_stream(write_path)

    @property
    def root(self) -> Dataset:
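
Unlike File and Dataset, the metadata entity serializes everything in one go. A quick sketch of what a consumer sees (assuming the usual crate.metadata accessor); Preview.stream below behaves analogously, yielding the generated HTML as a single chunk when there is no source file.

import json
from rocrate.rocrate import ROCrate

crate = ROCrate()
name, payload = next(crate.metadata.stream())  # a single (name, bytes) pair
assert name == "ro-crate-metadata.json"
doc = json.loads(payload.decode("utf-8"))
assert "@context" in doc and "@graph" in doc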
17 changes: 11 additions & 6 deletions rocrate/model/preview.py
@@ -22,6 +22,7 @@

import os
from pathlib import Path
from typing import Generator

from jinja2 import Template
from .file import File
Expand Down Expand Up @@ -90,11 +91,15 @@ def is_object_list(a):
out_html = src.render(crate=self.crate, context=context_entities, data=data_entities)
return out_html

    def stream(self) -> Generator[tuple[str, bytes], None, None]:
        if self.source:
            yield from super().stream()
        else:
            yield self.id, str.encode(self.generate_html(), encoding='utf-8')

    def _has_writeable_stream(self):
        return True

    def write(self, dest_base):
        write_path = Path(dest_base) / self.id
        super()._write_from_stream(write_path)