
Flow hangs even when using terminal operators #4035

Closed
ansman opened this issue Feb 6, 2024 · 11 comments

@ansman
Contributor

ansman commented Feb 6, 2024

Describe the bug

A flow that uses take(1), onCompletion { if (it == null) awaitCancellation() } and first() can hang forever instead of returning the first value.

Interestingly, removing take(1) will cause the flow to behave as expected.

Provide a Reproducer
Here is such an example:

flowOf(1)
    .take(1)
    .onCompletion { if (it == null) awaitCancellation() }
    .first()

This code is expected to return 1; instead, it suspends forever. If you remove take(1), it returns 1 as expected.
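One way to tell "hangs" apart from "returns 1" without blocking a test forever is to wrap `first()` in a timeout. This is a sketch assuming kotlinx.coroutines is on the classpath; `firstOrNullOnHang` and the 500 ms window are arbitrary illustrative choices, and a null result only means that `first()` did not finish within that window.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Runs first() on the given flow, giving up after timeoutMs.
// Returns null if first() did not complete in time (i.e. it "hung").
fun <T : Any> firstOrNullOnHang(source: Flow<T>, timeoutMs: Long = 500): T? = runBlocking {
    withTimeoutOrNull(timeoutMs) { source.first() }
}

// The chain from the report: take(1) followed by onCompletion.
fun reported(): Int? = firstOrNullOnHang(
    flowOf(1)
        .take(1)
        .onCompletion { if (it == null) awaitCancellation() }
)

// The same chain without take(1).
fun withoutTake(): Int? = firstOrNullOnHang(
    flowOf(1)
        .onCompletion { if (it == null) awaitCancellation() }
)
```

On versions affected by this issue, `reported()` would be expected to return null, while `withoutTake()` returns 1.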

@ansman ansman added the bug label Feb 6, 2024
@dkhalanskyjb
Collaborator

dkhalanskyjb commented Feb 7, 2024

Sorry, but could you clarify your intentions and why this doesn't match them? Your code literally says: "await cancellation after everything's finished if the flow completed without errors and didn't need to be cancelled internally."

@denis-bezrukov

@dkhalanskyjb The behavior is inconsistent. For example,

flowOf(1)
    .take(1)
    .onCompletion { if (it == null) awaitCancellation() }
    .first()

hangs, while

flowOf(1)
    .take(1)
    .onEach { } // no-op operator that doesn't change anything
    .onCompletion { if (it == null) awaitCancellation() }
    .first()

doesn't hang.

Similarly,

flowOf(1)
    // .take(1) is commented out. take(1) is effectively a no-op here, since the upstream (`flowOf(1)`) is itself a single-element flow
    .onCompletion { if (it == null) awaitCancellation() }
    .first()

doesn't hang. So chains that are functionally equivalent behave differently.

"await cancellation after everything's finished if the flow completed without errors and didn't need to be cancelled internally."

In that case, it's the second and third snippets that actually have the bug.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Feb 7, 2024

Adding or removing flow operators can change the number of suspensions that collection goes through. That's expected and is intentionally left as an implementation detail. In a future release, the behavior may change. In effect, you have a race condition in your code: either the collection finishes before the last element is emitted, or it doesn't. So, there's no bug so far.

@ansman
Copy link
Contributor Author

ansman commented Feb 7, 2024

But first should abort collection after the first emission so it seems strange that it would hang. It also doesn't hang if you instead do emitAll(flow { awaitCancellation() }).

Also, how can there be a race condition when a single thread is involved?

The code also should not have any issues since an operator like take could only do its work after the downstream collector has received the item and since the terminal operator (first) throws an abort exception, that should bubble up and abort all collection.

I see the timeline as:

  1. flowOf emits an item
  2. take emits this item
  3. onCompletion emits this item
  4. first receives the item
  5. first throws an abort exception
  6. This exception is intercepted by onCompletion, which calls the block with it (the block itself does nothing at this point)
  7. The exception is rethrown by onCompletion
  8. It bubbles up and finally is caught by the first operator.
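Step 6 of this timeline can be checked directly by recording the cause that onCompletion receives instead of suspending inside it. A sketch assuming kotlinx.coroutines; `causeSeenByOnCompletion` is a hypothetical helper name. When first aborts the collection through onCompletion as the timeline describes, the recorded cause is non-null (an internal abort exception), whereas a null cause means onCompletion believed the upstream completed normally.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Collects `upstream.onCompletion { ... }.first()` and returns the cause
// that onCompletion observed (null means "completed normally").
fun <T> causeSeenByOnCompletion(upstream: Flow<T>): Throwable? = runBlocking {
    var seen: Throwable? = null
    upstream
        .onCompletion { cause -> seen = cause }
        .first()
    seen
}
```

For `flowOf(1)` the recorded cause is non-null. Passing `flowOf(1).take(1)` instead is how one would observe the behavior discussed in this issue.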

Judging by the implementation, I don't see how this code should behave differently than:

flowOf(1)
  .take(1)
  .let { upstream ->
    flow {
      emitAll(upstream)
      awaitCancellation()
    }
  }

I don't see why/how onCompletion would get called with null since first throws.
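The `emitAll` variant above can be run with `first()` attached and a timeout as a safety net. A sketch assuming kotlinx.coroutines; `emitAllVariant` and the timeout are illustrative, and the explicit `flow<Int>` type argument just avoids relying on builder inference. The thread reports that this form does not hang, i.e. the abort from `first()` escapes `emitAll` before `awaitCancellation()` is reached.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// The emitAll-based chain, terminated with first() and guarded by a timeout.
// Returns null if first() did not complete within timeoutMs.
fun emitAllVariant(timeoutMs: Long = 500): Int? = runBlocking {
    withTimeoutOrNull(timeoutMs) {
        flowOf(1)
            .take(1)
            .let { upstream ->
                flow<Int> {
                    emitAll(upstream)
                    awaitCancellation() // only reached if emitAll returns normally
                }
            }
            .first()
    }
}
```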

@dkhalanskyjb
Collaborator

But first should abort collection after the first emission so it seems strange that it would hang.

take is optimized to signal completion immediately after the required number of elements has been emitted. We could also tweak flowOf so that it does the same. Right now, flowOf is simply implemented as

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
    // <-- this line
} // <-- this brace


After the last emit is called, "this line" still needs to execute. Flow is considered completed only when "this brace" is reached. We know that there's nothing going on in "this line", but the flow machinery doesn't. So, in flowOf(1).first(), first signals that it doesn't need further elements—even though there are none.

We could tweak this behavior of flowOf, but it's not clear what would be the reason for us to do so.

Also, how can there be a race condition when a single thread is involved?

Easy:

runBlocking {
  launch {
    repeat(100) { yield() }
    println("A")
  }
  launch {
    repeat(100) { yield() }
    println("B")
  }
}

A slight change in the implementation of either A or B (for example, changing 100 to 102 or 98) can change the order of operations.

The code is still deterministic: rerunning it several times will yield the same result. But the race is there: there's no explicit dependency between the moments of A and B, so changing one thing could lead to the order of execution being changed.
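The example above can be parameterized to make the point concrete: the order depends only on how many times each coroutine goes back through runBlocking's event loop, which dispatches in FIFO order. A sketch assuming kotlinx.coroutines; `completionOrder` is a hypothetical name, and names are recorded in a list instead of printed.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Two coroutines on runBlocking's single-threaded event loop, each yielding
// a given number of times before recording its name. Returns the order in
// which the names were recorded.
fun completionOrder(yieldsA: Int, yieldsB: Int): List<String> {
    val order = mutableListOf<String>()
    runBlocking {
        launch {
            repeat(yieldsA) { yield() }
            order += "A"
        }
        launch {
            repeat(yieldsB) { yield() }
            order += "B"
        }
    }
    return order
}
```

With equal yield counts, A is recorded first; giving A two extra yields, as in `completionOrder(102, 100)`, lets B finish first. Nothing in the code ties A to B, so every run is reproducible, yet the order flips with an unrelated-looking change.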

The code also should not have any issues since an operator like take could only do its work after the downstream collector has received the item and since the terminal operator (first) throws an abort exception, that should bubble up and abort all collection.

This would be the case, but the result of take doesn't need to be cancelled. It is implemented in such a way that it succeeds in signaling that it emitted all the elements before the exception from first needs to be thrown.

I see the timeline as:

The problem is at point 2. It essentially emits a "here's the item, but also, I'm done."

@dkhalanskyjb
Collaborator

Though this is all theoretical. It can be fun to dig into implementation details, but clearly, no one would write the code like the one in the opening post: they'd just write 1 (or awaitCancellation().let { 1 } if the intention was to call awaitCancellation). If you can show a case where this particular change in the number of suspensions is causing issues in realistic code, we can discuss more specifically how to rewrite your code to avoid data races.

@ansman
Contributor Author

ansman commented Feb 7, 2024

This is interesting and super helpful.
You say this:

This would be the case, but the result of take doesn't need to be cancelled. It is implemented in such a way that it succeeds in signaling that it emitted all the elements before the exception from first needs to be thrown.

But looking at the implementation, I see that take calls emitAbort, which just calls emit and then throws. Since emit is called first, the first operator will receive the item before that throw, and since first itself throws, shouldn't that exception just bubble up, so that take never throws (or completes)?

@dkhalanskyjb
Collaborator

Oh, yeah. Yes, you are right. This is not due to an extra suspension in flowOf, as I initially presumed. Sorry for misleading you, and thanks for pursuing this!

A side note: now that I think about it, a well-behaved first does not suffer from a concurrent race: we want every implementation of first to immediately cancel the flow after the first element is received, so no other non-cleanup code is executed. For example, flow { emit(1); println("X"); emit(2) }.first() should never print X. So, there is no way for first to complete processing the first element without throwing an exception, and consequently, for onCompletion not to catch it.
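The "should never print X" property can be checked by recording side effects in a list instead of printing them. A sketch assuming kotlinx.coroutines; `firstStopsAfterOneElement` is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Returns the value produced by first() together with a log of the side
// effects that ran after the first emission. The log should stay empty.
fun firstStopsAfterOneElement(): Pair<Int, List<String>> {
    val log = mutableListOf<String>()
    val value = runBlocking {
        flow<Int> {
            emit(1)
            log += "X" // must not run: first() aborts the collection inside emit(1)
            emit(2)
        }.first()
    }
    return value to log
}
```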

I've localized the issue. Here's the fun part: the following will also hang!

flowOf(1, 2, 3)
    .take(100) // won't hang if this is removed
    .onCompletion { if (it == null) awaitCancellation() }
    .first()

That's clearly a bug, because flowOf(1, 2, 3).take(100) has three elements, first() only returns one, and onCompletion guarantees (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html):

[...] this operator reports exception that occur both upstream and downstream and observe exceptions that are thrown to cancel the flow. Exception is empty if and only if the flow had fully completed successfully

@ansman
Contributor Author

ansman commented Feb 7, 2024

Ah, great find. I think I understand a bit more now. Does this mean that an operator like first will only abort when the flow tries to emit again (or suspends)?

@dkhalanskyjb
Collaborator

No-no, the opposite: first will force the emissions to stop immediately (as it should), or there would be a waste of computations. That's why there are no data races in sensible implementations of first for Flow.

The issue is due to an internal implementation detail: when first signals "No more elements, please," take, which can also raise the same request, sees the exception and goes, "Oh, so I guess I asked the flow to stop emitting, and it did stop. The goal of that exception is accomplished; no one else needs to see the exception" (including onCompletion). So it doesn't propagate the exception further.
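The swallowing described above can be reproduced in a small synchronous model with no coroutines at all. This is a hypothetical sketch: `MiniFlow`, `Collector`, `Abort`, `takeBuggy`, `takeFixed`, and the rest only imitate the shape of the real operators (which use an internal abort exception), and `takeFixed` shows one way to make the owner check unambiguous, not necessarily the change that was committed. The key detail is that, in this model, `onCompletion` hands its downstream collector straight to the upstream, so `takeBuggy` ends up using as its owner token the very collector that `first` uses as its own.

```kotlin
// Hypothetical, coroutine-free model of the abort/ownership mechanism.
fun interface Collector<T> {
    fun emit(value: T)
}

fun interface MiniFlow<T> {
    fun collect(collector: Collector<T>)
}

// Stand-in for the "no more elements, please" signal; `owner` says who raised it.
class Abort(val owner: Any) : RuntimeException("abort")

fun <T> miniFlowOf(vararg values: T) = MiniFlow<T> { down ->
    for (v in values) down.emit(v)
}

// Buggy variant: uses the downstream collector as its ownership token, so it
// also claims aborts raised by that same collector (e.g. by first()).
fun <T> MiniFlow<T>.takeBuggy(count: Int) = MiniFlow<T> { down ->
    var consumed = 0
    try {
        collect { v ->
            down.emit(v)
            if (++consumed >= count) throw Abort(down)
        }
    } catch (e: Abort) {
        if (e.owner !== down) throw e
    }
}

// Variant with a private marker object: only its own abort is swallowed.
fun <T> MiniFlow<T>.takeFixed(count: Int) = MiniFlow<T> { down ->
    val marker = Any()
    var consumed = 0
    try {
        collect { v ->
            down.emit(v)
            if (++consumed >= count) throw Abort(marker)
        }
    } catch (e: Abort) {
        if (e.owner !== marker) throw e
    }
}

// Passes the downstream collector straight through, then reports the cause.
fun <T> MiniFlow<T>.onCompletion(action: (Throwable?) -> Unit) = MiniFlow<T> { down ->
    try {
        collect(down)
    } catch (e: Throwable) {
        action(e)
        throw e
    }
    action(null)
}

fun <T> MiniFlow<T>.first(): T {
    var result: T? = null
    val collector = object : Collector<T> {
        override fun emit(value: T) {
            result = value
            throw Abort(this) // first() owns this abort
        }
    }
    try {
        collect(collector)
    } catch (e: Abort) {
        if (e.owner !== collector) throw e
    }
    return result ?: throw NoSuchElementException("empty flow")
}

// Runs take(100) -> onCompletion -> first() over 1, 2, 3 and returns
// the cause that onCompletion observed.
fun causeSeen(fixed: Boolean): Throwable? {
    var seen: Throwable? = null
    val source = miniFlowOf(1, 2, 3)
    val limited = if (fixed) source.takeFixed(100) else source.takeBuggy(100)
    limited.onCompletion { seen = it }.first()
    return seen
}
```

In this model, `causeSeen(fixed = false)` returns null (onCompletion believes the flow completed normally, which is where `awaitCancellation()` would hang), while `causeSeen(fixed = true)` returns the `Abort` raised by `first`.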

@ansman
Contributor Author

ansman commented Feb 7, 2024

Ah, so take is missing an owner check then? Or rather the owner check isn't correctly implemented.

dkhalanskyjb added a commit that referenced this issue Feb 8, 2024
Before this change, it could happen that some size-limiting
operators upstream swallowed the requests to limit the flow size
emitted by the operators downstream.

This could cause `onCompletion` calls between these operators to
incorrectly report that the flow was not in fact limited by the
downstream operators.

Additionally, in the presence of additional size-limiting operators
in the chain, `first` and `single` and their variants could exhibit
incorrect behavior where emitting a value from `onCompletion` would
overwrite their output.

Fixes #4035