Skip to content

Commit

Permalink
Optimize Coroutine invocations
Browse files Browse the repository at this point in the history
KClass instantiation in CoroutinesUtils is suboptimal, and should be
replaced by KTypes#isSubtypeOf checks using pre-instantiated types for
Flow, Mono and Publisher.

This commit impact on performances is significant since a throughput
increase between 2x and 3x has been measured on basic endpoints.

Closes gh-32390
  • Loading branch information
sdeleuze committed Mar 7, 2024
1 parent 6d9a2eb commit 579dbc4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.JvmClassMappingKt;
import kotlin.reflect.KClass;
import kotlin.reflect.KClassifier;
import kotlin.reflect.KFunction;
import kotlin.reflect.KParameter;
import kotlin.reflect.KType;
import kotlin.reflect.full.KCallables;
import kotlin.reflect.full.KClasses;
import kotlin.reflect.full.KClassifiers;
import kotlin.reflect.full.KTypes;
import kotlin.reflect.jvm.KCallablesJvm;
import kotlin.reflect.jvm.KTypesJvm;
import kotlin.reflect.jvm.ReflectJvmMapping;
Expand Down Expand Up @@ -58,6 +59,12 @@
*/
public abstract class CoroutinesUtils {

private static final KType flowType = KClassifiers.getStarProjectedType(JvmClassMappingKt.getKotlinClass(Flow.class));

private static final KType monoType = KClassifiers.getStarProjectedType(JvmClassMappingKt.getKotlinClass(Mono.class));

private static final KType publisherType = KClassifiers.getStarProjectedType(JvmClassMappingKt.getKotlinClass(Publisher.class));

/**
* Convert a {@link Deferred} instance to a {@link Mono}.
*/
Expand Down Expand Up @@ -137,18 +144,15 @@ public static Publisher<?> invokeSuspendingFunction(CoroutineContext context, Me
.filter(result -> result != Unit.INSTANCE)
.onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);

KClassifier returnType = function.getReturnType().getClassifier();
if (returnType != null) {
if (returnType.equals(JvmClassMappingKt.getKotlinClass(Flow.class))) {
return mono.flatMapMany(CoroutinesUtils::asFlux);
}
else if (returnType.equals(JvmClassMappingKt.getKotlinClass(Mono.class))) {
return mono.flatMap(o -> ((Mono<?>)o));
}
else if (returnType instanceof KClass<?> kClass &&
Publisher.class.isAssignableFrom(JvmClassMappingKt.getJavaClass(kClass))) {
return mono.flatMapMany(o -> ((Publisher<?>)o));
}
KType returnType = function.getReturnType();
if (KTypes.isSubtypeOf(returnType, flowType)) {
return mono.flatMapMany(CoroutinesUtils::asFlux);
}
else if (KTypes.isSubtypeOf(returnType, monoType)) {
return mono.flatMap(o -> ((Mono<?>)o));
}
else if (KTypes.isSubtypeOf(returnType, publisherType)) {
return mono.flatMapMany(o -> ((Publisher<?>)o));
}
return mono;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,29 @@ class CoroutinesUtilsTests {
Assertions.assertThatIllegalArgumentException().isThrownBy { CoroutinesUtils.invokeSuspendingFunction(method, this, "foo") }
}

@Test
fun invokeSuspendingFunctionWithMono() {
val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithMono", Continuation::class.java)
val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this)
Assertions.assertThat(publisher).isInstanceOf(Mono::class.java)
StepVerifier.create(publisher)
.expectNext("foo")
.expectComplete()
.verify()
}

@Test
fun invokeSuspendingFunctionWithFlux() {
val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithFlux", Continuation::class.java)
val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this)
Assertions.assertThat(publisher).isInstanceOf(Flux::class.java)
StepVerifier.create(publisher)
.expectNext("foo")
.expectNext("bar")
.expectComplete()
.verify()
}

@Test
fun invokeSuspendingFunctionWithFlow() {
val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithFlow", Continuation::class.java)
Expand Down Expand Up @@ -213,6 +236,16 @@ class CoroutinesUtilsTests {
return value
}

suspend fun suspendingFunctionWithMono(): Mono<String> {
delay(1)
return Mono.just("foo")
}

suspend fun suspendingFunctionWithFlux(): Flux<String> {
delay(1)
return Flux.just("foo", "bar")
}

suspend fun suspendingFunctionWithFlow(): Flow<String> {
delay(1)
return flowOf("foo", "bar")
Expand Down

0 comments on commit 579dbc4

Please sign in to comment.