Skip to content

Commit

Permalink
filter stack: pass peer name up via recv_initial_metadata batch (grpc…
Browse files Browse the repository at this point in the history
…#31933)

Currently, the peer name is returned with the completion of the
send_initial_metadata op, which does not make sense, because with
retries, we don't actually know the peer name until we complete the
recv_initial_metadata op. This PR changes our code to return the peer
string as an attribute of the recv_initial_metadata op, so that it is
not available to the application until that point. This change may be
user-visible, but since our API docs don't seem to guarantee exactly
when this data will be available, it's not technically a breaking
change.

Note that in the promise-based stack, we were already assuming that the
peer string would be returned as part of the recv_initial_metadata
batch, so this PR helps reduce risk for the promise conversion by making
this semantic change now, thus decoupling it from the promise
conversion.

I have also changed the representation of the string in the metadata
batch to be a `grpc_core::Slice` instead of a `std::string`, so that we
can just take a ref to the string held in the transport instead of
having to copy the whole string for every call.
  • Loading branch information
markdroth authored and XuanWang-Amos committed May 1, 2023
1 parent 5d8ae23 commit 8d4c131
Show file tree
Hide file tree
Showing 26 changed files with 147 additions and 182 deletions.
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3679,7 +3679,6 @@ grpc_cc_library(
"//src/core:chttp2_flow_control",
"//src/core:closure",
"//src/core:error",
"//src/core:gpr_atm",
"//src/core:http2_errors",
"//src/core:http2_settings",
"//src/core:init_internally",
Expand Down
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2225,6 +2225,7 @@ grpc_cc_library(
"lib/transport/bdp_estimator.cc",
],
hdrs = ["lib/transport/bdp_estimator.h"],
external_deps = ["absl/strings"],
deps = [
"time",
"//:gpr",
Expand Down
32 changes: 8 additions & 24 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2721,11 +2721,6 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
SendInitialMetadataOnComplete, this, nullptr);
batch->on_complete = &send_initial_metadata_on_complete_;
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
Expand Down Expand Up @@ -2836,21 +2831,6 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
}

void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: got on_complete for send_initial_metadata: "
"error=%s",
self->chand_, self, StatusToString(error).c_str());
}
self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
self->peer_string_);
Closure::Run(DEBUG_LOCATION,
self->original_send_initial_metadata_on_complete_, error);
}

void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
Expand Down Expand Up @@ -2941,10 +2921,14 @@ void ClientChannel::LoadBalancedCall::RecordCallCompletion(
if (lb_subchannel_call_tracker_ != nullptr) {
Metadata trailing_metadata(recv_trailing_metadata_);
BackendMetricAccessor backend_metric_accessor(this);
const char* peer_string =
peer_string_ != nullptr
? reinterpret_cast<char*>(gpr_atm_acq_load(peer_string_))
: "";
absl::string_view peer_string;
if (recv_initial_metadata_ != nullptr) {
Slice* peer_string_slice =
recv_initial_metadata_->get_pointer(PeerString());
if (peer_string_slice != nullptr) {
peer_string = peer_string_slice->as_string_view();
}
}
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_string, status, &trailing_metadata, &backend_metric_accessor};
lb_subchannel_call_tracker_->Finish(args);
Expand Down
6 changes: 0 additions & 6 deletions src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/atm.h>

#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
Expand Down Expand Up @@ -530,11 +529,6 @@ class ClientChannel::LoadBalancedCall

RefCountedPtr<SubchannelCall> subchannel_call_;

// For intercepting send_initial_metadata on_complete.
gpr_atm* peer_string_ = nullptr;
grpc_closure send_initial_metadata_on_complete_;
grpc_closure* original_send_initial_metadata_on_complete_ = nullptr;

// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
Expand Down
12 changes: 0 additions & 12 deletions src/core/ext/filters/client_channel/retry_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
Expand Down Expand Up @@ -612,15 +611,6 @@ class RetryFilter::CallData {
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_{arena_};
// TODO(roth): As part of implementing hedging, we'll probably need to
// have the LB call set a value in CallAttempt and then propagate it
// from CallAttempt to the parent call when we commit. Otherwise, we
// may leave this with a value for a peer other than the one we
// actually commit to. Alternatively, maybe see if there's a way to
// change the surface API such that the peer isn't available until
// after initial metadata is received? (Could even change the
// transport API to return this with the recv_initial_metadata op.)
gpr_atm* peer_string_;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
Expand Down Expand Up @@ -1986,7 +1976,6 @@ void RetryFilter::CallData::CallAttempt::BatchData::
batch_.send_initial_metadata = true;
batch_.payload->send_initial_metadata.send_initial_metadata =
&call_attempt_->send_initial_metadata_;
batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
Expand Down Expand Up @@ -2334,7 +2323,6 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
send_initial_metadata_ = send_initial_metadata->Copy();
peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.
if (batch->send_message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
GPR_ASSERT(error.ok());
payload_.send_initial_metadata.send_initial_metadata =
&send_initial_metadata_;
payload_.send_initial_metadata.peer_string = nullptr;
batch_.send_initial_metadata = true;
// Add send_message op.
send_message_.Append(Slice(
Expand All @@ -257,7 +256,6 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
payload_.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_;
payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
payload_.recv_initial_metadata.peer_string = nullptr;
// recv_initial_metadata_ready callback takes ref, handled manually.
call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
payload_.recv_initial_metadata.recv_initial_metadata_ready =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ namespace {
std::string GetCensusSafeClientIpString(
const ClientMetadataHandle& initial_metadata) {
// Find the client URI string.
auto client_uri_str = initial_metadata->get(PeerString());
if (!client_uri_str.has_value()) {
Slice* client_uri_slice = initial_metadata->get_pointer(PeerString());
if (client_uri_slice == nullptr) {
gpr_log(GPR_ERROR,
"Unable to extract client URI string (peer string) from gRPC "
"metadata.");
return "";
}
absl::StatusOr<URI> client_uri = URI::Parse(*client_uri_str);
absl::StatusOr<URI> client_uri =
URI::Parse(client_uri_slice->as_string_view());
if (!client_uri.ok()) {
gpr_log(GPR_ERROR,
"Unable to parse the client URI string (peer string) to a client "
Expand Down
13 changes: 7 additions & 6 deletions src/core/ext/filters/stateful_session/stateful_session_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ void MaybeUpdateServerInitialMetadata(
absl::optional<absl::string_view> cookie_value,
ServerMetadata* server_initial_metadata) {
// Get peer string.
auto peer_string = server_initial_metadata->get(PeerString());
if (!peer_string.has_value()) return; // Nothing we can do.
Slice* peer_string = server_initial_metadata->get_pointer(PeerString());
if (peer_string == nullptr) return; // Nothing we can do.
// If there was no cookie or if the address changed, set the cookie.
if (!cookie_value.has_value() || *peer_string != *cookie_value) {
std::vector<std::string> parts = {
absl::StrCat(*cookie_config->name, "=",
absl::Base64Escape(*peer_string), "; HttpOnly")};
if (!cookie_value.has_value() ||
peer_string->as_string_view() != *cookie_value) {
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=",
absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")};
if (!cookie_config->path.empty()) {
parts.emplace_back(absl::StrCat("Path=", cookie_config->path));
}
Expand Down

0 comments on commit 8d4c131

Please sign in to comment.