Skip to content

Commit

Permalink
Select(complete)
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 11, 2023
1 parent 5d2c858 commit b7ceb91
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public static partial class EventFactory
{
// TODO: this is working space, will remove this file after complete.

// TODO: Range, Repeat, Defer, DeferAsync, FromAsync, FromAsyncPattern, FromEvent, FromEventPattern, Start, Using, Create
// TODO: Defer, DeferAsync, FromAsync, FromAsyncPattern, FromEvent, FromEventPattern, Start, Using, Create
// Timer, Interval, TimerFrame, IntervalFrame, ToObservable(ToEvent)

// TODO: Convert
Expand All @@ -22,7 +22,9 @@ public static partial class EventFactory

// AsObservable
// AsSingleUnitObservable

// AsUnitObservable
// AsUnitComplete
// AsNeverComplete

// Repeat
Expand Down
33 changes: 33 additions & 0 deletions src/R3/Operators/Select.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ public static Event<TResult> Select<TMessage, TResult>(this Event<TMessage> sour
{
return new Select<TMessage, TResult>(source, selector);
}

public static CompletableEvent<TMessageResult, TCompleteResult> Select<TMessage, TComplete, TMessageResult, TCompleteResult>(
this CompletableEvent<TMessage, TComplete> source,
Func<TMessage, TMessageResult> messageSelector,
Func<TComplete, TCompleteResult> completeSelector)
{
return new Select<TMessage, TComplete, TMessageResult, TCompleteResult>(source, messageSelector, completeSelector);
}
}
}

Expand All @@ -32,4 +40,29 @@ public override void OnNext(TMessage message)
}
}
}

internal sealed class Select<TMessage, TComplete, TMessageResult, TCompleteResult>(
CompletableEvent<TMessage, TComplete> source,
Func<TMessage, TMessageResult> messageSelector,
Func<TComplete, TCompleteResult> completeSelector
) : CompletableEvent<TMessageResult, TCompleteResult>
{
protected override IDisposable SubscribeCore(Subscriber<TMessageResult, TCompleteResult> subscriber)
{
return source.Subscribe(new _Select(subscriber, messageSelector, completeSelector));
}

class _Select(Subscriber<TMessageResult, TCompleteResult> subscriber, Func<TMessage, TMessageResult> messageSelector, Func<TComplete, TCompleteResult> completeSelector) : Subscriber<TMessage, TComplete>
{
public override void OnNext(TMessage message)
{
subscriber.OnNext(messageSelector(message));
}

protected override void OnCompletedCore(TComplete complete)
{
subscriber.OnCompleted(completeSelector(complete));
}
}
}
}

0 comments on commit b7ceb91

Please sign in to comment.