diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt index 410009b0..b61925da 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt @@ -24,6 +24,8 @@ package com.hoc081098.flowext +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector @@ -34,6 +36,8 @@ import kotlinx.coroutines.flow.FlowCollector * This function is similar to [RxJava's fromCallable](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#fromCallable-java.util.concurrent.Callable-). * See also [flowFromSuspend] for the suspend version. * + * The returned [Flow] is cancellable and has the same behaviour as [kotlinx.coroutines.flow.cancellable]. + * * ## Example of usage: * * ```kotlin @@ -71,5 +75,9 @@ public fun flowFromNonSuspend(function: () -> T): Flow = // We don't need to use `AbstractFlow` here because we only emit a single value without a context switch, // and we guarantee all Flow's constraints: context preservation and exception transparency. private class FlowFromNonSuspend(private val function: () -> T) : Flow { - override suspend fun collect(collector: FlowCollector) = collector.emit(function()) + override suspend fun collect(collector: FlowCollector) { + val value = function() + currentCoroutineContext().ensureActive() + collector.emit(value) + } } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt index efe5f283..f444aecc 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt @@ -24,6 +24,8 @@ package com.hoc081098.flowext +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector @@ -34,6 +36,8 @@ import kotlinx.coroutines.flow.FlowCollector * This function is similar to [RxJava's fromCallable](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#fromCallable-java.util.concurrent.Callable-). * See also [flowFromNonSuspend] for the non-suspend version. * + * The returned [Flow] is cancellable and has the same behaviour as [kotlinx.coroutines.flow.cancellable]. + * * ## Example of usage: * * ```kotlin @@ -75,5 +79,9 @@ public fun flowFromSuspend(function: suspend () -> T): Flow = // We don't need to use `AbstractFlow` here because we only emit a single value without a context switch, // and we guarantee all Flow's constraints: context preservation and exception transparency. private class FlowFromSuspend(private val function: suspend () -> T) : Flow { - override suspend fun collect(collector: FlowCollector) = collector.emit(function()) + override suspend fun collect(collector: FlowCollector) { + val value = function() + currentCoroutineContext().ensureActive() + collector.emit(value) + } } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt index 6fc29360..02a2ca0a 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt @@ -33,7 +33,13 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.flow.cancellable import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.take @ExperimentalCoroutinesApi @@ -89,4 +95,19 @@ class FlowFromNonSuspendTest : BaseTest() { flow.test(listOf(Event.Value(42), Event.Complete)) } + + @Test + fun testCancellable() = runTest { + var sum = 0 + val flow = flowFromNonSuspend { 1 } + .onStart { currentCoroutineContext().cancel() } + .onEach { sum += it } + + flow.launchIn(this).join() + assertEquals(0, sum) + + sum = 0 + flow.cancellable().launchIn(this).join() + assertEquals(0, sum) + } } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt index decf96bf..00b1e136 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt @@ -34,8 +34,14 @@ import kotlin.test.assertEquals import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.cancellable import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.take import kotlinx.coroutines.withContext @@ -119,4 +125,19 @@ class FlowFromSuspendTest : BaseTest() { flow.test(listOf(Event.Value(42), Event.Complete)) } + + @Test + fun testCancellable() = runTest { + var sum = 0 + val flow = flowFromSuspend { 1 } + .onStart { currentCoroutineContext().cancel() } + .onEach { sum += it } + + flow.launchIn(this).join() + assertEquals(0, sum) + + sum = 0 + flow.cancellable().launchIn(this).join() + assertEquals(0, sum) + } }