Skip to content

Commit

Permalink
FIX: Always use buffered producers to allow multiple concurrent flow …
Browse files Browse the repository at this point in the history
…paths to use the same products (#29)

* API: make properties source and processor public

* API: rename attribute transformer_group to transformer

* CODE: mark true start of private definitions section

* REFACTOR: make SerialConduit.chained_conduits abstract

* REFACTOR: don't use iter_concurrent_conduits in is_valid_source()

* REFACTOR: don't use iter_concurrent_conduits in product_type

* REFACTOR: don't use iter_concurrent_conduits in is_chained

* REFACTOR: don't use iter_concurrent_conduits in FlowGraph

* API: replace (a)iter_concurrent_conduits() to …_concurrent_producers()

* TEST: add unit test ensuring conduits shared between paths run only once

* REFACTOR: move buffered producer classes to _simple.py

* DOC: remove obsolete @inheritdoc decorators

* BUILD: upgrade mypy to 1.11

* REFACTOR: eliminate aiter_concurrent_producers()

* DOC: remove an obsolete inline comment

* API: remove SerialTransformer.[a]iter()

* REFACTOR: remove obsolete [a]iter_concurrent_conduits()
  • Loading branch information
j-ittner authored Jul 24, 2024
1 parent 2c1a641 commit 6815c08
Show file tree
Hide file tree
Showing 15 changed files with 575 additions and 660 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ repos:
exclude: condabuild/meta.yaml

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.10.0
rev: v1.11.0
hooks:
- id: mypy
entry: mypy src/ test/
Expand Down
17 changes: 16 additions & 1 deletion src/fluxus/_passthrough.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def final_conduit(self) -> Never:
"""[see superclass]"""
raise NotImplementedError("Final conduit is not defined for passthroughs")

@property
def chained_conduits(self) -> Never:
"""[see superclass]"""
raise NotImplementedError("Chained conduits are not defined for passthroughs")

def get_final_conduits(self) -> Iterator[Never]:
"""
Returns an empty iterator since passthroughs do not define a final conduit.
Expand All @@ -91,6 +96,16 @@ def get_connections(self, *, ingoing: Collection[SerialConduit[Any]]) -> Never:
:param ingoing: the ingoing conduits (ignored)
:return: nothing; passthroughs do not define connections
:raises NotImplementedError: passthroughs do not define connections
:raises NotImplementedError: connections are not defined for passthroughs
"""
raise NotImplementedError("Connections are not defined for passthroughs")

def get_isolated_conduits(self) -> Never:
"""
Fails with a :class:`NotImplementedError` since passthroughs are transparent in
flows and therefore isolated conduits are not defined.
:return: nothing; passthroughs do not define isolated conduits
:raises NotImplementedError: isolated conduits are not defined for passthroughs
"""
raise NotImplementedError("Isolated conduits are not defined for passthroughs")
31 changes: 1 addition & 30 deletions src/fluxus/_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncIterator, Iterator
from typing import Any, Generic, TypeVar, final
from typing import Generic, TypeVar, final

from pytools.asyncio import arun, iter_async_to_sync
from pytools.typing import issubclass_generic

from ._passthrough import Passthrough
from .core import AtomicConduit
from .core.transformer import SerialTransformer

Expand Down Expand Up @@ -111,30 +109,3 @@ def atransform(
:param source_product: the existing product to use as input
:return: the new product
"""


#
# Auxiliary functions
#


def _validate_concurrent_passthrough(
conduit: SerialTransformer[Any, Any] | Passthrough
) -> None:
"""
Validate that the given conduit is valid as a concurrent conduit with a passthrough.
To be valid, its input type must be a subtype of its product type.
:param conduit: the conduit to validate
"""

if not (
isinstance(conduit, Passthrough)
or issubclass_generic(conduit.input_type, conduit.product_type)
):
raise TypeError(
"Conduit is not a valid concurrent conduit with a passthrough because its "
f"input type {conduit.input_type} is not a subtype of its product type "
f"{conduit.product_type}:\n{conduit}"
)
55 changes: 26 additions & 29 deletions src/fluxus/core/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@

import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncIterable, Collection, Iterable, Iterator
from typing import Any, Generic, TypeVar, cast

from typing_extensions import Self
from collections.abc import (
AsyncIterable,
AsyncIterator,
Awaitable,
Collection,
Iterable,
Iterator,
)
from typing import Any, Generic, TypeVar, cast, final

from pytools.api import inheritdoc
from pytools.typing import get_common_generic_base, issubclass_generic
from pytools.typing import issubclass_generic

from ._conduit import Conduit, SerialConduit

Expand Down Expand Up @@ -65,17 +70,11 @@ class Source(Conduit[T_Product_ret], Generic[T_Product_ret], metaclass=ABCMeta):
"""

@property
@abstractmethod
def product_type(self) -> type[T_Product_ret]:
"""
The type of the products produced by this conduit.
"""
from .. import Passthrough

return get_common_generic_base(
cast(SerialSource[T_Product_ret], source).product_type
for source in self.iter_concurrent_conduits()
if not isinstance(source, Passthrough)
)


class Processor(
Expand All @@ -100,7 +99,7 @@ def input_type(self) -> type[T_SourceProduct_arg]:
@abstractmethod
def process(
self, input: Iterable[T_SourceProduct_arg]
) -> list[T_Output_ret] | T_Output_ret:
) -> Iterator[T_Output_ret] | T_Output_ret:
"""
Generate new products from the given input.
Expand All @@ -109,16 +108,17 @@ def process(
"""

@abstractmethod
async def aprocess(
def aprocess(
self, input: AsyncIterable[T_SourceProduct_arg]
) -> list[T_Output_ret] | T_Output_ret:
) -> AsyncIterator[T_Output_ret] | Awaitable[T_Output_ret]:
"""
Generate new products asynchronously from the given input.
:param input: the input products
:return: the generated output or outputs
"""

@abstractmethod
def is_valid_source(
self,
source: SerialConduit[T_SourceProduct_arg],
Expand All @@ -133,20 +133,6 @@ def is_valid_source(
:return: ``True`` if the given conduit is valid source for this conduit,
``False`` otherwise
"""
from .. import Passthrough

if not isinstance(source, SerialSource):
return False

ingoing_product_type = source.product_type
return all(
issubclass_generic(
ingoing_product_type,
cast(Self, processor).input_type,
)
for processor in self.iter_concurrent_conduits()
if not isinstance(processor, Passthrough)
)


class SerialSource(
Expand Down Expand Up @@ -178,6 +164,17 @@ class SerialProcessor(
A processor that processes products sequentially.
"""

@final
def is_valid_source(
self,
source: SerialConduit[T_SourceProduct_arg],
) -> bool:
"""[see superclass]"""
if not isinstance(source, SerialSource):
return False

return issubclass_generic(source.product_type, self.input_type)

def get_connections(
self, *, ingoing: Collection[SerialConduit[Any]]
) -> Iterator[tuple[SerialConduit[Any], SerialConduit[Any]]]:
Expand Down
29 changes: 17 additions & 12 deletions src/fluxus/core/_chained_base_.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,26 @@ def is_chained(self) -> bool:

@property
@abstractmethod
def _source(self) -> Source[T_SourceProduct_ret]:
def source(self) -> Source[T_SourceProduct_ret]:
"""
The source producer of this conduit.
"""

@property
@abstractmethod
def _processor(self) -> Processor[T_SourceProduct_ret, T_Output_ret]:
def processor(self) -> Processor[T_SourceProduct_ret, T_Output_ret]:
"""
The second conduit in this chained conduit, processing the output of the
:attr:`._source` conduit.
:attr:`.source` conduit.
"""

def get_final_conduits(self) -> Iterator[SerialConduit[T_Output_ret]]:
"""[see superclass]"""
if self._processor._has_passthrough:
if self.processor._has_passthrough:
yield from cast(
Iterator[SerialConduit[T_Output_ret]], self._source.get_final_conduits()
Iterator[SerialConduit[T_Output_ret]], self.source.get_final_conduits()
)
yield from self._processor.get_final_conduits()
yield from self.processor.get_final_conduits()

def get_connections(
self, *, ingoing: Collection[SerialConduit[Any]]
Expand All @@ -97,8 +97,8 @@ def get_connections(
:return: an iterable of connections
"""
source = self._source
processor = self._processor
source = self.source
processor = self.processor

# We first yield all connections from within the source
yield from source.get_connections(ingoing=ingoing)
Expand All @@ -113,11 +113,16 @@ def get_connections(
# Then we get all connections of the processor, including ingoing connections
yield from processor.get_connections(ingoing=processor_ingoing)

def get_isolated_conduits(self) -> Iterator[SerialConduit[T_Output_ret]]:
"""[see superclass]"""
# Chained conduits are never isolated
yield from ()

def to_expression(self, *, compact: bool = False) -> Expression:
"""[see superclass]"""
return self._source.to_expression(
return self.source.to_expression(
compact=compact
) >> self._processor.to_expression(compact=compact)
) >> self.processor.to_expression(compact=compact)


class _SerialChainedConduit(
Expand All @@ -132,13 +137,13 @@ class _SerialChainedConduit(

@property
@abstractmethod
def _source(self) -> SerialSource[T_SourceProduct_ret]:
def source(self) -> SerialSource[T_SourceProduct_ret]:
"""[see superclass]"""

@property
def chained_conduits(self) -> Iterator[SerialConduit[Any]]:
"""
The chained conduits in the flow leading up to this conduit.
"""
yield from self._source.chained_conduits
yield from self.source.chained_conduits
yield self.final_conduit
8 changes: 0 additions & 8 deletions src/fluxus/core/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

from typing_extensions import Self

from pytools.api import inheritdoc

from ._conduit import Conduit

log = logging.getLogger(__name__)
Expand All @@ -52,7 +50,6 @@
#


@inheritdoc(match="[see superclass]")
class ConcurrentConduit(
Conduit[T_Product_ret], Generic[T_Product_ret], metaclass=ABCMeta
):
Expand All @@ -71,11 +68,6 @@ def is_concurrent(self) -> bool:
"""
return True

@property
def is_chained(self) -> bool:
"""[see superclass]"""
return any(conduit.is_chained for conduit in self.iter_concurrent_conduits())

@property
def final_conduit(self) -> Self:
"""
Expand Down
Loading

0 comments on commit 6815c08

Please sign in to comment.