Skip to content

Commit

Permalink
fix(flowFromSuspend, flowFromNonSuspend): add CancellableFlow behav…
Browse files Browse the repository at this point in the history
…ior (#218)
  • Loading branch information
hoc081098 authored Dec 23, 2023
1 parent 6d96bc2 commit cfa8f63
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package com.hoc081098.flowext

import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector

Expand All @@ -34,6 +36,8 @@ import kotlinx.coroutines.flow.FlowCollector
* This function is similar to [RxJava's fromCallable](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#fromCallable-java.util.concurrent.Callable-).
* See also [flowFromSuspend] for the suspend version.
*
* The returned [Flow] is cancellable and has the same behaviour as [kotlinx.coroutines.flow.cancellable].
*
* ## Example of usage:
*
* ```kotlin
Expand Down Expand Up @@ -71,5 +75,9 @@ public fun <T> flowFromNonSuspend(function: () -> T): Flow<T> =
// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch,
// and we guarantee all Flow's constraints: context preservation and exception transparency.
private class FlowFromNonSuspend<T>(private val function: () -> T) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.emit(function())
override suspend fun collect(collector: FlowCollector<T>) {
val value = function()
currentCoroutineContext().ensureActive()
collector.emit(value)
}
}
10 changes: 9 additions & 1 deletion src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package com.hoc081098.flowext

import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector

Expand All @@ -34,6 +36,8 @@ import kotlinx.coroutines.flow.FlowCollector
* This function is similar to [RxJava's fromCallable](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#fromCallable-java.util.concurrent.Callable-).
* See also [flowFromNonSuspend] for the non-suspend version.
*
* The returned [Flow] is cancellable and has the same behaviour as [kotlinx.coroutines.flow.cancellable].
*
* ## Example of usage:
*
* ```kotlin
Expand Down Expand Up @@ -75,5 +79,9 @@ public fun <T> flowFromSuspend(function: suspend () -> T): Flow<T> =
// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch,
// and we guarantee all Flow's constraints: context preservation and exception transparency.
private class FlowFromSuspend<T>(private val function: suspend () -> T) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.emit(function())
override suspend fun collect(collector: FlowCollector<T>) {
val value = function()
currentCoroutineContext().ensureActive()
collector.emit(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take

@ExperimentalCoroutinesApi
Expand Down Expand Up @@ -89,4 +95,19 @@ class FlowFromNonSuspendTest : BaseTest() {

flow.test(listOf(Event.Value(42), Event.Complete))
}

@Test
fun testCancellable() = runTest {
var sum = 0
val flow = flowFromNonSuspend { 1 }
.onStart { currentCoroutineContext().cancel() }
.onEach { sum += it }

flow.launchIn(this).join()
assertEquals(0, sum)

sum = 0
flow.cancellable().launchIn(this).join()
assertEquals(0, sum)
}
}
21 changes: 21 additions & 0 deletions src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ import kotlin.test.assertEquals
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.withContext

Expand Down Expand Up @@ -119,4 +125,19 @@ class FlowFromSuspendTest : BaseTest() {

flow.test(listOf(Event.Value(42), Event.Complete))
}

@Test
fun testCancellable() = runTest {
var sum = 0
val flow = flowFromSuspend { 1 }
.onStart { currentCoroutineContext().cancel() }
.onEach { sum += it }

flow.launchIn(this).join()
assertEquals(0, sum)

sum = 0
flow.cancellable().launchIn(this).join()
assertEquals(0, sum)
}
}

0 comments on commit cfa8f63

Please sign in to comment.