Skip to content

Commit

Permalink
Revert "filter stack: pass peer name up via recv_initial_metadata bat…
Browse files Browse the repository at this point in the history
…ch (#31933)"

This reverts commit 3a94e50.
  • Loading branch information
markdroth committed Feb 16, 2023
1 parent 1c5db34 commit 24921c0
Show file tree
Hide file tree
Showing 26 changed files with 182 additions and 147 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3679,6 +3679,7 @@ grpc_cc_library(
"//src/core:chttp2_flow_control",
"//src/core:closure",
"//src/core:error",
"//src/core:gpr_atm",
"//src/core:http2_errors",
"//src/core:http2_settings",
"//src/core:init_internally",
Expand Down
1 change: 0 additions & 1 deletion src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2225,7 +2225,6 @@ grpc_cc_library(
"lib/transport/bdp_estimator.cc",
],
hdrs = ["lib/transport/bdp_estimator.h"],
external_deps = ["absl/strings"],
deps = [
"time",
"//:gpr",
Expand Down
32 changes: 24 additions & 8 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2720,6 +2720,11 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
SendInitialMetadataOnComplete, this, nullptr);
batch->on_complete = &send_initial_metadata_on_complete_;
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
Expand Down Expand Up @@ -2830,6 +2835,21 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
}

void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: got on_complete for send_initial_metadata: "
"error=%s",
self->chand_, self, StatusToString(error).c_str());
}
self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
self->peer_string_);
Closure::Run(DEBUG_LOCATION,
self->original_send_initial_metadata_on_complete_, error);
}

void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
Expand Down Expand Up @@ -2920,14 +2940,10 @@ void ClientChannel::LoadBalancedCall::RecordCallCompletion(
if (lb_subchannel_call_tracker_ != nullptr) {
Metadata trailing_metadata(recv_trailing_metadata_);
BackendMetricAccessor backend_metric_accessor(this);
absl::string_view peer_string;
if (recv_initial_metadata_ != nullptr) {
Slice* peer_string_slice =
recv_initial_metadata_->get_pointer(PeerString());
if (peer_string_slice != nullptr) {
peer_string = peer_string_slice->as_string_view();
}
}
const char* peer_string =
peer_string_ != nullptr
? reinterpret_cast<char*>(gpr_atm_acq_load(peer_string_))
: "";
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_string, status, &trailing_metadata, &backend_metric_accessor};
lb_subchannel_call_tracker_->Finish(args);
Expand Down
6 changes: 6 additions & 0 deletions src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/atm.h>

#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
Expand Down Expand Up @@ -529,6 +530,11 @@ class ClientChannel::LoadBalancedCall

RefCountedPtr<SubchannelCall> subchannel_call_;

// For intercepting send_initial_metadata on_complete.
gpr_atm* peer_string_ = nullptr;
grpc_closure send_initial_metadata_on_complete_;
grpc_closure* original_send_initial_metadata_on_complete_ = nullptr;

// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
Expand Down
12 changes: 12 additions & 0 deletions src/core/ext/filters/client_channel/retry_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
Expand Down Expand Up @@ -611,6 +612,15 @@ class RetryFilter::CallData {
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_{arena_};
// TODO(roth): As part of implementing hedging, we'll probably need to
// have the LB call set a value in CallAttempt and then propagate it
// from CallAttempt to the parent call when we commit. Otherwise, we
// may leave this with a value for a peer other than the one we
// actually commit to. Alternatively, maybe see if there's a way to
// change the surface API such that the peer isn't available until
// after initial metadata is received? (Could even change the
// transport API to return this with the recv_initial_metadata op.)
gpr_atm* peer_string_;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
Expand Down Expand Up @@ -1976,6 +1986,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::
batch_.send_initial_metadata = true;
batch_.payload->send_initial_metadata.send_initial_metadata =
&call_attempt_->send_initial_metadata_;
batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
Expand Down Expand Up @@ -2323,6 +2334,7 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
send_initial_metadata_ = send_initial_metadata->Copy();
peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.
if (batch->send_message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
GPR_ASSERT(error.ok());
payload_.send_initial_metadata.send_initial_metadata =
&send_initial_metadata_;
payload_.send_initial_metadata.peer_string = nullptr;
batch_.send_initial_metadata = true;
// Add send_message op.
send_message_.Append(Slice(
Expand All @@ -256,6 +257,7 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
payload_.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_;
payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
payload_.recv_initial_metadata.peer_string = nullptr;
// recv_initial_metadata_ready callback takes ref, handled manually.
call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
payload_.recv_initial_metadata.recv_initial_metadata_ready =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ namespace {
std::string GetCensusSafeClientIpString(
const ClientMetadataHandle& initial_metadata) {
// Find the client URI string.
Slice* client_uri_slice = initial_metadata->get_pointer(PeerString());
if (client_uri_slice == nullptr) {
auto client_uri_str = initial_metadata->get(PeerString());
if (!client_uri_str.has_value()) {
gpr_log(GPR_ERROR,
"Unable to extract client URI string (peer string) from gRPC "
"metadata.");
return "";
}
absl::StatusOr<URI> client_uri =
URI::Parse(client_uri_slice->as_string_view());
absl::StatusOr<URI> client_uri = URI::Parse(*client_uri_str);
if (!client_uri.ok()) {
gpr_log(GPR_ERROR,
"Unable to parse the client URI string (peer string) to a client "
Expand Down
13 changes: 6 additions & 7 deletions src/core/ext/filters/stateful_session/stateful_session_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@ void MaybeUpdateServerInitialMetadata(
absl::optional<absl::string_view> cookie_value,
ServerMetadata* server_initial_metadata) {
// Get peer string.
Slice* peer_string = server_initial_metadata->get_pointer(PeerString());
if (peer_string == nullptr) return; // Nothing we can do.
auto peer_string = server_initial_metadata->get(PeerString());
if (!peer_string.has_value()) return; // Nothing we can do.
// If there was no cookie or if the address changed, set the cookie.
if (!cookie_value.has_value() ||
peer_string->as_string_view() != *cookie_value) {
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=",
absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")};
if (!cookie_value.has_value() || *peer_string != *cookie_value) {
std::vector<std::string> parts = {
absl::StrCat(*cookie_config->name, "=",
absl::Base64Escape(*peer_string), "; HttpOnly")};
if (!cookie_config->path.empty()) {
parts.emplace_back(absl::StrCat("Path=", cookie_config->path));
}
Expand Down

0 comments on commit 24921c0

Please sign in to comment.