Skip to content

Commit

Permalink
Merge #3700 into 3.6.4
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Feb 13, 2024
2 parents 96ab48c + d855ba9 commit 659b45c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ public void arbiter(II_Result r) {

// Ignore, flaky test (https://github.com/reactor/reactor-core/issues/3633)
//@JCStressTest
@Outcome(id = {"200, 0", "200, 1"}, expect = ACCEPTABLE, desc = "Should produced exactly what was requested")
@Outcome(id = {"200, 0, 0", "200, 0, 1"}, expect = ACCEPTABLE, desc = "Should " +
"produced exactly what was requested")
@State
public static class RequestAndProduceStressTest2 extends FluxSwitchMapStressTest {

Expand Down Expand Up @@ -312,9 +313,17 @@ public void outerRequest() {
}

@Arbiter
public void arbiter(II_Result r) {
r.r1 = (int) (stressSubscriber.onNextCalls.get() + switchMapMain.requested);
r.r2 = stressSubscriber.onCompleteCalls.get();
public void arbiter(III_Result r) {
r.r1 = stressSubscriber.onNextCalls.get();
r.r2 = (int) switchMapMain.requested;
r.r3 = stressSubscriber.onCompleteCalls.get();

switch (r.toString()) {
case "200, 0, 0":
case "200, 0, 1":
break;
default: throw new IllegalStateException(r + " " + fastLogger);
}

if (stressSubscriber.onNextCalls.get() < 200 && stressSubscriber.onNextDiscarded.get() < switchMapMain.requested) {
throw new IllegalStateException(r + " " + fastLogger);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,7 @@ public void subscribe(Subscriber<? super T> s) {
}

s.onSubscribe(p); // will trigger drain() via request()
p.drain(); // ensures that empty source terminal signal is propagated without waiting for a request from the subscriber
}

boolean add(ColdTestPublisherSubscription<T> s) {
Expand Down Expand Up @@ -315,7 +316,7 @@ private void drain() {
* @return true if the TestPublisher was terminated, false otherwise
*/
private boolean emitTerminalSignalIfAny() {
if (parent.done) {
if (parent.done && this.parent.values.size() == index) {
parent.remove(this);

final Throwable t = parent.error;
Expand Down

0 comments on commit 659b45c

Please sign in to comment.