Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter stack: pass peer name up via recv_initial_metadata batch #31933

Merged
merged 11 commits into from
Feb 16, 2023
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3654,7 +3654,6 @@ grpc_cc_library(
"//src/core:chttp2_flow_control",
"//src/core:closure",
"//src/core:error",
"//src/core:gpr_atm",
"//src/core:http2_errors",
"//src/core:http2_settings",
"//src/core:init_internally",
Expand Down
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2229,6 +2229,7 @@ grpc_cc_library(
"lib/transport/bdp_estimator.cc",
],
hdrs = ["lib/transport/bdp_estimator.h"],
external_deps = ["absl/strings"],
deps = [
"time",
"//:gpr",
Expand Down
32 changes: 8 additions & 24 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2722,11 +2722,6 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
SendInitialMetadataOnComplete, this, nullptr);
batch->on_complete = &send_initial_metadata_on_complete_;
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
Expand Down Expand Up @@ -2837,21 +2832,6 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
}

void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: got on_complete for send_initial_metadata: "
"error=%s",
self->chand_, self, StatusToString(error).c_str());
}
self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
self->peer_string_);
Closure::Run(DEBUG_LOCATION,
self->original_send_initial_metadata_on_complete_, error);
}

void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
Expand Down Expand Up @@ -2942,10 +2922,14 @@ void ClientChannel::LoadBalancedCall::RecordCallCompletion(
if (lb_subchannel_call_tracker_ != nullptr) {
Metadata trailing_metadata(recv_trailing_metadata_);
BackendMetricAccessor backend_metric_accessor(this);
const char* peer_string =
peer_string_ != nullptr
? reinterpret_cast<char*>(gpr_atm_acq_load(peer_string_))
: "";
absl::string_view peer_string;
if (recv_initial_metadata_ != nullptr) {
Slice* peer_string_slice =
recv_initial_metadata_->get_pointer(PeerString());
if (peer_string_slice != nullptr) {
peer_string = peer_string_slice->as_string_view();
}
}
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_string, status, &trailing_metadata, &backend_metric_accessor};
lb_subchannel_call_tracker_->Finish(args);
Expand Down
6 changes: 0 additions & 6 deletions src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/atm.h>

#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
Expand Down Expand Up @@ -530,11 +529,6 @@ class ClientChannel::LoadBalancedCall

RefCountedPtr<SubchannelCall> subchannel_call_;

// For intercepting send_initial_metadata on_complete.
gpr_atm* peer_string_ = nullptr;
grpc_closure send_initial_metadata_on_complete_;
grpc_closure* original_send_initial_metadata_on_complete_ = nullptr;

// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
Expand Down
12 changes: 0 additions & 12 deletions src/core/ext/filters/client_channel/retry_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
Expand Down Expand Up @@ -615,15 +614,6 @@ class RetryFilter::CallData {
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_{arena_};
// TODO(roth): As part of implementing hedging, we'll probably need to
// have the LB call set a value in CallAttempt and then propagate it
// from CallAttempt to the parent call when we commit. Otherwise, we
// may leave this with a value for a peer other than the one we
// actually commit to. Alternatively, maybe see if there's a way to
// change the surface API such that the peer isn't available until
// after initial metadata is received? (Could even change the
// transport API to return this with the recv_initial_metadata op.)
gpr_atm* peer_string_;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
Expand Down Expand Up @@ -1989,7 +1979,6 @@ void RetryFilter::CallData::CallAttempt::BatchData::
batch_.send_initial_metadata = true;
batch_.payload->send_initial_metadata.send_initial_metadata =
&call_attempt_->send_initial_metadata_;
batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
Expand Down Expand Up @@ -2337,7 +2326,6 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
send_initial_metadata_ = send_initial_metadata->Copy();
peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.
if (batch->send_message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
GPR_ASSERT(error.ok());
payload_.send_initial_metadata.send_initial_metadata =
&send_initial_metadata_;
payload_.send_initial_metadata.peer_string = nullptr;
batch_.send_initial_metadata = true;
// Add send_message op.
send_message_.Append(Slice(
Expand All @@ -257,7 +256,6 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
payload_.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_;
payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
payload_.recv_initial_metadata.peer_string = nullptr;
// recv_initial_metadata_ready callback takes ref, handled manually.
call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
payload_.recv_initial_metadata.recv_initial_metadata_ready =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ namespace {
std::string GetCensusSafeClientIpString(
const ClientMetadataHandle& initial_metadata) {
// Find the client URI string.
auto client_uri_str = initial_metadata->get(PeerString());
if (!client_uri_str.has_value()) {
Slice* client_uri_slice = initial_metadata->get_pointer(PeerString());
if (client_uri_slice == nullptr) {
gpr_log(GPR_ERROR,
"Unable to extract client URI string (peer string) from gRPC "
"metadata.");
return "";
}
absl::StatusOr<URI> client_uri = URI::Parse(*client_uri_str);
absl::StatusOr<URI> client_uri =
URI::Parse(client_uri_slice->as_string_view());
if (!client_uri.ok()) {
gpr_log(GPR_ERROR,
"Unable to parse the client URI string (peer string) to a client "
Expand Down
13 changes: 7 additions & 6 deletions src/core/ext/filters/stateful_session/stateful_session_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ void MaybeUpdateServerInitialMetadata(
absl::optional<absl::string_view> cookie_value,
ServerMetadata* server_initial_metadata) {
// Get peer string.
auto peer_string = server_initial_metadata->get(PeerString());
if (!peer_string.has_value()) return; // Nothing we can do.
Slice* peer_string = server_initial_metadata->get_pointer(PeerString());
if (peer_string == nullptr) return; // Nothing we can do.
// If there was no cookie or if the address changed, set the cookie.
if (!cookie_value.has_value() || *peer_string != *cookie_value) {
std::vector<std::string> parts = {
absl::StrCat(*cookie_config->name, "=",
absl::Base64Escape(*peer_string), "; HttpOnly")};
if (!cookie_value.has_value() ||
peer_string->as_string_view() != *cookie_value) {
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=",
absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")};
if (!cookie_config->path.empty()) {
parts.emplace_back(absl::StrCat("Path=", cookie_config->path));
}
Expand Down