Skip to content

Commit

Permalink
Improve fix of duplicate upstream subscription during reactive cache put
Browse files Browse the repository at this point in the history
This commit fixes an issue where a Cacheable method which returns a
Flux (or multi-value publisher) will be invoked once, but the returned
publisher is actually subscribed twice.

The previous fix 988f363 would cause the cached elements to depend on
the first usage pattern / request pattern, which is likely to be too
confusing to users. This fix reintroduces the notion of exhausting the
original Flux by having a second subscriber dedicated to that, but uses
`refCount(2)` to ensure that the original `Flux` returned by the cached
method is still only subscribed once.

Closes gh-32370
  • Loading branch information
simonbasle committed Mar 7, 2024
1 parent c1d4b61 commit 6d9a2eb
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,28 @@ void cacheHitDetermination(Class<?> configClass) {
}


@ParameterizedTest
@ValueSource(classes = {AsyncCacheModeConfig.class, AsyncCacheModeConfig.class})
void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);

Object key = new Object();

List<Long> l1 = service.cacheFlux(key).take(1L, true).collectList().block();
List<Long> l2 = service.cacheFlux(key).take(3L, true).collectList().block();
List<Long> l3 = service.cacheFlux(key).collectList().block();

Long first = l1.get(0);

assertThat(l1).as("l1").containsExactly(first);
assertThat(l2).as("l2").containsExactly(first, 0L, -1L);
assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L);

ctx.close();
}


@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand All @@ -119,12 +141,16 @@ CompletableFuture<Long> cacheFuture(Object arg) {

@Cacheable
Mono<Long> cacheMono(Object arg) {
return Mono.just(this.counter.getAndIncrement());
// here counter not only reflects invocations of cacheMono but subscriptions to
// the returned Mono as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Mono.defer(() -> Mono.just(this.counter.getAndIncrement()));
}

@Cacheable
Flux<Long> cacheFlux(Object arg) {
return Flux.just(this.counter.getAndIncrement(), 0L);
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.observability.DefaultSignalListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -90,7 +90,6 @@
* @author Sam Brannen
* @author Stephane Nicoll
* @author Sebastien Deleuze
* @author Simon Baslé
* @since 3.1
*/
public abstract class CacheAspectSupport extends AbstractCacheInvoker
Expand Down Expand Up @@ -1037,45 +1036,34 @@ public void performCachePut(@Nullable Object value) {


/**
* Reactor stateful SignalListener for collecting a List to cache.
* Reactive Streams Subscriber for exhausting the Flux and collecting a List
* to cache.
*/
private class CachePutSignalListener extends DefaultSignalListener<Object> {
private final class CachePutListSubscriber implements Subscriber<Object> {

private final AtomicReference<CachePutRequest> request;
private final CachePutRequest request;

private final List<Object> cacheValue = new ArrayList<>();

public CachePutSignalListener(CachePutRequest request) {
this.request = new AtomicReference<>(request);
public CachePutListSubscriber(CachePutRequest request) {
this.request = request;
}

@Override
public void doOnNext(Object o) {
this.cacheValue.add(o);
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}

@Override
public void doOnComplete() {
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
public void onNext(Object o) {
this.cacheValue.add(o);
}

@Override
public void doOnCancel() {
// Note: we don't use doFinally as we want to propagate the signal after cache put, not before
CachePutRequest r = this.request.get();
if (this.request.compareAndSet(r, null)) {
r.performCachePut(this.cacheValue);
}
public void onError(Throwable t) {
this.cacheValue.clear();
}

@Override
public void doOnError(Throwable error) {
if (this.request.getAndSet(null) != null) {
this.cacheValue.clear();
}
public void onComplete() {
this.request.performCachePut(this.cacheValue);
}
}

Expand Down Expand Up @@ -1159,8 +1147,10 @@ public Object processPutRequest(CachePutRequest request, @Nullable Object result
ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
if (adapter != null) {
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(adapter.toPublisher(result))
.tap(() -> new CachePutSignalListener(request)));
Flux<?> source = Flux.from(adapter.toPublisher(result))
.publish().refCount(2);
source.subscribe(new CachePutListSubscriber(request));
return adapter.fromPublisher(source);
}
else {
return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ void cacheHitDetermination(Class<?> configClass) {
ctx.close();
}

@ParameterizedTest
@ValueSource(classes = {EarlyCacheHitDeterminationConfig.class,
EarlyCacheHitDeterminationWithoutNullValuesConfig.class,
LateCacheHitDeterminationConfig.class,
LateCacheHitDeterminationWithValueWrapperConfig.class})
void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class);
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);

Object key = new Object();

List<Long> l1 = service.cacheFlux(key).take(1L, true).collectList().block();
List<Long> l2 = service.cacheFlux(key).take(3L, true).collectList().block();
List<Long> l3 = service.cacheFlux(key).collectList().block();

Long first = l1.get(0);

assertThat(l1).as("l1").containsExactly(first);
assertThat(l2).as("l2").containsExactly(first, 0L, -1L);
assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L);

ctx.close();
}

@CacheConfig(cacheNames = "first")
static class ReactiveCacheableService {

Expand All @@ -132,7 +156,7 @@ Mono<Long> cacheMono(Object arg) {
Flux<Long> cacheFlux(Object arg) {
// here counter not only reflects invocations of cacheFlux but subscriptions to
// the returned Flux as well. See https://github.com/spring-projects/spring-framework/issues/32370
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L));
return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L));
}
}

Expand Down

0 comments on commit 6d9a2eb

Please sign in to comment.