Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer timeout with fair backpressure #3332

Merged
merged 15 commits into from
Jun 7, 2023
Merged

Conversation

chemicL
Copy link
Member

@chemicL chemicL commented Jan 16, 2023

This change adds a variant of bufferTimeout that honors backpressure of a slow downstream Subscriber better than the current implementation (which just errors in the face of backpressure and timeouts).

The problem with the existing implementation is that it requests n * maxSize items from the upstream with the assumption that the items will fill a buffer which was requested. However, if timeouts occur, the amount of buffers created and delivered can exceed the accumulated demand. In such case, a backpressure error happens.

The new variant uses prefetching with 4 * maxSize items from the upstream. Internally it uses a queue, which is used to satisfy the downstream demand with no more buffers than have been requested. It uses a watermark to replenish the queue to be ready to deliver buffers in an efficient manner when the demand increases.

@chemicL chemicL self-assigned this Jan 16, 2023
@almogtavor
Copy link

@chemicL Can you update us regarding what's left in this PR? This feature would really help me and I'd like to help if possible

@chemicL
Copy link
Member Author

chemicL commented Feb 6, 2023

@almogtavor I'm waiting for @OlegDokuka to find some time to review the change. Also, I needed to move on to a different subject and couldn't spend more time thinking about JCStress tests. If you have ideas how to improve confidence in the concurrent execution of this operator, the ideas what scenarios to test would be useful. So far I ran the tough stress test suite against it on an x86 PC and an M1 ARM Mac against JDK 8, 11, and 17 and wasn't able to get it to an inconsistent state. Let me work with the team and try to get this feature in.

@almogtavor
Copy link

almogtavor commented Mar 21, 2023

@OlegDokuka @chemicL can you tell me approximately when this will get reviewed? This feature will help a lot 😊

@patpatpat123
Copy link

Upvoting this, I am also blocked by this, this fix would help. Cheering for you guys

@patpatpat123
Copy link

Hello team, I heard there is a workaround using a (window operation + concatMap) instead of buffer timeout, which can still yield the exception as of 3.5.x.

Would it be possible to share the workaround please?

@OlegDokuka
Copy link
Contributor

@patpatpat123

Hello team, I heard there is a workaround using a (window operation + concatMap) instead of buffer timeout, which can still yield the exception as of 3.5.x.

do you mean you see issues in windowTimeout, if so can you open an issue which reproduces the failure.

Thanks

@chemicL chemicL marked this pull request as ready for review May 22, 2023 14:03
@chemicL chemicL requested a review from a team as a code owner May 22, 2023 14:03
@chemicL chemicL changed the title [WIP] Buffer timeout with fair backpressure Buffer timeout with fair backpressure May 22, 2023
@chemicL chemicL added the type/enhancement A general enhancement label May 22, 2023
@chemicL chemicL added this to the 3.4.30 milestone May 22, 2023
@patpatpat123
Copy link

So basically, in order to perform some internal batch operation within a flux, for a number or a duration, (on purpose not using the words bufferTimeout or windowTimeout here) one has the following possibilities:

use bufferTimeout -> very high likelihood of encountering "Could not emit buffer due to lack of requests"
use windowTimeout + concatMap collectList -> very noticeable performance degradation

May I ask if my understanding is correct please?

@chemicL chemicL force-pushed the buffer-timeout-backpressure branch from dc5589f to befec06 Compare June 6, 2023 08:03
@chemicL chemicL changed the base branch from 3.4.x to main June 6, 2023 08:03
@chemicL chemicL removed this from the 3.4.30 milestone Jun 6, 2023
@chemicL chemicL added this to the 3.5.7 milestone Jun 6, 2023
@chemicL
Copy link
Member Author

chemicL commented Jun 6, 2023

use bufferTimeout -> very high likelihood of encountering "Could not emit buffer due to lack of requests"

As long as the consumer is slower than the producer, yes.

use windowTimeout + concatMap collectList -> very noticeable performance degradation

The performance impact depends on the workload, but in general yes, windowTimeout has a lot more to do as for every window it goes through the entire reactive lifecycle to drain the results, while the buffer is a simple data structure that can be navigated in imperative manner.

For this operator, we aim to deliver it in 3.5.7 release.

@patpatpat123
Copy link

Very clear explanation @chemicL .
Many thanks

@chemicL chemicL merged commit 59f7cb3 into main Jun 7, 2023
6 of 7 checks passed
@chemicL chemicL deleted the buffer-timeout-backpressure branch June 7, 2023 11:44
@chemicL
Copy link
Member Author

chemicL commented Jun 7, 2023

Build failure attributed to a flaky test.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants