Skip to content

Commit

Permalink
Fix resource problems in ReactorNettyClientRequestFactory
Browse files Browse the repository at this point in the history
This commit fixes two issues in ReactorNettyClientRequestFactory and
related types:

- Ensure that the response body is drained and closed when the response
  itself is closed, and remove the disposing of the connection, as this
  will disable the connection pool.
- Schedule blocking calls on a different scheduler, defaulting to the
  bounded elastic scheduler.

Closes spring-projectsgh-32528
  • Loading branch information
poutsma committed Apr 5, 2024
1 parent d955549 commit 96c14d1
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
Expand Down Expand Up @@ -56,15 +57,18 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest

private final Duration readTimeout;

private final Scheduler scheduler;


public ReactorNettyClientRequest(HttpClient httpClient, URI uri, HttpMethod method,
Duration exchangeTimeout, Duration readTimeout) {
Duration exchangeTimeout, Duration readTimeout, Scheduler scheduler) {

this.httpClient = httpClient;
this.method = method;
this.uri = uri;
this.exchangeTimeout = exchangeTimeout;
this.readTimeout = readTimeout;
this.scheduler = scheduler;
}


Expand All @@ -90,8 +94,10 @@ protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body
ReactorNettyClientResponse result = requestSender.send((reactorRequest, nettyOutbound) ->
send(headers, body, reactorRequest, nettyOutbound))
.responseConnection((reactorResponse, connection) ->
Mono.just(new ReactorNettyClientResponse(reactorResponse, connection, this.readTimeout)))
Mono.just(new ReactorNettyClientResponse(reactorResponse, connection, this.readTimeout,
this.scheduler)))
.next()
.subscribeOn(this.scheduler)
.block(this.exchangeTimeout);

if (result == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.channel.ChannelOption;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
Expand Down Expand Up @@ -66,6 +68,8 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor

private final Object lifecycleMonitor = new Object();

private Scheduler scheduler = Schedulers.boundedElastic();


/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
Expand Down Expand Up @@ -192,10 +196,19 @@ public void setExchangeTimeout(Duration exchangeTimeout) {
this.exchangeTimeout = exchangeTimeout;
}

/**
* Set the scheduler to use for offloading the main Reactor event loop.
* <p>Default is {@link Schedulers#boundedElastic()}.
*/
public void setScheduler(Scheduler scheduler) {
Assert.notNull(scheduler, "Scheduler must not be null");
this.scheduler = scheduler;
}

@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout,
this.scheduler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,13 +20,15 @@
import java.io.InputStream;
import java.time.Duration;

import reactor.core.scheduler.Scheduler;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientResponse;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.support.Netty4HeadersAdapter;
import org.springframework.lang.Nullable;
import org.springframework.util.StreamUtils;

/**
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
Expand All @@ -44,15 +46,18 @@ final class ReactorNettyClientResponse implements ClientHttpResponse {

private final Duration readTimeout;

private final Scheduler scheduler;

@Nullable
private volatile InputStream body;


public ReactorNettyClientResponse(HttpClientResponse response, Connection connection, Duration readTimeout) {
public ReactorNettyClientResponse(HttpClientResponse response, Connection connection, Duration readTimeout, Scheduler scheduler) {
this.response = response;
this.connection = connection;
this.readTimeout = readTimeout;
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
this.scheduler = scheduler;
}


Expand All @@ -79,7 +84,11 @@ public InputStream getBody() throws IOException {
}

body = this.connection.inbound().receive()
.aggregate().asInputStream().block(this.readTimeout);
.aggregate()
.asInputStream()
.subscribeOn(this.scheduler)
.block(this.readTimeout);

if (body == null) {
throw new IOException("Could not receive body");
}
Expand All @@ -89,7 +98,13 @@ public InputStream getBody() throws IOException {

@Override
public void close() {
this.connection.dispose();
try{
InputStream body = getBody();
StreamUtils.drain(body);
body.close();
}
catch (IOException ignored) {
}
}

}

0 comments on commit 96c14d1

Please sign in to comment.