Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[outlier detection] fix crash with pick_first and add tests #33069

Merged
merged 3 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Eject(const Timestamp& time) {
ejection_time_ = time;
++multiplier_;
for (auto& subchannel : subchannels_) {
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end();) {
SubchannelWrapper* subchannel = *it;
++it;
subchannel->Eject();
}
}
Expand Down Expand Up @@ -394,8 +399,13 @@ class OutlierDetectionLb : public LoadBalancingPolicy {

void OutlierDetectionLb::SubchannelWrapper::Eject() {
ejected_ = true;
for (auto& watcher : watchers_) {
watcher.second->Eject();
// Ejecting the subchannel may cause the child policy to cancel the watch,
// so we need to be prepared for the map to be modified while we are
// iterating.
for (auto it = watchers_.begin(); it != watchers_.end();) {
WatcherWrapper* watcher = it->second;
++it;
watcher->Eject();
}
}

Expand Down Expand Up @@ -858,8 +868,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm",
parent_.get());
"[outlier_detection_lb %p] running success rate algorithm: "
"stdev_factor=%d, enforcement_percentage=%d",
parent_.get(), config.success_rate_ejection->stdev_factor,
config.success_rate_ejection->enforcement_percentage);
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
Expand Down Expand Up @@ -917,8 +929,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.failure_percentage_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running failure percentage algorithm",
parent_.get());
"[outlier_detection_lb %p] running failure percentage algorithm: "
"threshold=%d, enforcement_percentage=%d",
parent_.get(), config.failure_percentage_ejection->threshold,
config.failure_percentage_ejection->enforcement_percentage);
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct OutlierDetectionConfig {
uint32_t max_ejection_percent = 10;
struct SuccessRateEjection {
uint32_t stdev_factor = 1900;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 100;

Expand All @@ -56,7 +56,7 @@ struct OutlierDetectionConfig {
};
struct FailurePercentageEjection {
uint32_t threshold = 85;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 50;

Expand Down
2 changes: 1 addition & 1 deletion test/core/client_channel/lb_policy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ grpc_cc_library(
deps = [
"//src/core:lb_policy",
"//src/core:subchannel_interface",
"//test/core/event_engine:mock_event_engine",
],
)

Expand Down Expand Up @@ -205,7 +206,6 @@ grpc_cc_test(
deps = [
":lb_policy_test_lib",
"//src/core:grpc_lb_policy_weighted_round_robin",
"//test/core/event_engine:mock_event_engine",
"//test/core/util:grpc_test_util",
],
)
100 changes: 100 additions & 0 deletions test/core/client_channel/lb_policy/lb_policy_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@
#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <chrono>
#include <deque>
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <ratio>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
Expand Down Expand Up @@ -79,6 +83,7 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/mock_event_engine.h"

namespace grpc_core {
namespace testing {
Expand Down Expand Up @@ -896,6 +901,30 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< location.file() << ":" << location.line();
}

// Expect startup with RR with a set of addresses.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectRoundRobinStartup(
absl::Span<const absl::string_view> addresses) {
ExpectConnectingUpdate();
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
return picker;
}

// Requests a picker on picker and expects a Fail result.
// The failing status is passed to check_status.
void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker,
Expand Down Expand Up @@ -964,6 +993,77 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
};

// A subclass to be used for LB policies that start timers.
// Injects a mock EventEngine and provides the necessary framework for
// incrementing time and handling timer callbacks.
class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
protected:
// A custom time cache for which InvalidateCache() is a no-op. This
// ensures that when the timer callback instantiates its own ExecCtx
// and therefore its own ScopedTimeCache, it continues to see the time
// that we are injecting in the test.
class TestTimeCache final : public Timestamp::ScopedSource {
public:
TestTimeCache() : cached_time_(previous()->Now()) {}

Timestamp Now() override { return cached_time_; }
void InvalidateCache() override {}

void IncrementBy(Duration duration) { cached_time_ += duration; }

private:
Timestamp cached_time_;
};

TimeAwareLoadBalancingPolicyTest() {
auto mock_ee =
std::make_shared<grpc_event_engine::experimental::MockEventEngine>();
auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
absl::AnyInvocable<void()> callback) {
CheckExpectedTimerDuration(duration);
intptr_t key = next_key_++;
timer_callbacks_[key] = std::move(callback);
return grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0};
};
ON_CALL(*mock_ee,
RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
.WillByDefault(capture);
auto cancel =
[this](
grpc_event_engine::experimental::EventEngine::TaskHandle handle) {
auto it = timer_callbacks_.find(handle.keys[0]);
if (it == timer_callbacks_.end()) return false;
timer_callbacks_.erase(it);
return true;
};
ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel);
// Store in base class, to make it visible to the LB policy.
event_engine_ = std::move(mock_ee);
}

~TimeAwareLoadBalancingPolicyTest() override {
EXPECT_TRUE(timer_callbacks_.empty())
<< "WARNING: Test did not run all timer callbacks";
}

void RunTimerCallback() {
ASSERT_EQ(timer_callbacks_.size(), 1UL);
auto it = timer_callbacks_.begin();
ASSERT_NE(it->second, nullptr);
std::move(it->second)();
timer_callbacks_.erase(it);
}

// Called when the LB policy starts a timer.
// May be overridden by subclasses.
virtual void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration) {}

std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
intptr_t next_key_ = 1;
TestTimeCache time_cache_;
};

} // namespace testing
} // namespace grpc_core

Expand Down