Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 protocol #895

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions strax/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .storage.file_rechunker import *
from .storage.mongo import *
from .storage.zipfiles import *
from .storage.s3 import *

from .config import *
from .plugins import *
Expand Down
52 changes: 47 additions & 5 deletions strax/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,42 @@ def _load_file(f, compressor, dtype):


@export
def save_file(f, data, compressor="zstd", is_s3_path=False):
    """Save data to file and return number of bytes written.

    :param f: file name or handle to save to
    :param data: data (numpy array) to save
    :param compressor: compressor to use
    :param is_s3_path: if True, treat ``f`` as an S3 object key and upload
        the data via the S3 frontend instead of the local filesystem

    """

    if isinstance(f, str):
        final_fn = f
        temp_fn = f + "_temp"
        if not is_s3_path:
            # Write to a temporary file first and rename at the end, so a
            # reader never observes a partially-written final file.
            with open(temp_fn, mode="wb") as write_file:
                result = _save_file(write_file, data, compressor)
            os.rename(temp_fn, final_fn)
            return result
        else:
            s3_interface = strax.S3Frontend(
                s3_access_key_id=None,
                s3_secret_access_key=None,
                path="",
                deep_scan=False,
            )
            # Mirror the local temp-file pattern: upload under a temporary
            # key, then server-side copy it to the final key.
            result = _save_file_to_s3(s3_interface, temp_fn, data, compressor)
            s3_interface.s3.copy_object(
                Bucket=s3_interface.BUCKET,
                Key=final_fn,
                CopySource={"Bucket": s3_interface.BUCKET, "Key": temp_fn},
            )

            # Delete the temporary object
            s3_interface.s3.delete_object(Bucket=s3_interface.BUCKET, Key=temp_fn)

            return result
    else:
        return _save_file(f, data, compressor)

Expand All @@ -99,6 +120,27 @@ def _save_file(f, data, compressor="zstd"):
return len(d_comp)


def _save_file_to_s3(s3_client, key, data, compressor=None):
# Use this method to save file directly to S3
# If compression is needed, handle it here
# Use `BytesIO` to handle binary data in-memory
assert isinstance(data, np.ndarray), "Please pass a numpy array"

# Create a binary buffer to simulate writing to a file
buffer = BytesIO()

# Simulate saving file content (you can compress or directly write data here)
if compressor:
data = COMPRESSORS[compressor]["compress"](data)
buffer.write(data)
buffer.seek(0) # Reset the buffer to the beginning

# Upload buffer to S3 under the specified key
s3_client.s3.put_object(Bucket=s3_client.BUCKET, Key=key, Body=buffer.getvalue())

return len(data)


def _compress_blosc(data):
if data.nbytes >= blosc.MAX_BUFFERSIZE:
raise ValueError("Blosc's input buffer cannot exceed ~2 GB")
Expand Down
Loading
Loading