Skip to content

Commit

Permalink
Update: Add snappy support on HttpContentDecoder (#13312)
Browse files Browse the repository at this point in the history
Motivation:
I want to allow logstash http input to be able to read snappy compressed
data.

Modifications:
To allow the HttpContentDecoder to support snappy it created a new
control flow to test for snappy content encoding.

If it is present, we are using a specific snappy decompressor to read
the data.

In order to make sure it was working I also included a e2e test on the
behaior of HttpContentDecoder.

Result:
We should be able to send snappy traffic to a server using
HttpContentDocder and be ale to uncompresse the traffic.

Co-authored-by: Aayush Atharva <hyperx.pro@outlook.com>
Co-authored-by: Norman Maurer <norman_maurer@apple.com>
  • Loading branch information
3 people committed Apr 3, 2023
1 parent 76d83fe commit cd54019
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.compression.BrotliDecoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.compression.SnappyFrameDecoder;

/**
* Decompresses an {@link HttpMessage} and an {@link HttpContent} compressed in
Expand Down Expand Up @@ -72,6 +74,11 @@ protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Excep
ctx.channel().config(), new BrotliDecoder());
}

if (SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), new SnappyFrameDecoder());
}

// 'identity' or unsupported
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ public final class HttpHeaderValues {
* {@code "br"}
*/
public static final AsciiString BR = AsciiString.cached("br");

/**
* {@code "snappy"}
*/
public static final AsciiString SNAPPY = AsciiString.cached("snappy");

/**
* {@code "zstd"}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class HttpContentDecoderTest {
31, -117, 8, 8, 12, 3, -74, 84, 0, 3, 50, 0, -53, 72, -51, -55, -55,
-41, 81, 40, -49, 47, -54, 73, 1, 0, 58, 114, -85, -1, 12, 0, 0, 0
};
private static final byte[] SNAPPY_HELLO_WORLD = {
-1, 6, 0, 0, 115, 78, 97, 80, 112, 89, 1, 16, 0, 0, 11, -66, -63,
-22, 104, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100
};
private static final String SAMPLE_STRING = "Hello, I am Meow!. A small kitten. :)" +
"I sleep all day, and meow all night.";
private static final byte[] SAMPLE_BZ_BYTES = new byte[]{27, 72, 0, 0, -60, -102, 91, -86, 103, 20,
Expand Down Expand Up @@ -147,17 +151,45 @@ public void testChunkedRequestDecompression() {
}

@Test
public void testResponseDecompression() {
public void testSnappyResponseDecompression() {
// baseline test: response decoder, content decompressor && request aggregator work as expected
HttpResponseDecoder decoder = new HttpResponseDecoder();
HttpContentDecoder decompressor = new HttpContentDecompressor();
HttpObjectAggregator aggregator = new HttpObjectAggregator(1024);
EmbeddedChannel channel = new EmbeddedChannel(decoder, decompressor, aggregator);

String headers = "HTTP/1.1 200 OK\r\n" +
"Content-Length: " + GZ_HELLO_WORLD.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"Content-Length: " + SNAPPY_HELLO_WORLD.length + "\r\n" +
"Content-Encoding: snappy\r\n" +
"\r\n";
ByteBuf buf = Unpooled.copiedBuffer(headers.getBytes(CharsetUtil.UTF_8), SNAPPY_HELLO_WORLD);
assertTrue(channel.writeInbound(buf));

Object o = channel.readInbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
FullHttpResponse resp = (FullHttpResponse) o;
assertEquals(HELLO_WORLD.length(), resp.headers().getInt(HttpHeaderNames.CONTENT_LENGTH).intValue());
assertEquals(HELLO_WORLD, resp.content().toString(CharsetUtil.UTF_8));
resp.release();

assertHasInboundMessages(channel, false);
assertHasOutboundMessages(channel, false);
assertFalse(channel.finish()); // assert that no messages are left in channel
}

@Test
public void testResponseDecompression() {
// baseline test: response decoder, content decompressor && request aggregator work as expected
HttpResponseDecoder decoder = new HttpResponseDecoder();
HttpContentDecoder decompressor = new HttpContentDecompressor();
HttpObjectAggregator aggregator = new HttpObjectAggregator(1024);

EmbeddedChannel channel = new EmbeddedChannel(decoder, decompressor, aggregator);

String headers = "HTTP/1.1 200 OK\r\n" +
"Content-Length: " + GZ_HELLO_WORLD.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n";
ByteBuf buf = Unpooled.copiedBuffer(headers.getBytes(CharsetUtil.US_ASCII), GZ_HELLO_WORLD);
assertTrue(channel.writeInbound(buf));

Expand Down

0 comments on commit cd54019

Please sign in to comment.