Skip to content

Commit

Permalink
Ability to access file contents during commit operations and of uncommitted files (#795)
Browse files Browse the repository at this point in the history

* Ability to access file contents during commit operations and of uncommitted files.

* Fetch FOAF from web archive (currently unavailable).

* Automatically transfer files between wrappers.
  • Loading branch information
kysrpex authored Jul 5, 2022
1 parent 90313f0 commit c7489ea
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 9 deletions.
3 changes: 2 additions & 1 deletion simphony_osp/develop.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Developer tools."""

from simphony_osp.interfaces.interface import Interface as Wrapper
from simphony_osp.interfaces.remote.common import get_hash
from simphony_osp.ontology.operations import Operations, find_operations

__all__ = ["Operations", "Wrapper", "find_operations"]
__all__ = ["Operations", "Wrapper", "find_operations", "get_hash"]
84 changes: 81 additions & 3 deletions simphony_osp/interfaces/interface.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Universal interface for wrapper developers."""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from base64 import b64encode
from enum import IntEnum
from itertools import chain
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import (
TYPE_CHECKING,
BinaryIO,
Expand Down Expand Up @@ -35,6 +39,8 @@
"InterfaceDriver",
]

logger = logging.getLogger(__name__)


class BufferType(IntEnum):
"""Enum representing the two possible types of triple buffers.
Expand Down Expand Up @@ -199,6 +205,16 @@ class InterfaceDriver(Store):
_ontology: Optional[Session]
"""Ontology to be used on the session's shown to the wrapper developer."""

_file_cache: Optional[TemporaryDirectory] = None
"""Holds files that are accessed while queued.
Files are queued as byte streams (which can be exhausted). Therefore,
if a user needs to access a file before it is committed, the byte stream
needs to be copied somewhere to be committed later. This also applies
when files need to be accessed during commit by a wrapper (as they are
truly sent to the wrapper after the `commit` method has finished).
"""

# RDFLib
# ↓ -- ↓

Expand Down Expand Up @@ -246,6 +262,9 @@ def open(self, configuration: str, create: bool = False) -> None:
buffer_type: Graph(SimpleMemory()) for buffer_type in BufferType
}

# Set-up file bytestream cache
self._file_cache = TemporaryDirectory()

self.interface.open(configuration, create)

# The interface can set its base graph when `open` is called. If not
Expand Down Expand Up @@ -295,6 +314,11 @@ def close(self, commit_pending_transaction: bool = False) -> None:
self.interface.updated = None
self.interface.deleted = None

# Clear bytestream cache
if self._file_cache is not None:
self._file_cache.cleanup()
self._file_cache = None

def add(self, triple: Triple, context: Graph, quoted=False) -> None:
"""Adds triples to the interface.
Expand Down Expand Up @@ -512,7 +536,7 @@ def commit(self) -> None:
)
self.interface.base.commit()

# Queue file removal for removed file objects.
# Queue file upload and removal for file objects.
for s in chain(
self._buffer_uncaught[BufferType.DELETED][
: RDF.type : simphony_namespace.File
Expand All @@ -524,9 +548,21 @@ def commit(self) -> None:
self.queue(s, None)
for URI, file in self._queue.items():
if file is None:
self.interface.delete(URI)
if hasattr(self.interface, "delete"):
self.interface.delete(URI)
else:
logging.warning(
f"Ignoring deletion of file {URI}, as the session "
f"does not support deleting files."
)
else:
self.interface.save(URI, file)
if hasattr(self.interface, "save"):
self.interface.save(URI, file)
else:
logging.warning(
f"File {URI}, will NOT be committed to the session, "
f"as it does not support the storage of new files."
)
file.close()

# Reset buffers and file queue.
Expand Down Expand Up @@ -584,8 +620,50 @@ def compute(self, *args, **kwargs):

def queue(self, key: URIRef, file: Optional[BinaryIO]) -> None:
    """Queue a file to be committed.

    Args:
        key: Identifier of the file individual that the contents belong
            to.
        file: Byte stream with the new file contents. `None` marks the
            file as queued for deletion.
    """
    # Warn only when actual contents are being queued: `None` is a
    # deletion marker (see `commit`), for which the `save` capability of
    # the interface is irrelevant. Warning on deletions produced a
    # misleading "will NOT be saved" message.
    if file is not None and not hasattr(self.interface, "save"):
        logger.warning(
            "This session does not support saving new files. The "
            "contents of the file will NOT be saved during the commit "
            "operation."
        )

    # Drop any cached copy of a previously queued stream for this key:
    # the cache (see `load`) is now stale.
    file_name = b64encode(bytes(key, encoding="UTF-8")).decode("UTF-8")
    file_path = Path(self._file_cache.name) / file_name
    if file_path.exists():
        file_path.unlink()

    self._queue[key] = file

def load(self, key: URIRef) -> BinaryIO:
    """Retrieve a file.

    Uncommitted (queued) contents take precedence over whatever the
    interface has stored. Since queued byte streams can be exhausted by
    reading them, the queued stream is copied to the file cache on first
    access, and fresh handles on the copy are handed out afterwards.

    Args:
        key: Identifier of the file individual to retrieve contents for.

    Returns:
        A binary stream with the file contents.

    Raises:
        FileNotFoundError: The file is queued for deletion, or the
            interface does not support file storage.
    """
    if key in self._queue:
        queued = self._queue[key]
        if queued is None:
            # The file is queued for deletion; its contents are no
            # longer retrievable. (Previously this crashed with
            # `AttributeError` on `None.read()`.)
            raise FileNotFoundError(
                "The file is queued for deletion. Unable to retrieve "
                "the file contents."
            )

        file_name = b64encode(bytes(key, encoding="UTF-8")).decode("UTF-8")
        cache_path = Path(self._file_cache.name) / file_name

        # Save a temporary copy of the queued stream on first access,
        # as reading it would exhaust it before the commit.
        if not cache_path.exists():
            with open(cache_path, "wb") as cache_file:
                cache_file.write(queued.read())

        # Re-queue a fresh handle on the copy so that the commit still
        # sends the full contents.
        self._queue[key] = open(cache_path, "rb")

        # Hand the caller an independent handle on the copy.
        return open(cache_path, "rb")

    if hasattr(self.interface, "load"):
        return self.interface.load(key)

    raise FileNotFoundError(
        "This session does not support file storage. Unable to "
        "retrieve the file contents."
    )

def _compute_entity_modifications(
self,
) -> Tuple[Set[OntologyEntity], Set[OntologyEntity], Set[OntologyEntity]]:
Expand Down
40 changes: 35 additions & 5 deletions simphony_osp/ontology/operations/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import logging
from typing import BinaryIO

from rdflib.term import URIRef

Expand Down Expand Up @@ -43,18 +44,47 @@ def upload(self, path: str) -> None:
Queues a file to be uploaded to the server. When a commit is
performed, the data is sent.
"""
if hasattr(self._session.driver, "queue"):
if self._session.driver is not None:
file = open(path, "rb")
self._session.driver.queue(self._identifier, file)
else:
logger.warning(
"This session does not support saving new files. The "
"contents of the file will NOT be saved."
)

def download(self, path: str) -> None:
    """Download the file.

    Streams the file contents from the session's driver into a local
    file at `path`, in fixed-size chunks.

    Args:
        path: Destination path for the downloaded contents.

    Raises:
        FileNotFoundError: The session does not support file storage.
    """
    if self._session.driver is None:
        raise FileNotFoundError(
            "This session does not support file storage. Unable to "
            "retrieve the file contents."
        )

    # NOTE(review): the original read with `buf_size`, which is not
    # defined in this scope (likely a NameError) — replaced with an
    # explicit local chunk size.
    chunk_size = 65536
    with self._session.driver.load(self._identifier) as source:
        with open(path, "wb") as target:
            while True:
                chunk = source.read(chunk_size)
                if not chunk:
                    break
                target.write(chunk)

@property
def handle(self) -> BinaryIO:
    """Byte stream with the file contents, for direct manipulation.

    Raises:
        FileNotFoundError: The session does not support file storage.
    """
    driver = self._session.driver
    if driver is None:
        raise FileNotFoundError(
            "This session does not support file storage. Unable to "
            "retrieve the file contents."
        )
    return driver.load(self._identifier)

def overwrite(self, contents: BinaryIO) -> None:
    """Overwrite the file contents with a byte stream."""
    driver = self._session.driver
    if driver is None:
        logger.warning(
            "This session does not support saving new files. The "
            "contents of the file will NOT be saved."
        )
        return
    # Queue the new contents; they are sent on the next commit.
    driver.queue(self._identifier, contents)
25 changes: 25 additions & 0 deletions simphony_osp/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,14 @@ def add(
)
# Get the identifiers of the individuals
identifiers = list(individual.identifier for individual in individuals)
# Get a list of files within the individuals to add
files = {
individual
for individual in individuals
if set(class_.identifier for class_ in individual.superclasses)
& {simphony_namespace.File}
and individual.session is not self
}

# Paste the individuals
"""The attributes of the individuals are always kept. The
Expand All @@ -519,6 +527,18 @@ def add(
raise RuntimeError(
"Some of the added entities already exist on the session."
)
elif (
merge
and files
and any(
(identifier, None, None) in self.graph
for identifier in {x.identifier for x in files}
)
):
raise RuntimeError(
"Some of the added file entities already exist on the "
"session. File entities cannot be merged with existing ones."
)
delete = (
(individual.identifier, None, None)
for individual in individuals
Expand All @@ -537,6 +557,11 @@ def add(
for pattern in delete:
self.graph.remove(pattern)
self.graph.addN((s, p, o, self.graph) for s, p, o in add)
files = ((file.identifier, file.operations.handle) for file in files)
for identifier, contents in files:
self.from_identifier_typed(
identifier, typing=OntologyIndividual
).operations.overwrite(contents)
added_objects = list(
self.from_identifier_typed(identifier, typing=OntologyIndividual)
for identifier in identifiers
Expand Down
22 changes: 22 additions & 0 deletions tests/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class TestDataspaceWrapper(unittest.TestCase):
prev_default_ontology: Session

dataspace_directory: TemporaryDirectory
second_dataspace_directory: TemporaryDirectory

@classmethod
def setUpClass(cls):
Expand All @@ -182,10 +183,12 @@ def tearDownClass(cls):
def setUp(self) -> None:
"""Create a temporary directory for files."""
self.dataspace_directory = TemporaryDirectory()
self.second_dataspace_directory = TemporaryDirectory()

def tearDown(self) -> None:
"""Clean the temporary directory for files."""
self.dataspace_directory.cleanup()
self.second_dataspace_directory.cleanup()

def test_wrapper_city(self) -> None:
"""Test adding some entities from the city ontology."""
Expand Down Expand Up @@ -268,6 +271,8 @@ def test_files(self):

with NamedTemporaryFile("w", suffix=".txt") as os_file:
os_file.write("text")
os_file.flush()
os.fsync(os_file)

# Test creating file object and filling it with a file.
with Dataspace(self.dataspace_directory.name, True) as wrapper:
Expand All @@ -294,6 +299,7 @@ def test_files(self):
shallow=False,
)
)
self.assertEqual(b"text", file.operations.handle.read())

# Test recovering the previous file and downloading it.
with Dataspace(self.dataspace_directory.name, False) as wrapper:
Expand All @@ -304,6 +310,22 @@ def test_files(self):
file.operations.download(destination)
self.assertTrue(destination.is_file())

# Test copying the file among data spaces.
with Dataspace(
self.second_dataspace_directory.name, True
) as wrapper_2:
wrapper_2.add(file)
wrapper_2.commit()

with Dataspace(
self.second_dataspace_directory.name, False
) as wrapper_2:
file_2 = wrapper_2.from_identifier(file_identifier)
contents_1 = file.operations.handle.read()
contents_2 = file_2.operations.handle.read()
self.assertEqual(contents_1, contents_2)
self.assertEqual(b"text", contents_1)

# Test deleting the file.
with Dataspace(self.dataspace_directory.name, False) as wrapper:
file = wrapper.from_identifier(file_identifier)
Expand Down

0 comments on commit c7489ea

Please sign in to comment.