Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that flow operators propagate the cancellation exceptions #4038

Merged
merged 3 commits into from
Feb 15, 2024

Conversation

dkhalanskyjb
Copy link
Contributor

Before this change, it could happen that some size-limiting operators upstream swallowed the requests to limit the flow size emitted by the operators downstream.

This could cause onCompletion calls between these operators to incorrectly report that the flow was not in fact limited by the downstream operators.

Additionally, in the presence of additional size-limiting operators in the chain, first and single and their variants could exhibit incorrect behavior where emitting a value from onCompletion would overwrite their output.

Fixes #4035

Before this change, it could happen that some size-limiting
operators upstream swallowed the requests to limit the flow size
emitted by the operators downstream.

This could cause `onCompletion` calls between these operators to
incorrectly report that the flow was not in fact limited by the
downstream operators.

Additionally, in the presence of additional size-limiting operators
in the chain, `first` and `single` and their variants could exhibit
incorrect behavior where emitting a value from `onCompletion` would
overwrite their output.

Fixes #4035
@qwwdfsad
Copy link
Contributor

Could you please double-check my understanding?

At first glance, the behavioural difference in this snippet

flowOf(1)
    .take(1)
    .onEach {  } // 1
    .onCompletion { println("??? $it") }
    .first()

with 1 being commented out or not just doesn't make any sense. I tried to figure it out by just looking at the commit message, and it was unclear, thus decided to understand the root cause before looking at the solution to avoid spoiling the fun review.

It seems to me that the root cause is the following:

  1. take uses FlowCollector that it receives in an argument as an ownership token, and it checks the ownership in catch block
  2. The problem is that FlowCollector is not unique to the take itself and, in fact, depends on the downstream operator. It might be a collector from onEach, or it might be an onCompletion collector.
  3. onCompletion collector, which is collectWhile collector, throws an AbortFlowException and uses itself as an ownership token.
  4. According to 2, ownership check in take might pass or might not pass, leading to the observed behaviour

So the root cause is straightforward -- we cannot use FlowCollector from an argument as an ownership token

Copy link
Contributor

@qwwdfsad qwwdfsad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one!

@dkhalanskyjb dkhalanskyjb merged commit d0dabb9 into develop Feb 15, 2024
1 check failed
@dkhalanskyjb dkhalanskyjb deleted the dk-fix-AbortFlowException-ownership branch February 15, 2024 09:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants