Skip to content

Commit

Permalink
Merge pull request #3592 from jamezp/backports-6.2
Browse files Browse the repository at this point in the history
Back Ports from main to 6.2
  • Loading branch information
jamezp committed May 3, 2023
2 parents 60d0d94 + bfa8dc7 commit 45c034e
Show file tree
Hide file tree
Showing 23 changed files with 633 additions and 340 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package org.jboss.resteasy.client.jaxrs.engines.jetty;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
Expand All @@ -28,17 +26,14 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.ByteBufferPool;
import org.jboss.logging.Logger;
import org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine;
import org.jboss.resteasy.client.jaxrs.internal.ClientInvocation;
import org.jboss.resteasy.client.jaxrs.internal.ClientResponse;

public class JettyClientEngine implements AsyncClientHttpEngine {
private static final AtomicBoolean WARN_BUF = new AtomicBoolean();
public static final String REQUEST_TIMEOUT_MS = JettyClientEngine.class + "$RequestTimeout";
// Yeah, this is the Jersey one, but there's no standard one and it makes more sense to reuse than make our own...
public static final String FOLLOW_REDIRECTS = "jersey.config.client.followRedirects";
Expand Down Expand Up @@ -78,7 +73,7 @@ public HostnameVerifier getHostnameVerifier() {

@Override
public ClientResponse invoke(Invocation invocation) {
Future<ClientResponse> future = submit((ClientInvocation) invocation, true, NOP, null);
Future<ClientResponse> future = submit((ClientInvocation) invocation, false, NOP, null);
try {
return future.get(1, TimeUnit.HOURS); // There's already an idle and connect timeout, do we need one here?
} catch (InterruptedException e) {
Expand All @@ -91,15 +86,6 @@ public ClientResponse invoke(Invocation invocation) {
}
}

/**
* Implementation note: due to lack of asynchronous message decoders the request must either be buffered,
* or it must have a {@code null} extractor and type parameter {@code <T>} must be {@link ClientResponse},
* which will read the data through its stream. It is not possible to use the synchronous JAX-RS message
* decoding infrastructure without buffering or spinning up auxiliary threads (arguably more expensive than buffering).
*
* @see AsyncClientHttpEngine#submit(ClientInvocation, boolean, InvocationCallback,
* org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine.ResultExtractor)
*/
@Override
public <T> Future<T> submit(ClientInvocation invocation, boolean bufIn, InvocationCallback<T> callback,
ResultExtractor<T> extractor) {
Expand All @@ -112,112 +98,75 @@ public <T> CompletableFuture<T> submit(ClientInvocation request, boolean buffere
return doSubmit(request, buffered, null, extractor);
}

private <T> CompletableFuture<T> doSubmit(ClientInvocation invocation, boolean bufIn, InvocationCallback<T> callback,
private <T> CompletableFuture<T> doSubmit(ClientInvocation invocation, boolean buffered, InvocationCallback<T> callback,
ResultExtractor<T> extractor) {
final boolean buffered;
if (!bufIn && extractor != null) {
if (!WARN_BUF.getAndSet(true)) {
Logger LOG = Logger.getLogger(JettyClientEngine.class);
LOG.error("TODO: ResultExtractor is synchronous and may not be used without buffering - forcing buffer mode.");
}
buffered = true;
} else {
buffered = bufIn;
}
final ExecutorService asyncExecutor = invocation.asyncInvocationExecutor();

final Request request = client.newRequest(invocation.getUri());
final CompletableFuture<T> future = new RequestFuture<T>(request);
// readEntity calls releaseConnection which calls cancel, so don't let that interrupt us
final AtomicBoolean completing = new AtomicBoolean();

invocation.getMutableProperties().forEach(request::attribute);
request.method(invocation.getMethod());
invocation.getHeaders().asMap().forEach((h, vs) -> vs.forEach(v -> request.header(h, v)));
request.headers(mutableHeaders -> invocation.getHeaders().asMap()
.forEach((h, vs) -> vs.forEach(v -> mutableHeaders.add(h, v))));
configureTimeout(request);
if (request.getAttributes().get(FOLLOW_REDIRECTS) == Boolean.FALSE) {
request.followRedirects(false);
}

if (invocation.getEntity() != null) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
invocation.writeRequestBody(os);
} catch (IOException e) {
future.completeExceptionally(e);
if (callback != null) {
callback.failed(e);
final OutputStreamRequestContent contentOut = new OutputStreamRequestContent(
Objects.toString(invocation.getHeaders().getMediaType(), null));
asyncExecutor.execute(() -> {
try {
try (OutputStream bodyOut = contentOut.getOutputStream()) {
invocation.writeRequestBody(bodyOut);
}
} catch (Exception e) { // Also catch any exception thrown from close
future.completeExceptionally(e);
if (callback != null) {
callback.failed(e);
}
}
return future;
}
request.content(new BytesContentProvider(os.toByteArray()));
});
request.body(contentOut);
}

request.send(new Response.Listener.Adapter() {
request.send(new InputStreamResponseListener() {
private ClientResponse cr;
private JettyResponseStream stream = new JettyResponseStream();

@Override
@SuppressWarnings("unchecked")
public void onHeaders(Response response) {
cr = new JettyClientResponse(invocation.getClientConfiguration(), stream, () -> {
if (!completing.get()) {
future.cancel(true);
}
});
super.onHeaders(response);
InputStream inputStream = getInputStream();
cr = new JettyClientResponse(invocation.getClientConfiguration(), inputStream);
cr.setProperties(invocation.getMutableProperties());
cr.setStatus(response.getStatus());
cr.setHeaders(extract(response.getHeaders()));
if (!buffered) {
complete();
}
}

@Override
public void onContent(Response response, ByteBuffer buf) {
final ByteBufferPool bufs = client.getByteBufferPool();
final ByteBuffer copy = bufs.acquire(buf.remaining(), false);
copy.limit(buf.remaining());
copy.put(buf);
copy.flip();
stream.offer(copy, new ReleaseCallback(bufs, copy));
}

@Override
public void onSuccess(Response response) {
if (buffered) {
asyncExecutor.submit(() -> {
try {
complete();
if (buffered) {
cr.bufferEntity();
}
complete(extractor == null ? (T) cr : extractor.extractResult(cr));
} catch (Exception e) {
try {
inputStream.close();
} catch (Exception e1) {
e.addSuppressed(e1);
}
onFailure(response, e);
}
}
}

@SuppressWarnings("unchecked")
private void complete() {
completing.set(true);
if (buffered) {
cr.bufferEntity();
}
// TODO: dangerous cast, see javadoc!
complete(extractor == null ? (T) cr : extractor.extractResult(cr));
});
}

@Override
public void onFailure(Response response, Throwable failure) {
super.onFailure(response, failure);
failed(failure);
}

@Override
public void onComplete(Result result) {
try {
if (extractor != null) {
stream.close();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void complete(T result) {
future.complete(result);
if (callback != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
package org.jboss.resteasy.client.jaxrs.engines.jetty;

import java.io.IOException;
import java.io.InputStream;

import org.jboss.resteasy.client.jaxrs.internal.ClientConfiguration;
import org.jboss.resteasy.client.jaxrs.internal.FinalizedClientResponse;
import org.jboss.resteasy.client.jaxrs.internal.ClientResponse;
import org.jboss.resteasy.tracing.RESTEasyTracingLogger;

class JettyClientResponse extends FinalizedClientResponse {
private final Runnable cancel;
private InputStream stream;
class JettyClientResponse extends ClientResponse {

JettyClientResponse(final ClientConfiguration configuration, final InputStream stream, final Runnable cancel) {
JettyClientResponse(final ClientConfiguration configuration, final InputStream stream) {
super(configuration, RESTEasyTracingLogger.empty());
this.cancel = cancel;
this.stream = stream;
setInputStream(stream);
}

@Override
protected InputStream getInputStream() {
return stream;
}

@Override
protected void setInputStream(InputStream is) {
stream = is;
protected void setInputStream(final InputStream is) {
this.is = is;
resetEntity();
}

@Override
public void releaseConnection() throws IOException {
releaseConnection(false);
}

@Override
public void releaseConnection(boolean consumeInputStream) throws IOException {
InputStream is = getInputStream();
if (is != null && consumeInputStream) {
while (is.read() > 0) {
}
}
cancel.run();
}

}

This file was deleted.

0 comments on commit 45c034e

Please sign in to comment.