Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Back Ports from main to 6.2 #3592

Merged
merged 10 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package org.jboss.resteasy.client.jaxrs.engines.jetty;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
Expand All @@ -28,17 +26,14 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.ByteBufferPool;
import org.jboss.logging.Logger;
import org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine;
import org.jboss.resteasy.client.jaxrs.internal.ClientInvocation;
import org.jboss.resteasy.client.jaxrs.internal.ClientResponse;

public class JettyClientEngine implements AsyncClientHttpEngine {
private static final AtomicBoolean WARN_BUF = new AtomicBoolean();
public static final String REQUEST_TIMEOUT_MS = JettyClientEngine.class + "$RequestTimeout";
// Yeah, this is the Jersey one, but there's no standard one and it makes more sense to reuse than make our own...
public static final String FOLLOW_REDIRECTS = "jersey.config.client.followRedirects";
Expand Down Expand Up @@ -78,7 +73,7 @@ public HostnameVerifier getHostnameVerifier() {

@Override
public ClientResponse invoke(Invocation invocation) {
Future<ClientResponse> future = submit((ClientInvocation) invocation, true, NOP, null);
Future<ClientResponse> future = submit((ClientInvocation) invocation, false, NOP, null);
try {
return future.get(1, TimeUnit.HOURS); // There's already an idle and connect timeout, do we need one here?
} catch (InterruptedException e) {
Expand All @@ -91,15 +86,6 @@ public ClientResponse invoke(Invocation invocation) {
}
}

/**
* Implementation note: due to lack of asynchronous message decoders the request must either be buffered,
* or it must have a {@code null} extractor and type parameter {@code <T>} must be {@link ClientResponse},
* which will read the data through its stream. It is not possible to use the synchronous JAX-RS message
* decoding infrastructure without buffering or spinning up auxiliary threads (arguably more expensive than buffering).
*
* @see AsyncClientHttpEngine#submit(ClientInvocation, boolean, InvocationCallback,
* org.jboss.resteasy.client.jaxrs.engines.AsyncClientHttpEngine.ResultExtractor)
*/
@Override
public <T> Future<T> submit(ClientInvocation invocation, boolean bufIn, InvocationCallback<T> callback,
ResultExtractor<T> extractor) {
Expand All @@ -112,112 +98,75 @@ public <T> CompletableFuture<T> submit(ClientInvocation request, boolean buffere
return doSubmit(request, buffered, null, extractor);
}

private <T> CompletableFuture<T> doSubmit(ClientInvocation invocation, boolean bufIn, InvocationCallback<T> callback,
private <T> CompletableFuture<T> doSubmit(ClientInvocation invocation, boolean buffered, InvocationCallback<T> callback,
ResultExtractor<T> extractor) {
final boolean buffered;
if (!bufIn && extractor != null) {
if (!WARN_BUF.getAndSet(true)) {
Logger LOG = Logger.getLogger(JettyClientEngine.class);
LOG.error("TODO: ResultExtractor is synchronous and may not be used without buffering - forcing buffer mode.");
}
buffered = true;
} else {
buffered = bufIn;
}
final ExecutorService asyncExecutor = invocation.asyncInvocationExecutor();

final Request request = client.newRequest(invocation.getUri());
final CompletableFuture<T> future = new RequestFuture<T>(request);
// readEntity calls releaseConnection which calls cancel, so don't let that interrupt us
final AtomicBoolean completing = new AtomicBoolean();

invocation.getMutableProperties().forEach(request::attribute);
request.method(invocation.getMethod());
invocation.getHeaders().asMap().forEach((h, vs) -> vs.forEach(v -> request.header(h, v)));
request.headers(mutableHeaders -> invocation.getHeaders().asMap()
.forEach((h, vs) -> vs.forEach(v -> mutableHeaders.add(h, v))));
configureTimeout(request);
if (request.getAttributes().get(FOLLOW_REDIRECTS) == Boolean.FALSE) {
request.followRedirects(false);
}

if (invocation.getEntity() != null) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
invocation.writeRequestBody(os);
} catch (IOException e) {
future.completeExceptionally(e);
if (callback != null) {
callback.failed(e);
final OutputStreamRequestContent contentOut = new OutputStreamRequestContent(
Objects.toString(invocation.getHeaders().getMediaType(), null));
asyncExecutor.execute(() -> {
try {
try (OutputStream bodyOut = contentOut.getOutputStream()) {
invocation.writeRequestBody(bodyOut);
}
} catch (Exception e) { // Also catch any exception thrown from close
future.completeExceptionally(e);
if (callback != null) {
callback.failed(e);
}
}
return future;
}
request.content(new BytesContentProvider(os.toByteArray()));
});
request.body(contentOut);
}

request.send(new Response.Listener.Adapter() {
request.send(new InputStreamResponseListener() {
private ClientResponse cr;
private JettyResponseStream stream = new JettyResponseStream();

@Override
@SuppressWarnings("unchecked")
public void onHeaders(Response response) {
cr = new JettyClientResponse(invocation.getClientConfiguration(), stream, () -> {
if (!completing.get()) {
future.cancel(true);
}
});
super.onHeaders(response);
InputStream inputStream = getInputStream();
cr = new JettyClientResponse(invocation.getClientConfiguration(), inputStream);
cr.setProperties(invocation.getMutableProperties());
cr.setStatus(response.getStatus());
cr.setHeaders(extract(response.getHeaders()));
if (!buffered) {
complete();
}
}

@Override
public void onContent(Response response, ByteBuffer buf) {
final ByteBufferPool bufs = client.getByteBufferPool();
final ByteBuffer copy = bufs.acquire(buf.remaining(), false);
copy.limit(buf.remaining());
copy.put(buf);
copy.flip();
stream.offer(copy, new ReleaseCallback(bufs, copy));
}

@Override
public void onSuccess(Response response) {
if (buffered) {
asyncExecutor.submit(() -> {
try {
complete();
if (buffered) {
cr.bufferEntity();
}
complete(extractor == null ? (T) cr : extractor.extractResult(cr));
} catch (Exception e) {
try {
inputStream.close();
} catch (Exception e1) {
e.addSuppressed(e1);
}
onFailure(response, e);
}
}
}

@SuppressWarnings("unchecked")
private void complete() {
completing.set(true);
if (buffered) {
cr.bufferEntity();
}
// TODO: dangerous cast, see javadoc!
complete(extractor == null ? (T) cr : extractor.extractResult(cr));
});
}

@Override
public void onFailure(Response response, Throwable failure) {
super.onFailure(response, failure);
failed(failure);
}

@Override
public void onComplete(Result result) {
try {
if (extractor != null) {
stream.close();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void complete(T result) {
future.complete(result);
if (callback != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
package org.jboss.resteasy.client.jaxrs.engines.jetty;

import java.io.IOException;
import java.io.InputStream;

import org.jboss.resteasy.client.jaxrs.internal.ClientConfiguration;
import org.jboss.resteasy.client.jaxrs.internal.FinalizedClientResponse;
import org.jboss.resteasy.client.jaxrs.internal.ClientResponse;
import org.jboss.resteasy.tracing.RESTEasyTracingLogger;

class JettyClientResponse extends FinalizedClientResponse {
private final Runnable cancel;
private InputStream stream;
class JettyClientResponse extends ClientResponse {

JettyClientResponse(final ClientConfiguration configuration, final InputStream stream, final Runnable cancel) {
JettyClientResponse(final ClientConfiguration configuration, final InputStream stream) {
super(configuration, RESTEasyTracingLogger.empty());
this.cancel = cancel;
this.stream = stream;
setInputStream(stream);
}

@Override
protected InputStream getInputStream() {
return stream;
}

@Override
protected void setInputStream(InputStream is) {
stream = is;
protected void setInputStream(final InputStream is) {
this.is = is;
resetEntity();
}

@Override
public void releaseConnection() throws IOException {
releaseConnection(false);
}

@Override
public void releaseConnection(boolean consumeInputStream) throws IOException {
InputStream is = getInputStream();
if (is != null && consumeInputStream) {
while (is.read() > 0) {
}
}
cancel.run();
}

}

This file was deleted.