Skip to content

Commit

Permalink
Allow passing a custom queue to the session subscribe
Browse files Browse the repository at this point in the history
This commit adds the option to add a custom queue class to the
subscribe command on the session. This allows a more flexible
usage of a subscription.

As part of this change a new circular data queue is added which
allows to have a a sort of ring buffer as a queue.
  • Loading branch information
tobiasah committed Nov 21, 2023
1 parent a5b4de8 commit 0dc8243
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 13 deletions.
31 changes: 29 additions & 2 deletions src/labone/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
request_field_type_description,
)
from labone.core.result import unwrap
from labone.core.subscription import DataQueue, streaming_handle_factory
from labone.core.subscription import DataQueue, QueueProtocol, streaming_handle_factory
from labone.core.value import AnnotatedValue

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -553,11 +553,33 @@ async def get_with_expression(
for raw_result in response.result
]

@t.overload
async def subscribe(
self,
path: LabOneNodePath,
*,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
queue_type: None,
) -> DataQueue:
...

@t.overload
async def subscribe(
self,
path: LabOneNodePath,
*,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
queue_type: type[QueueProtocol],
) -> QueueProtocol:
...

async def subscribe(
self,
path: LabOneNodePath,
*,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
queue_type: type[QueueProtocol] | None = None,
) -> QueueProtocol | DataQueue:
"""Register a new subscription to a node.
Registers a new subscription to a node on the kernel/server. All
Expand All @@ -581,6 +603,10 @@ async def subscribe(
parser_callback: Function to bring values obtained from
data-queue into desired format. This may involve parsing
them or putting them into an enum.
queue_type: The type of the queue to be returned. This can be
any class matching the DataQueue interface. Only needed if the
default DataQueue class is not sufficient. If None is passed
the default DataQueue class is used. (default=None)
Returns:
An instance of the DataQueue class. This async queue will receive
Expand Down Expand Up @@ -611,7 +637,8 @@ async def subscribe(
request.subscription = subscription
response = await _send_and_wait_request(request)
unwrap(response.result) # Result(Void, Error)
return DataQueue(
new_queue_type = queue_type or DataQueue
return new_queue_type(
path=path,
register_function=streaming_handle.register_data_queue,
)
119 changes: 113 additions & 6 deletions src/labone/core/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,21 @@ def __repr__(self) -> str:
f"connected={self.connected})",
)

def fork(self) -> DataQueue:
@t.overload
def fork(self, queue_type: None) -> DataQueue:
...

@t.overload
def fork(
self,
queue_type: type[QueueProtocol],
) -> QueueProtocol:
...

def fork(
self,
queue_type: type[QueueProtocol] | None = None,
) -> DataQueue | QueueProtocol:
"""Create a fork of the subscription.
The forked subscription will receive all updates that the original
Expand All @@ -114,6 +128,12 @@ def fork(self) -> DataQueue:
Warning:
The forked subscription will not contain any values before the fork.
Args:
queue_type: The type of the queue to be returned. This can be
any class matching the DataQueue interface. Only needed if the
default DataQueue class is not sufficient. If None is passed
the default DataQueue class is used. (default=None)
Returns:
A new data queue to the same underlying subscription.
"""
Expand All @@ -123,7 +143,8 @@ def fork(self) -> DataQueue:
"sense as it would never receive data.",
)
raise errors.StreamingError(msg)
return DataQueue(
new_queue_type = queue_type or DataQueue
return new_queue_type(
path=self._path,
register_function=self._register_function,
)
Expand Down Expand Up @@ -209,6 +230,89 @@ def maxsize(self, maxsize: int) -> None:
self._maxsize = maxsize


QueueProtocol = t.TypeVar("QueueProtocol", bound=DataQueue)


class CircularDataQueue(DataQueue):
"""Circular data queue.
This data queue is identical to the DataQueue, with the exception that it
will remove the oldest item from the queue if the queue is full and a new
item is added.
"""

async def put(self, item: AnnotatedValue) -> None:
"""Put an item into the queue.
If the queue is full the oldest item will be removed and the new item
will be added to the end of the queue.
Args:
item: The item to the put in the queue.
Raises:
StreamingError: If the data queue has been disconnected.
"""
if self.full():
self.get_nowait()
await super().put(item)

def put_nowait(self, item: AnnotatedValue) -> None:
"""Put an item into the queue without blocking.
If the queue is full the oldest item will be removed and the new item
will be added to the end of the queue.
Args:
item: The item to the put in the queue.
Raises:
StreamingError: If the data queue has been disconnected.
"""
if self.full():
self.get_nowait()
super().put_nowait(item)

@t.overload
def fork(self, queue_type: None) -> CircularDataQueue:
... # pragma: no cover

@t.overload
def fork(
self,
queue_type: type[QueueProtocol],
) -> QueueProtocol:
... # pragma: no cover

def fork(
self,
queue_type: type[QueueProtocol] | None = None,
) -> CircularDataQueue | QueueProtocol:
"""Create a fork of the subscription.
The forked subscription will receive all updates that the original
subscription receives. Its connection state is independent of the original
subscription, meaning even if the original subscription is disconnected,
the forked subscription will still receive updates.
Warning:
The forked subscription will not contain any values before the fork.
Args:
queue_type: The type of the queue to be returned. This can be
any class matching the DataQueue interface. Only needed if the
default DataQueue class is not sufficient. If None is passed
the default DataQueue class is used. (default=None)
Returns:
A new data queue to the same underlying subscription.
"""
return DataQueue.fork(
self,
queue_type=queue_type if queue_type is not None else CircularDataQueue,
)


class StreamingHandle(ABC):
"""Streaming Handle server.
Expand Down Expand Up @@ -238,7 +342,10 @@ def __init__(
...

@abstractmethod
def register_data_queue(self, data_queue: weakref.ReferenceType[DataQueue]) -> None:
def register_data_queue(
self,
data_queue: weakref.ReferenceType[QueueProtocol],
) -> None:
"""Register a new data queue.
Args:
Expand Down Expand Up @@ -303,7 +410,7 @@ def __init__(
*,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
) -> None:
self._data_queues: list[weakref.ReferenceType[DataQueue]] = []
self._data_queues = [] # type: ignore[var-annotated]

if parser_callback is None:

Expand All @@ -314,7 +421,7 @@ def parser_callback(x: AnnotatedValue) -> AnnotatedValue:

def register_data_queue(
self,
data_queue: weakref.ReferenceType[DataQueue],
data_queue: weakref.ReferenceType[QueueProtocol],
) -> None:
"""Register a new data queue.
Expand All @@ -326,7 +433,7 @@ def register_data_queue(

def _add_to_data_queue(
self,
data_queue: DataQueue | None,
data_queue: QueueProtocol | None,
value: AnnotatedValue,
) -> bool:
"""Add a value to the data queue.
Expand Down
5 changes: 3 additions & 2 deletions src/labone/nodetree/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing_extensions import TypeAlias

from labone.core.session import NodeInfo
from labone.core.subscription import DataQueue
from labone.core.subscription import QueueProtocol


NormalizedPathSegment: TypeAlias = str
Expand Down Expand Up @@ -111,7 +111,8 @@ async def subscribe(
path: LabOneNodePath,
*,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
) -> DataQueue:
queue_type: type[QueueProtocol],
) -> QueueProtocol:
"""Register a new subscription to a node."""
...

Expand Down
62 changes: 59 additions & 3 deletions src/labone/nodetree/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from deprecation import deprecated

from labone.core.subscription import DataQueue
from labone.core.value import AnnotatedValue, Value
from labone.nodetree.errors import (
LabOneInappropriateNodeTypeError,
Expand All @@ -40,7 +41,7 @@
from labone.core.helper import LabOneNodePath
from labone.core.session import NodeInfo as NodeInfoType
from labone.core.session import NodeType
from labone.core.subscription import DataQueue
from labone.core.subscription import QueueProtocol
from labone.nodetree.enum import NodeEnum

T = t.TypeVar("T")
Expand Down Expand Up @@ -1088,10 +1089,24 @@ async def wait_for_state_change(
any value except the passed value. (default = False)
Useful when waiting for value to change from existing one.
"""
... # pragma: no cover
...

@abstractmethod
@t.overload
async def subscribe(self) -> DataQueue:
...

@t.overload
async def subscribe(
self,
queue_type: type[QueueProtocol],
) -> QueueProtocol:
...

@abstractmethod
async def subscribe(
self,
queue_type: type[QueueProtocol] | None = None,
) -> QueueProtocol | DataQueue:
"""Subscribe to a node.
Subscribing to a node will cause the server to send updates that happen
Expand All @@ -1115,6 +1130,12 @@ async def subscribe(self) -> DataQueue:
happen when the queue is garbage collected or when the queue is
closed manually.
Args:
queue_type: The type of the queue to be returned. This can be
any class matching the DataQueue interface. Only needed if the
default DataQueue class is not sufficient. If None is passed
the default DataQueue class is used. (default=None)
Returns:
A DataQueue, which can be used to receive any changes to the node in a
flexible manner.
Expand Down Expand Up @@ -1186,7 +1207,21 @@ def try_generate_subnode(
)
raise LabOneInvalidPathError(msg)

@t.overload
async def subscribe(self) -> DataQueue:
...

@t.overload
async def subscribe(
self,
queue_type: type[QueueProtocol],
) -> QueueProtocol:
...

async def subscribe(
self,
queue_type: type[QueueProtocol] | None = None,
) -> QueueProtocol | DataQueue:
"""Subscribe to a node.
Subscribing to a node will cause the server to send updates that happen
Expand All @@ -1207,13 +1242,20 @@ async def subscribe(self) -> DataQueue:
happen when the queue is garbage collected or when the queue is
closed manually.
Args:
queue_type: The type of the queue to be returned. This can be
any class matching the DataQueue interface. Only needed if the
default DataQueue class is not sufficient. If None is passed
the default DataQueue class is used. (default=None)
Returns:
A DataQueue, which can be used to receive any changes to the node in a
flexible manner.
"""
return await self._tree_manager.session.subscribe(
self.path,
parser_callback=self._tree_manager.parser,
queue_type=queue_type or DataQueue,
)

async def wait_for_state_change(
Expand Down Expand Up @@ -1322,7 +1364,21 @@ def _package_get_response(
"""
...

@t.overload
async def subscribe(self) -> DataQueue:
...

@t.overload
async def subscribe(
self,
queue_type: type[QueueProtocol],
) -> QueueProtocol:
...

async def subscribe(
self,
queue_type: type[QueueProtocol] | None = None, # noqa: ARG002
) -> QueueProtocol | DataQueue:
"""Subscribe to a node.
Currently not supported for wildcard and partial nodes.
Expand Down
Loading

0 comments on commit 0dc8243

Please sign in to comment.