Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[http2] Dont drop connections on metadata limit exceeded #32309

Merged
merged 4 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/core/ext/transport/chttp2/transport/hpack_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1227,13 +1227,19 @@ class HPackParser::Parser {
absl::StrCat("; adding ", md.key(), " (length ", md.transport_size(),
"B)", summary.empty() ? "" : " to ", summary);
if (metadata_buffer_ != nullptr) metadata_buffer_->Clear();
// StreamId is used as a signal to skip this stream but keep the connection
// alive
return input_->MaybeSetErrorAndReturn(
[this, summary = std::move(summary)] {
return grpc_error_set_int(
GRPC_ERROR_CREATE(absl::StrCat(
"received initial metadata size exceeds limit (",
*frame_length_, " vs. ", metadata_size_limit_, ")", summary)),
StatusIntProperty::kRpcStatus, GRPC_STATUS_RESOURCE_EXHAUSTED);
grpc_error_set_int(
GRPC_ERROR_CREATE(absl::StrCat(
"received initial metadata size exceeds limit (",
*frame_length_, " vs. ", metadata_size_limit_, ")",
summary)),
StatusIntProperty::kRpcStatus,
GRPC_STATUS_RESOURCE_EXHAUSTED),
StatusIntProperty::kStreamId, 0);
},
false);
}
Expand Down
2 changes: 0 additions & 2 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,6 @@ struct grpc_chttp2_stream {

grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();

/// saw some stream level error
grpc_error_handle forced_close_error;
/// how many header frames have we received?
uint8_t header_frames_received = 0;
/// number of bytes received - reset at end of parse thread execution
Expand Down
7 changes: 3 additions & 4 deletions src/core/ext/transport/chttp2/transport/parsing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "internal.h"

#include <grpc/slice.h>
#include <grpc/support/log.h>
Expand Down Expand Up @@ -59,6 +60,7 @@
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"

using grpc_core::HPackParser;
Expand Down Expand Up @@ -807,10 +809,7 @@ static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
&unused)) {
grpc_chttp2_parsing_become_skip_parser(t);
if (s) {
s->forced_close_error = err;
grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing);
grpc_chttp2_cancel_stream(t, s, std::exchange(err, absl::OkStatus()));
}
}
return err;
Expand Down
228 changes: 116 additions & 112 deletions test/core/end2end/tests/large_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,6 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
// Server responds with metadata larger than what the client accepts.
static void test_request_with_bad_large_metadata_response(
grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_metadata meta;
const size_t large_size = 64 * 1024;
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_MAX_METADATA_SIZE);
Expand All @@ -262,115 +258,123 @@ static void test_request_with_bad_large_metadata_response(
grpc_end2end_test_fixture f = begin_test(
config, "test_request_with_bad_large_metadata_response", &args, &args);
grpc_core::CqVerifier cqv(f.cq);
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;

gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"), nullptr,
deadline, nullptr);
GPR_ASSERT(c);

meta.key = grpc_slice_from_static_string("key");
meta.value = grpc_slice_malloc(large_size);
memset(GRPC_SLICE_START_PTR(meta.value), 'a', large_size);

grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);

memset(ops, 0, sizeof(ops));
// Client: send request.
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);

error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);

cqv.Expect(tag(101), true);
cqv.Verify();

memset(ops, 0, sizeof(ops));
// Server: send large initial metadata
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = &meta;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
grpc_slice status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(102), true);
cqv.Expect(tag(1), true);
cqv.Verify();

GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
const char* expected_error = "received initial metadata size exceeds limit";
grpc_slice actual_error =
grpc_slice_split_head(&details, strlen(expected_error));
GPR_ASSERT(0 == grpc_slice_str_cmp(actual_error, expected_error));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));

grpc_slice_unref(actual_error);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);

grpc_call_unref(c);
grpc_call_unref(s);

grpc_slice_unref(meta.value);
for (int i = 0; i < 10; i++) {
grpc_call* c;
grpc_call* s;
grpc_metadata meta;
const size_t large_size = 64 * 1024;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;

gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
f.cq, grpc_slice_from_static_string("/foo"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);

meta.key = grpc_slice_from_static_string("key");
meta.value = grpc_slice_malloc(large_size);
memset(GRPC_SLICE_START_PTR(meta.value), 'a', large_size);

grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);

memset(ops, 0, sizeof(ops));
// Client: send request.
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);

error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);

cqv.Expect(tag(101), true);
cqv.Verify();

memset(ops, 0, sizeof(ops));
// Server: send large initial metadata
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = &meta;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
grpc_slice status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops),
tag(102), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(102), true);
cqv.Expect(tag(1), true);
cqv.Verify();

GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
const char* expected_error = "received initial metadata size exceeds limit";
grpc_slice actual_error =
grpc_slice_split_head(&details, strlen(expected_error));
GPR_ASSERT(0 == grpc_slice_str_cmp(actual_error, expected_error));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));

grpc_slice_unref(actual_error);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);

grpc_call_unref(c);
grpc_call_unref(s);

grpc_slice_unref(meta.value);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe also do a ping to make sure that the channel is still connected?


end_test(&f);
config.tear_down_data(&f);
Expand Down