Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUILD: Release fluxus 1.0.3 #30

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/fluxus-release-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ jobs:
if: (startsWith(github.head_ref, 'dev/') && startsWith(github.base_ref, 'release/')) || startsWith(github.ref, 'refs/heads/release/')
needs:
- check_release
- veracode_check
- unit_tests
- conda_tox_matrix
steps:
- name: Checkout code
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ repos:
exclude: condabuild/meta.yaml

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.10.0
rev: v1.11.0
hooks:
- id: mypy
entry: mypy src/ test/
Expand Down
15 changes: 15 additions & 0 deletions RELEASE_NOTES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ Release Notes
*fluxus* 1.0
------------

*fluxus* 1.0.3
~~~~~~~~~~~~~~

- API: Rename methods ``iter_concurrent_conduits`` to ``iter_concurrent_producers``,
and ``aiter_concurrent_conduits`` to ``aiter_concurrent_producers``
- API: Return iterators not lists from :meth:`.SerialTransformer.process` and
:meth:`.SerialTransformer.aprocess` for greater flexibility in processing the results
- API: Removed functions ``iter()`` and ``aiter()`` from class
:class:`.SerialTransformer`, to further streamline the API and given they can be
easily replaced by repeated calls to :meth:`.SerialTransformer.process` and
:meth:`.SerialTransformer.aprocess`
- FIX: Updated logic for iterating over concurrent producers and transformers to ensure
shared conduits never run more than once


*fluxus* 1.0.2
~~~~~~~~~~~~~~

Expand Down
17 changes: 16 additions & 1 deletion src/fluxus/_passthrough.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def final_conduit(self) -> Never:
"""[see superclass]"""
raise NotImplementedError("Final conduit is not defined for passthroughs")

@property
def chained_conduits(self) -> Never:
"""[see superclass]"""
raise NotImplementedError("Chained conduits are not defined for passthroughs")

def get_final_conduits(self) -> Iterator[Never]:
"""
Returns an empty iterator since passthroughs do not define a final conduit.
Expand All @@ -91,6 +96,16 @@ def get_connections(self, *, ingoing: Collection[SerialConduit[Any]]) -> Never:

:param ingoing: the ingoing conduits (ignored)
:return: nothing; passthroughs do not define connections
:raises NotImplementedError: passthroughs do not define connections
:raises NotImplementedError: connections are not defined for passthroughs
"""
raise NotImplementedError("Connections are not defined for passthroughs")

def get_isolated_conduits(self) -> Never:
"""
Fails with a :class:`NotImplementedError` since passthroughs are transparent in
flows and therefore isolated conduits are not defined.

:return: nothing; passthroughs do not define isolated conduits
:raises NotImplementedError: isolated conduits are not defined for passthroughs
"""
raise NotImplementedError("Isolated conduits are not defined for passthroughs")
31 changes: 1 addition & 30 deletions src/fluxus/_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncIterator, Iterator
from typing import Any, Generic, TypeVar, final
from typing import Generic, TypeVar, final

from pytools.asyncio import arun, iter_async_to_sync
from pytools.typing import issubclass_generic

from ._passthrough import Passthrough
from .core import AtomicConduit
from .core.transformer import SerialTransformer

Expand Down Expand Up @@ -111,30 +109,3 @@ def atransform(
:param source_product: the existing product to use as input
:return: the new product
"""


#
# Auxiliary functions
#


def _validate_concurrent_passthrough(
conduit: SerialTransformer[Any, Any] | Passthrough
) -> None:
"""
Validate that the given conduit is valid as a concurrent conduit with a passthrough.

To be valid, its input type must be a subtype of its product type.

:param conduit: the conduit to validate
"""

if not (
isinstance(conduit, Passthrough)
or issubclass_generic(conduit.input_type, conduit.product_type)
):
raise TypeError(
"Conduit is not a valid concurrent conduit with a passthrough because its "
f"input type {conduit.input_type} is not a subtype of its product type "
f"{conduit.product_type}:\n{conduit}"
)
55 changes: 26 additions & 29 deletions src/fluxus/core/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@

import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncIterable, Collection, Iterable, Iterator
from typing import Any, Generic, TypeVar, cast

from typing_extensions import Self
from collections.abc import (
AsyncIterable,
AsyncIterator,
Awaitable,
Collection,
Iterable,
Iterator,
)
from typing import Any, Generic, TypeVar, cast, final

from pytools.api import inheritdoc
from pytools.typing import get_common_generic_base, issubclass_generic
from pytools.typing import issubclass_generic

from ._conduit import Conduit, SerialConduit

Expand Down Expand Up @@ -65,17 +70,11 @@ class Source(Conduit[T_Product_ret], Generic[T_Product_ret], metaclass=ABCMeta):
"""

@property
@abstractmethod
def product_type(self) -> type[T_Product_ret]:
"""
The type of the products produced by this conduit.
"""
from .. import Passthrough

return get_common_generic_base(
cast(SerialSource[T_Product_ret], source).product_type
for source in self.iter_concurrent_conduits()
if not isinstance(source, Passthrough)
)


class Processor(
Expand All @@ -100,7 +99,7 @@ def input_type(self) -> type[T_SourceProduct_arg]:
@abstractmethod
def process(
self, input: Iterable[T_SourceProduct_arg]
) -> list[T_Output_ret] | T_Output_ret:
) -> Iterator[T_Output_ret] | T_Output_ret:
"""
Generate new products from the given input.

Expand All @@ -109,16 +108,17 @@ def process(
"""

@abstractmethod
async def aprocess(
def aprocess(
self, input: AsyncIterable[T_SourceProduct_arg]
) -> list[T_Output_ret] | T_Output_ret:
) -> AsyncIterator[T_Output_ret] | Awaitable[T_Output_ret]:
"""
Generate new products asynchronously from the given input.

:param input: the input products
:return: the generated output or outputs
"""

@abstractmethod
def is_valid_source(
self,
source: SerialConduit[T_SourceProduct_arg],
Expand All @@ -133,20 +133,6 @@ def is_valid_source(
:return: ``True`` if the given conduit is valid source for this conduit,
``False`` otherwise
"""
from .. import Passthrough

if not isinstance(source, SerialSource):
return False

ingoing_product_type = source.product_type
return all(
issubclass_generic(
ingoing_product_type,
cast(Self, processor).input_type,
)
for processor in self.iter_concurrent_conduits()
if not isinstance(processor, Passthrough)
)


class SerialSource(
Expand Down Expand Up @@ -178,6 +164,17 @@ class SerialProcessor(
A processor that processes products sequentially.
"""

@final
def is_valid_source(
self,
source: SerialConduit[T_SourceProduct_arg],
) -> bool:
"""[see superclass]"""
if not isinstance(source, SerialSource):
return False

return issubclass_generic(source.product_type, self.input_type)

def get_connections(
self, *, ingoing: Collection[SerialConduit[Any]]
) -> Iterator[tuple[SerialConduit[Any], SerialConduit[Any]]]:
Expand Down
29 changes: 17 additions & 12 deletions src/fluxus/core/_chained_base_.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,26 @@ def is_chained(self) -> bool:

@property
@abstractmethod
def _source(self) -> Source[T_SourceProduct_ret]:
def source(self) -> Source[T_SourceProduct_ret]:
"""
The source producer of this conduit.
"""

@property
@abstractmethod
def _processor(self) -> Processor[T_SourceProduct_ret, T_Output_ret]:
def processor(self) -> Processor[T_SourceProduct_ret, T_Output_ret]:
"""
The second conduit in this chained conduit, processing the output of the
:attr:`._source` conduit.
:attr:`.source` conduit.
"""

def get_final_conduits(self) -> Iterator[SerialConduit[T_Output_ret]]:
"""[see superclass]"""
if self._processor._has_passthrough:
if self.processor._has_passthrough:
yield from cast(
Iterator[SerialConduit[T_Output_ret]], self._source.get_final_conduits()
Iterator[SerialConduit[T_Output_ret]], self.source.get_final_conduits()
)
yield from self._processor.get_final_conduits()
yield from self.processor.get_final_conduits()

def get_connections(
self, *, ingoing: Collection[SerialConduit[Any]]
Expand All @@ -97,8 +97,8 @@ def get_connections(

:return: an iterable of connections
"""
source = self._source
processor = self._processor
source = self.source
processor = self.processor

# We first yield all connections from within the source
yield from source.get_connections(ingoing=ingoing)
Expand All @@ -113,11 +113,16 @@ def get_connections(
# Then we get all connections of the processor, including ingoing connections
yield from processor.get_connections(ingoing=processor_ingoing)

def get_isolated_conduits(self) -> Iterator[SerialConduit[T_Output_ret]]:
"""[see superclass]"""
# Chained conduits are never isolated
yield from ()

def to_expression(self, *, compact: bool = False) -> Expression:
"""[see superclass]"""
return self._source.to_expression(
return self.source.to_expression(
compact=compact
) >> self._processor.to_expression(compact=compact)
) >> self.processor.to_expression(compact=compact)


class _SerialChainedConduit(
Expand All @@ -132,13 +137,13 @@ class _SerialChainedConduit(

@property
@abstractmethod
def _source(self) -> SerialSource[T_SourceProduct_ret]:
def source(self) -> SerialSource[T_SourceProduct_ret]:
"""[see superclass]"""

@property
def chained_conduits(self) -> Iterator[SerialConduit[Any]]:
"""
The chained conduits in the flow leading up to this conduit.
"""
yield from self._source.chained_conduits
yield from self.source.chained_conduits
yield self.final_conduit
8 changes: 0 additions & 8 deletions src/fluxus/core/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

from typing_extensions import Self

from pytools.api import inheritdoc

from ._conduit import Conduit

log = logging.getLogger(__name__)
Expand All @@ -52,7 +50,6 @@
#


@inheritdoc(match="[see superclass]")
class ConcurrentConduit(
Conduit[T_Product_ret], Generic[T_Product_ret], metaclass=ABCMeta
):
Expand All @@ -71,11 +68,6 @@ def is_concurrent(self) -> bool:
"""
return True

@property
def is_chained(self) -> bool:
"""[see superclass]"""
return any(conduit.is_chained for conduit in self.iter_concurrent_conduits())

@property
def final_conduit(self) -> Self:
"""
Expand Down
Loading
Loading