Skip to content

Commit

Permalink
fix: rewrite the concatMap operator
Browse files Browse the repository at this point in the history
In this implementation the operator acts as a simple no-queue forwarder
between the subscriber and the current upstream.

There is also a small dose of fine-grained locking around a few
state machine updates that can’t be expressed as non-blocking / compare & swap
operations.
  • Loading branch information
jponge committed Jan 17, 2024
1 parent 461a200 commit dded2d6
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 195 deletions.
9 changes: 8 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.class.removed",
"old": "class io.smallrye.mutiny.operators.multi.MultiConcatMapOp.ConcatMapMainSubscriber<I, O>",
"justification": "Refactoring of internal APIs"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,13 @@ public <O> Multi<O> transformToIterable(Function<? super T, ? extends Iterable<O
@CheckReturnValue
public <O> MultiFlatten<T, O> transformToUni(Function<? super T, Uni<? extends O>> mapper) {
Function<? super T, Uni<? extends O>> actual = Infrastructure.decorate(nonNull(mapper, "mapper"));
Function<? super T, ? extends Publisher<? extends O>> wrapper = res -> actual.apply(res).toMulti();
Function<? super T, ? extends Publisher<? extends O>> wrapper = res -> {
Uni<? extends O> uni = actual.apply(res);
if (uni == null) {
return Multi.createFrom().failure(new NullPointerException(MAPPER_RETURNED_NULL));
}
return uni.toMulti();
};
return new MultiFlatten<>(upstream, wrapper, 1, false);
}

Expand Down

0 comments on commit dded2d6

Please sign in to comment.