Skip to content

Commit

Permalink
Fixes #8678 - Jetty client receives GO_AWAY and continue to send traf…
Browse files Browse the repository at this point in the history
…fic on same connection (#8894)

* Now upon receiving the GOAWAY, the connection is removed from the pool, so it cannot be used by new requests.
* HTTP2Session.removeStream() now happens _after_ notifying HEADERS and DATA events, although the Stream state change still happens before.
This is necessary to avoid that a "close" event is notified before a "headers" or "data" event.

With these changes, the race window of a client acquiring a connection while the server is closing it is reduced, but it is impossible to close it completely.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
(cherry picked from commit fbc23ac)
  • Loading branch information
sbordet committed Nov 14, 2022
1 parent 3f261fa commit 6124ab9
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 24 deletions.
5 changes: 5 additions & 0 deletions jetty-http2/http2-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ public void onHeaders(HeadersFrame frame)
else
{
stream.process(frame, Callback.NOOP);
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
removeStream(stream);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(stream, frame);
if (closed)
removeStream(stream);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -61,8 +62,10 @@
import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -511,7 +514,8 @@ public void onHeaders(Stream stream, HeadersFrame frame)
}
});
assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS));
assertEquals(1, session.getStreams().size());
// The stream is removed from the session just after returning from onHeaders(), so wait a little bit.
await().atMost(Duration.ofSeconds(1)).until(() -> session.getStreams().size(), is(1));

// End the first stream.
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2036,30 +2036,33 @@ private void newLocalStream(IStream.FrameList frameList, Promise<Stream> promise

private boolean newRemoteStream(int streamId)
{
boolean created = false;
try (Locker.Lock l = lock.lock())
{
switch (closed)
{
case NOT_CLOSED:
{
HTTP2Session.this.onStreamCreated(streamId);
return true;
created = true;
break;
}
case LOCALLY_CLOSED:
{
// SPEC: streams larger than GOAWAY's lastStreamId are dropped.
if (streamId <= goAwaySent.getLastStreamId())
{
// Allow creation of streams that may have been in-flight.
HTTP2Session.this.onStreamCreated(streamId);
return true;
created = true;
}
return false;
break;
}
default:
return false;
break;
}
}
if (created)
HTTP2Session.this.onStreamCreated(streamId);
return created;
}

private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
Expand Down Expand Up @@ -2108,14 +2111,16 @@ private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<S
private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
{
Throwable failure = null;
boolean reserved = false;
try (Locker.Lock l = lock.lock())
{
// SPEC: cannot create new streams after receiving a GOAWAY.
if (closed == CloseState.NOT_CLOSED)
{
if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2);
HTTP2Session.this.onStreamCreated(streamId);
reserved = true;
}
slots.offer(slot);
}
Expand All @@ -2127,9 +2132,16 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
}
}
if (failure == null)
{
if (reserved)
HTTP2Session.this.onStreamCreated(streamId);
return streamId;
fail.accept(failure);
return 0;
}
else
{
fail.accept(failure);
return 0;
}
}

private void freeSlot(Slot slot, int streamId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,10 @@ private void onData(DataFrame frame, Callback callback)
}
}

if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);

boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyData(this, frame, callback);
if (closed)
session.removeStream(this);
}

private void onReset(ResetFrame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public HeadersFrame withStreamId(int streamId)
@Override
public String toString()
{
return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream,
priority == null ? "" : String.format("+%s", priority));
return String.format("%s#%d[end=%b,{%s},priority=%s]", super.toString(), getStreamId(), isEndStream(), getMetaData(), getPriority());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,22 @@ private void onServerPreface(Session session)
connectionPromise().succeeded(connection);
}

@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
if (failConnectionPromise(new ClosedChannelException()))
return;
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
connection.remove();
}

@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (failConnectionPromise(new ClosedChannelException()))
return;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
HttpClientTransportOverHTTP2.this.onClose(connection, frame);
}
Expand All @@ -240,7 +250,7 @@ public boolean onIdleTimeout(Session session)
long idleTimeout = ((HTTP2Session)session).getEndPoint().getIdleTimeout();
if (failConnectionPromise(new TimeoutException("Idle timeout expired: " + idleTimeout + " ms")))
return true;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
return connection.onIdleTimeout(idleTimeout);
return true;
Expand All @@ -251,7 +261,7 @@ public void onFailure(Session session, Throwable failure)
{
if (failConnectionPromise(failure))
return;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
connection.close(failure);
}
Expand All @@ -263,5 +273,10 @@ private boolean failConnectionPromise(Throwable failure)
connectionPromise().failed(failure);
return result;
}

private HttpConnectionOverHTTP2 getConnection()
{
return connection.getReference();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public boolean onIdleTimeout(long idleTimeout)
return false;
}

void remove()
{
getHttpDestination().remove(this);
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.http2.client.http;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class GoAwayTest extends AbstractTest
{
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConnectionIsRemovedFromPoolOnGracefulGoAwayReceived(boolean graceful) throws Exception
{
long timeout = 5000;
AtomicReference<Response> responseRef = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
private Stream goAwayStream;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
String path = request.getURI().getPath();

if ("/prime".equals(path))
{
respond(stream);
}
else if ("/goaway".equals(path))
{
try
{
goAwayStream = stream;

if (graceful)
{
// Send to the client a graceful GOAWAY.
((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
}
else
{
// Send to the client a non-graceful GOAWAY.
stream.getSession().close(ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, null, Callback.NOOP);
}

// Wait for the client to receive the GOAWAY.
Thread.sleep(1000);

// This request will be performed on a different connection.
client.newRequest("localhost", connector.getLocalPort())
.path("/after")
.timeout(timeout / 2, TimeUnit.MILLISECONDS)
.send(result ->
{
responseRef.set(result.getResponse());
responseLatch.countDown();
});
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
else if ("/after".equals(path))
{
// Wait for the /after request to arrive to the server
// before answering to the /goaway request.
// The /goaway request must succeed because it's in
// flight and seen by the server when the GOAWAY happens,
// so it will be completed before closing the connection.
respond(goAwayStream);
respond(stream);
}
return null;
}

private void respond(Stream stream)
{
HTTP2Session session = (HTTP2Session)stream.getSession();
long remotePort = session.getEndPoint().getRemoteAddress().getPort();
HttpFields responseHeaders = new HttpFields();
responseHeaders.putLongField("X-Remote-Port", remotePort);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseHeaders);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
});

Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/prime")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
long primePort = response.getHeaders().getLongField("X-Remote-Port");

response = client.newRequest("localhost", connector.getLocalPort())
.path("/goaway")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
long goAwayPort = response.getHeaders().getLongField("X-Remote-Port");
assertEquals(primePort, goAwayPort);

assertTrue(responseLatch.await(timeout, TimeUnit.MILLISECONDS));
response = responseRef.get();
assertNotNull(response);
assertEquals(HttpStatus.OK_200, response.getStatus());
// The /after request must happen on a different port
// because the first connection has been removed from the pool.
long afterPort = response.getHeaders().getLongField("X-Remote-Port");
assertNotEquals(primePort, afterPort);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ public void onHeaders(HeadersFrame frame)
{
onStreamOpened(stream);
stream.process(frame, Callback.NOOP);
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
removeStream(stream);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
if (closed)
removeStream(stream);
}
}
}
Expand All @@ -126,9 +127,10 @@ else if (metaData.isResponse())
if (stream != null)
{
stream.process(frame, Callback.NOOP);
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
removeStream(stream);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(stream, frame);
if (closed)
removeStream(stream);
}
else
{
Expand Down

0 comments on commit 6124ab9

Please sign in to comment.