Skip to content

Commit

Permalink
Merge pull request #1483 from markusdlugi/main
Browse files Browse the repository at this point in the history
fix: properly remove subscriptions in ReplayOperator
  • Loading branch information
jponge committed Jan 12, 2024
2 parents 3b71c14 + 90c2609 commit 44894c1
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public boolean hasReachedCompletion() {
return current.value instanceof Completion;
}

public boolean willReachCompletion() {
return !hasReachedCompletion() && current.next.value instanceof Completion;
}

public boolean hasReachedFailure() {
return current.value instanceof Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ReplayOperator<T> extends AbstractMulti<T> {

private final AtomicBoolean upstreamSubscriptionRequested = new AtomicBoolean();
private volatile Subscription upstreamSubscription = null;
private final CopyOnWriteArrayList<ReplaySubscription> subscriptions = new CopyOnWriteArrayList<>();
protected final CopyOnWriteArrayList<ReplaySubscription> subscriptions = new CopyOnWriteArrayList<>();

public ReplayOperator(Multi<T> upstream, long numberOfItemsToReplay) {
this.upstream = upstream;
Expand All @@ -39,11 +39,11 @@ public void subscribe(MultiSubscriber<? super T> subscriber) {
upstream.subscribe(new UpstreamSubscriber(subscriber));
}
ReplaySubscription replaySubscription = new ReplaySubscription(subscriber);
subscriber.onSubscribe(replaySubscription);
subscriptions.add(replaySubscription);
subscriber.onSubscribe(replaySubscription);
}

private class ReplaySubscription implements Subscription {
protected class ReplaySubscription implements Subscription {

private final MultiSubscriber<? super T> downstream;
private final AtomicLong demand = new AtomicLong();
Expand Down Expand Up @@ -115,6 +115,12 @@ private void drain() {
downstream.onItem(item);
emitted++;
}
if (!done && cursor.willReachCompletion()) {
cancel();
cursor.readCompletion();
downstream.onComplete();
return;
}
demand.addAndGet(-emitted);
if (wip.decrementAndGet() == 0) {
return;
Expand All @@ -123,7 +129,7 @@ private void drain() {
}
}

private class UpstreamSubscriber implements MultiSubscriber<T>, ContextSupport {
protected class UpstreamSubscriber implements MultiSubscriber<T>, ContextSupport {

private final MultiSubscriber<? super T> initialSubscriber;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -66,6 +71,17 @@ void basicReplayAll() {
sub.assertCompleted();
}

@Test
void shouldCompleteWhenRequestEqualsMax() {
Multi<Integer> upstream = Multi.createFrom().range(1, 10);
Multi<Integer> replay = Multi.createBy().replaying().upTo(9).ofMulti(upstream);

AssertSubscriber<Integer> sub = replay.subscribe().withSubscriber(AssertSubscriber.create());
sub.request(9);
assertThat(sub.getItems()).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9);
sub.assertCompleted();
}

@Test
void basicReplayLatest3() {
ExecutorService pool = Executors.newFixedThreadPool(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.smallrye.mutiny.operators.multi.replay;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;

public class ReplayOperatorTest {

@Test
public void shouldRemoveSubscriptionAfterCompletion() {
// given
var upstream = Multi.createFrom().range(0, 3);
var operator = new ReplayOperator<>(upstream, 3);

// when
var subscriber = operator.subscribe().withSubscriber(AssertSubscriber.create(3));
var subscriber2 = operator.subscribe().withSubscriber(AssertSubscriber.create(3));

// then
subscriber.assertItems(0, 1, 2).assertCompleted();
subscriber2.assertItems(0, 1, 2).assertCompleted();
assertEquals(0, operator.subscriptions.size());
}
}

0 comments on commit 44894c1

Please sign in to comment.