Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[outlier detection] fix crash with pick_first and add tests #33069

Merged
merged 3 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Eject(const Timestamp& time) {
ejection_time_ = time;
++multiplier_;
for (auto& subchannel : subchannels_) {
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end(); ) {
SubchannelWrapper* subchannel = *it;
++it;
subchannel->Eject();
}
}
Expand Down Expand Up @@ -394,8 +399,13 @@ class OutlierDetectionLb : public LoadBalancingPolicy {

void OutlierDetectionLb::SubchannelWrapper::Eject() {
ejected_ = true;
for (auto& watcher : watchers_) {
watcher.second->Eject();
// Ejecting the subchannel may cause the child policy to cancel the watch,
// so we need to be prepared for the map to be modified while we are
// iterating.
for (auto it = watchers_.begin(); it != watchers_.end(); ) {
WatcherWrapper* watcher = it->second;
++it;
watcher->Eject();
}
}

Expand Down Expand Up @@ -858,8 +868,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm",
parent_.get());
"[outlier_detection_lb %p] running success rate algorithm: "
"stdev_factor=%d, enforcement_percentage=%d",
parent_.get(), config.success_rate_ejection->stdev_factor,
config.success_rate_ejection->enforcement_percentage);
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
Expand Down Expand Up @@ -917,8 +929,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.failure_percentage_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running failure percentage algorithm",
parent_.get());
"[outlier_detection_lb %p] running failure percentage algorithm: "
"threshold=%d, enforcement_percentage=%d",
parent_.get(), config.failure_percentage_ejection->threshold,
config.failure_percentage_ejection->enforcement_percentage);
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct OutlierDetectionConfig {
uint32_t max_ejection_percent = 10;
struct SuccessRateEjection {
uint32_t stdev_factor = 1900;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 100;

Expand All @@ -56,7 +56,7 @@ struct OutlierDetectionConfig {
};
struct FailurePercentageEjection {
uint32_t threshold = 85;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 50;

Expand Down
2 changes: 1 addition & 1 deletion test/core/client_channel/lb_policy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ grpc_cc_library(
deps = [
"//src/core:lb_policy",
"//src/core:subchannel_interface",
"//test/core/event_engine:mock_event_engine",
],
)

Expand Down Expand Up @@ -205,7 +206,6 @@ grpc_cc_test(
deps = [
":lb_policy_test_lib",
"//src/core:grpc_lb_policy_weighted_round_robin",
"//test/core/event_engine:mock_event_engine",
"//test/core/util:grpc_test_util",
],
)
95 changes: 95 additions & 0 deletions test/core/client_channel/lb_policy/lb_policy_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/mock_event_engine.h"

namespace grpc_core {
namespace testing {
Expand Down Expand Up @@ -896,6 +897,30 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< location.file() << ":" << location.line();
}

// Expect startup with RR with a set of addresses.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectRoundRobinStartup(absl::Span<const absl::string_view> addresses) {
ExpectConnectingUpdate();
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
return picker;
}

// Requests a picker on picker and expects a Fail result.
// The failing status is passed to check_status.
void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker,
Expand Down Expand Up @@ -964,6 +989,76 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
};

// A subclass to be used for LB policies that start timers.
// Injects a mock EventEngine and provides the necessary framework for
// incrementing time and handling timer callbacks.
class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
protected:
// A custom time cache for which InvalidateCache() is a no-op. This
// ensures that when the timer callback instantiates its own ExecCtx
// and therefore its own ScopedTimeCache, it continues to see the time
// that we are injecting in the test.
class TestTimeCache final : public Timestamp::ScopedSource {
public:
TestTimeCache() : cached_time_(previous()->Now()) {}

Timestamp Now() override { return cached_time_; }
void InvalidateCache() override {}

void IncrementBy(Duration duration) { cached_time_ += duration; }

private:
Timestamp cached_time_;
};

TimeAwareLoadBalancingPolicyTest() {
auto mock_ee =
std::make_shared<::grpc_event_engine::experimental::MockEventEngine>();
auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
absl::AnyInvocable<void()> callback) {
CheckExpectedTimerDuration(duration);
intptr_t key = next_key_++;
timer_callbacks_[key] = std::move(callback);
return ::grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0};
};
ON_CALL(*mock_ee,
RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
.WillByDefault(capture);
auto cancel = [this](
::grpc_event_engine::experimental::EventEngine::TaskHandle handle) {
auto it = timer_callbacks_.find(handle.keys[0]);
if (it == timer_callbacks_.end()) return false;
timer_callbacks_.erase(it);
return true;
};
ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel);
// Store in base class, to make it visible to the LB policy.
event_engine_ = std::move(mock_ee);
}

~TimeAwareLoadBalancingPolicyTest() override {
EXPECT_TRUE(timer_callbacks_.empty())
<< "WARNING: Test did not run all timer callbacks";
}

void RunTimerCallback() {
ASSERT_EQ(timer_callbacks_.size(), 1UL);
auto it = timer_callbacks_.begin();
ASSERT_NE(it->second, nullptr);
std::move(it->second)();
timer_callbacks_.erase(it);
}

// Called when the LB policy starts a timer.
// May be overridden by subclasses.
virtual void CheckExpectedTimerDuration(
::grpc_event_engine::experimental::EventEngine::Duration) {}

std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
intptr_t next_key_ = 1;
TestTimeCache time_cache_;
};

} // namespace testing
} // namespace grpc_core

Expand Down
117 changes: 116 additions & 1 deletion test/core/client_channel/lb_policy/outlier_detection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace grpc_core {
namespace testing {
namespace {

class OutlierDetectionTest : public LoadBalancingPolicyTest {
class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
Expand Down Expand Up @@ -137,7 +137,34 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest {
OutlierDetectionTest()
: lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {}

absl::optional<std::string> DoPickWithFailedCall(
LoadBalancingPolicy::SubchannelPicker* picker) {
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
subchannel_call_tracker;
auto address = ExpectPickComplete(picker, {}, &subchannel_call_tracker);
if (address.has_value()) {
subchannel_call_tracker->Start();
FakeMetadata metadata({});
FakeBackendMetricAccessor backend_metric_accessor({});
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
*address, absl::UnavailableError("uh oh"), &metadata,
&backend_metric_accessor};
subchannel_call_tracker->Finish(args);
}
return address;
}

void CheckExpectedTimerDuration(
::grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_internal_)
<< "Expected: " << expected_internal_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}

OrphanablePtr<LoadBalancingPolicy> lb_policy_;
::grpc_event_engine::experimental::EventEngine::Duration expected_internal_ =
std::chrono::seconds(10);
};

TEST_F(OutlierDetectionTest, Basic) {
Expand Down Expand Up @@ -168,6 +195,94 @@ TEST_F(OutlierDetectionTest, Basic) {
}
}

TEST_F(OutlierDetectionTest, FailurePercentage) {
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
// Send initial update.
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses,
ConfigBuilder()
.SetFailurePercentageThreshold(1)
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.Build()),
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// Expect normal startup.
auto picker = ExpectRoundRobinStartup(kAddresses);
ASSERT_NE(picker, nullptr);
gpr_log(GPR_INFO, "### RR startup complete");
// Do a pick and report a failed call.
auto address = DoPickWithFailedCall(picker.get());
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection complete");
// Expect a re-resolution request.
ExpectReresolutionRequest();
// Expect a picker update.
std::vector<absl::string_view> remaining_addresses;
for (const auto& addr : kAddresses) {
if (addr != *address) remaining_addresses.push_back(addr);
}
picker = WaitForRoundRobinListChange(kAddresses, remaining_addresses);
}

TEST_F(OutlierDetectionTest, FailurePercentageWithPickFirst) {
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
// Send initial update.
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses,
ConfigBuilder()
.SetFailurePercentageThreshold(1)
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.SetChildPolicy({{"pick_first", Json::FromObject({})}})
.Build()),
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the first address with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel, nullptr);
// When the LB policy receives the subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// When the subchannel becomes connected, it reports READY.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
// matter how many) and then report READY.
auto picker = WaitForConnected();
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
}
gpr_log(GPR_INFO, "### PF startup complete");
// Now have an RPC to that subchannel fail.
auto address = DoPickWithFailedCall(picker.get());
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection complete");
// Expect a re-resolution request.
ExpectReresolutionRequest();
// The pick_first policy should report IDLE with a queuing picker.
ExpectStateAndQueuingPicker(GRPC_CHANNEL_IDLE);
// The queued pick should have triggered a reconnection attempt.
EXPECT_TRUE(subchannel->ConnectionRequested());
}

} // namespace
} // namespace testing
} // namespace grpc_core
Expand Down