Skip to content

Commit

Permalink
Second attempt: filter stack: pass peer name up via recv_initial_meta…
Browse files Browse the repository at this point in the history
…data batch (#32417)

Original attempt was #31933, reverted
in #32415.
  • Loading branch information
markdroth committed Feb 22, 2023
1 parent b53f6fd commit 5c0589f
Show file tree
Hide file tree
Showing 26 changed files with 147 additions and 182 deletions.
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3687,7 +3687,6 @@ grpc_cc_library(
"//src/core:chttp2_flow_control",
"//src/core:closure",
"//src/core:error",
"//src/core:gpr_atm",
"//src/core:http2_errors",
"//src/core:http2_settings",
"//src/core:init_internally",
Expand Down
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2225,6 +2225,7 @@ grpc_cc_library(
"lib/transport/bdp_estimator.cc",
],
hdrs = ["lib/transport/bdp_estimator.h"],
external_deps = ["absl/strings"],
deps = [
"time",
"//:gpr",
Expand Down
32 changes: 8 additions & 24 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2929,11 +2929,6 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
if (batch->send_initial_metadata) {
call_attempt_tracer()->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
SendInitialMetadataOnComplete, this, nullptr);
batch->on_complete = &send_initial_metadata_on_complete_;
}
if (batch->send_message) {
call_attempt_tracer()->RecordSendMessage(
Expand Down Expand Up @@ -3039,21 +3034,6 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch(
}
}

void ClientChannel::FilterBasedLoadBalancedCall::SendInitialMetadataOnComplete(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: got on_complete for send_initial_metadata: "
"error=%s",
self->chand(), self, StatusToString(error).c_str());
}
self->call_attempt_tracer()->RecordOnDoneSendInitialMetadata(
self->peer_string_);
Closure::Run(DEBUG_LOCATION,
self->original_send_initial_metadata_on_complete_, error);
}

void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
Expand Down Expand Up @@ -3121,10 +3101,14 @@ void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady(
status = absl::Status(static_cast<absl::StatusCode>(code), message);
}
}
const char* peer_string =
self->peer_string_ != nullptr
? reinterpret_cast<char*>(gpr_atm_acq_load(self->peer_string_))
: "";
absl::string_view peer_string;
if (self->recv_initial_metadata_ != nullptr) {
Slice* peer_string_slice =
self->recv_initial_metadata_->get_pointer(PeerString());
if (peer_string_slice != nullptr) {
peer_string = peer_string_slice->as_string_view();
}
}
self->RecordCallCompletion(status, self->recv_trailing_metadata_,
self->transport_stream_stats_, peer_string);
}
Expand Down
6 changes: 0 additions & 6 deletions src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/atm.h>

#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
Expand Down Expand Up @@ -564,11 +563,6 @@ class ClientChannel::FilterBasedLoadBalancedCall

RefCountedPtr<SubchannelCall> subchannel_call_;

// For intercepting send_initial_metadata on_complete.
gpr_atm* peer_string_ = nullptr;
grpc_closure send_initial_metadata_on_complete_;
grpc_closure* original_send_initial_metadata_on_complete_ = nullptr;

// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
Expand Down
12 changes: 0 additions & 12 deletions src/core/ext/filters/client_channel/retry_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
Expand Down Expand Up @@ -614,15 +613,6 @@ class RetryFilter::CallData {
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_{arena_};
// TODO(roth): As part of implementing hedging, we'll probably need to
// have the LB call set a value in CallAttempt and then propagate it
// from CallAttempt to the parent call when we commit. Otherwise, we
// may leave this with a value for a peer other than the one we
// actually commit to. Alternatively, maybe see if there's a way to
// change the surface API such that the peer isn't available until
// after initial metadata is received? (Could even change the
// transport API to return this with the recv_initial_metadata op.)
gpr_atm* peer_string_;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
Expand Down Expand Up @@ -1989,7 +1979,6 @@ void RetryFilter::CallData::CallAttempt::BatchData::
batch_.send_initial_metadata = true;
batch_.payload->send_initial_metadata.send_initial_metadata =
&call_attempt_->send_initial_metadata_;
batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
Expand Down Expand Up @@ -2337,7 +2326,6 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
send_initial_metadata_ = send_initial_metadata->Copy();
peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.
if (batch->send_message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
GPR_ASSERT(error.ok());
payload_.send_initial_metadata.send_initial_metadata =
&send_initial_metadata_;
payload_.send_initial_metadata.peer_string = nullptr;
batch_.send_initial_metadata = true;
// Add send_message op.
send_message_.Append(Slice(
Expand All @@ -257,7 +256,6 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
payload_.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_;
payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
payload_.recv_initial_metadata.peer_string = nullptr;
// recv_initial_metadata_ready callback takes ref, handled manually.
call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
payload_.recv_initial_metadata.recv_initial_metadata_ready =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ namespace {
std::string GetCensusSafeClientIpString(
const ClientMetadataHandle& initial_metadata) {
// Find the client URI string.
auto client_uri_str = initial_metadata->get(PeerString());
if (!client_uri_str.has_value()) {
Slice* client_uri_slice = initial_metadata->get_pointer(PeerString());
if (client_uri_slice == nullptr) {
gpr_log(GPR_ERROR,
"Unable to extract client URI string (peer string) from gRPC "
"metadata.");
return "";
}
absl::StatusOr<URI> client_uri = URI::Parse(*client_uri_str);
absl::StatusOr<URI> client_uri =
URI::Parse(client_uri_slice->as_string_view());
if (!client_uri.ok()) {
gpr_log(GPR_ERROR,
"Unable to parse the client URI string (peer string) to a client "
Expand Down
13 changes: 7 additions & 6 deletions src/core/ext/filters/stateful_session/stateful_session_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ void MaybeUpdateServerInitialMetadata(
absl::optional<absl::string_view> cookie_value,
ServerMetadata* server_initial_metadata) {
// Get peer string.
auto peer_string = server_initial_metadata->get(PeerString());
if (!peer_string.has_value()) return; // Nothing we can do.
Slice* peer_string = server_initial_metadata->get_pointer(PeerString());
if (peer_string == nullptr) return; // Nothing we can do.
// If there was no cookie or if the address changed, set the cookie.
if (!cookie_value.has_value() || *peer_string != *cookie_value) {
std::vector<std::string> parts = {
absl::StrCat(*cookie_config->name, "=",
absl::Base64Escape(*peer_string), "; HttpOnly")};
if (!cookie_value.has_value() ||
peer_string->as_string_view() != *cookie_value) {
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=",
absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")};
if (!cookie_config->path.empty()) {
parts.emplace_back(absl::StrCat("Path=", cookie_config->path));
}
Expand Down

0 comments on commit 5c0589f

Please sign in to comment.