Skip to content

Commit

Permalink
Ensure that flow operators propagate the cancellation exceptions (#4038)
Browse files Browse the repository at this point in the history
Before this change, it could happen that some size-limiting
operators upstream swallowed the requests to limit the flow size
emitted by the operators downstream.

This could cause `onCompletion` calls between these operators to
incorrectly report that the flow was not in fact limited by the
downstream operators.

Additionally, in the presence of additional size-limiting operators
in the chain, `first` and `single` and their variants could exhibit
incorrect behavior where emitting a value from `onCompletion` would
overwrite their output.

Fixes #4035
  • Loading branch information
dkhalanskyjb committed Feb 15, 2024
1 parent 761bdeb commit d0dabb9
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 15 deletions.
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
val collectJob = Job()
(second as SendChannel<*>).invokeOnClose {
// Optimization to avoid AFE allocation when the other flow is done
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
if (collectJob.isActive) collectJob.cancel(AbortFlowException(collectJob))
}

try {
Expand All @@ -124,14 +124,14 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
flow.collect { value ->
withContextUndispatched(scopeContext, Unit, cnt) {
val otherValue = second.receiveCatching().getOrElse {
throw it ?:AbortFlowException(this@unsafeFlow)
throw it ?: AbortFlowException(collectJob)
}
emit(transform(value, NULL.unbox(otherValue)))
}
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this@unsafeFlow)
e.checkOwnership(owner = collectJob)
} finally {
second.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/**
* This exception is thrown when operator need no more elements from the flow.
* This exception should never escape outside of operator's implementation.
* This exception is thrown when an operator needs no more elements from the flow.
* The operator should never allow this exception to be thrown past its own boundary.
* This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner
* (see usages of [checkOwnership]).
* Therefore, the [owner] parameter must be unique for every invocation of every operator.
*/
internal expect class AbortFlowException(owner: FlowCollector<*>) : CancellationException {
public val owner: FlowCollector<*>
internal expect class AbortFlowException(owner: Any) : CancellationException {
val owner: Any
}

internal fun AbortFlowException.checkOwnership(owner: FlowCollector<*>) {
internal fun AbortFlowException.checkOwnership(owner: Any) {
if (this.owner !== owner) throw this
}

Expand Down
9 changes: 5 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Requested element count $count should be positive" }
return flow {
val ownershipMarker = Any()
var consumed = 0
try {
collect { value ->
Expand All @@ -56,18 +57,18 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
return@collect emitAbort(value, ownershipMarker)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
e.checkOwnership(owner = ownershipMarker)
}
}
}

private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
private suspend fun <T> FlowCollector<T>.emitAbort(value: T, ownershipMarker: Any) {
emit(value)
throw AbortFlowException(this)
throw AbortFlowException(ownershipMarker)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,90 @@ class OnCompletionTest : TestBase() {
.take(1)
.collect()
}

/**
* Tests that the operators that are used to limit the flow (like [take] and [zip]) faithfully propagate the
* cancellation exception to the original owner.
*/
@Test
fun testOnCompletionBetweenLimitingOperators() = runTest {
// `zip` doesn't eat the exception thrown by `take`:
flowOf(1, 2, 3)
.zip(flowOf(4, 5)) { a, b -> a + b }
.onCompletion {
expect(2)
assertNotNull(it)
}
.take(1)
.collect {
expect(1)
}

// `take` doesn't eat the exception thrown by `zip`:
flowOf(1, 2, 3)
.take(2)
.onCompletion {
expect(4)
assertNotNull(it)
}
.zip(flowOf(4)) { a, b -> a + b }
.collect {
expect(3)
}

// `take` doesn't eat the exception thrown by `first`:
flowOf(1, 2, 3)
.take(2)
.onCompletion {
expect(5)
assertNotNull(it)
}
.first()

// `zip` doesn't eat the exception thrown by `first`:
flowOf(1, 2, 3)
.zip(flowOf(4, 5)) { a, b -> a + b }
.onCompletion {
expect(6)
assertNotNull(it)
}
.first()

// `take` doesn't eat the exception thrown by another `take`:
flowOf(1, 2, 3)
.take(2)
.onCompletion {
expect(8)
assertNotNull(it)
}
.take(1)
.collect {
expect(7)
}

// `zip` doesn't eat the exception thrown by another `zip`:
flowOf(1, 2, 3)
.zip(flowOf(4, 5)) { a, b -> a + b }
.onCompletion {
expect(10)
assertNotNull(it)
}
.zip(flowOf(6)) { a, b -> a + b }
.collect {
expect(9)
}

finish(11)
}

/**
* Tests that emitting new elements after completion doesn't overwrite the old elements.
*/
@Test
fun testEmittingElementsAfterCancellation() = runTest {
assertEquals(1, flowOf(1, 2, 3)
.take(100)
.onCompletion { emit(4) }
.first())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
actual val owner: FlowCollector<*>
actual val owner: Any
) : CancellationException("Flow was aborted, no more elements needed")
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
@JvmField @Transient actual val owner: FlowCollector<*>
@JvmField @Transient actual val owner: Any
) : CancellationException("Flow was aborted, no more elements needed") {

override fun fillInStackTrace(): Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
actual val owner: FlowCollector<*>
actual val owner: Any
) : CancellationException("Flow was aborted, no more elements needed")
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")

0 comments on commit d0dabb9

Please sign in to comment.