Skip to content

Commit

Permalink
[outlier detection] fix crash with pick_first and add tests (grpc#33069)
Browse files Browse the repository at this point in the history
Fixes grpc#32967.

Also fix incorrect defaults for `enforcementPercentage` fields.
  • Loading branch information
markdroth authored and eugeneo committed May 17, 2023
1 parent 41b0d00 commit 1d10ef0
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 100 deletions.
4 changes: 4 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Eject(const Timestamp& time) {
ejection_time_ = time;
++multiplier_;
for (auto& subchannel : subchannels_) {
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end();) {
SubchannelWrapper* subchannel = *it;
++it;
subchannel->Eject();
}
}
Expand Down Expand Up @@ -394,8 +399,13 @@ class OutlierDetectionLb : public LoadBalancingPolicy {

void OutlierDetectionLb::SubchannelWrapper::Eject() {
ejected_ = true;
for (auto& watcher : watchers_) {
watcher.second->Eject();
// Ejecting the subchannel may cause the child policy to cancel the watch,
// so we need to be prepared for the map to be modified while we are
// iterating.
for (auto it = watchers_.begin(); it != watchers_.end();) {
WatcherWrapper* watcher = it->second;
++it;
watcher->Eject();
}
}

Expand Down Expand Up @@ -858,8 +868,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm",
parent_.get());
"[outlier_detection_lb %p] running success rate algorithm: "
"stdev_factor=%d, enforcement_percentage=%d",
parent_.get(), config.success_rate_ejection->stdev_factor,
config.success_rate_ejection->enforcement_percentage);
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
Expand Down Expand Up @@ -917,8 +929,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.failure_percentage_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running failure percentage algorithm",
parent_.get());
"[outlier_detection_lb %p] running failure percentage algorithm: "
"threshold=%d, enforcement_percentage=%d",
parent_.get(), config.failure_percentage_ejection->threshold,
config.failure_percentage_ejection->enforcement_percentage);
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct OutlierDetectionConfig {
uint32_t max_ejection_percent = 10;
struct SuccessRateEjection {
uint32_t stdev_factor = 1900;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 100;

Expand All @@ -56,7 +56,7 @@ struct OutlierDetectionConfig {
};
struct FailurePercentageEjection {
uint32_t threshold = 85;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 50;

Expand Down
2 changes: 1 addition & 1 deletion test/core/client_channel/lb_policy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ grpc_cc_library(
deps = [
"//src/core:lb_policy",
"//src/core:subchannel_interface",
"//test/core/event_engine:mock_event_engine",
],
)

Expand Down Expand Up @@ -205,7 +206,6 @@ grpc_cc_test(
deps = [
":lb_policy_test_lib",
"//src/core:grpc_lb_policy_weighted_round_robin",
"//test/core/event_engine:mock_event_engine",
"//test/core/util:grpc_test_util",
],
)
100 changes: 100 additions & 0 deletions test/core/client_channel/lb_policy/lb_policy_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@
#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <chrono>
#include <deque>
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <ratio>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
Expand Down Expand Up @@ -79,6 +83,7 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/mock_event_engine.h"

namespace grpc_core {
namespace testing {
Expand Down Expand Up @@ -896,6 +901,30 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< location.file() << ":" << location.line();
}

// Drives a round_robin startup sequence over addresses, in order.
//
// First expects the initial CONNECTING update.  Then, for each address,
// requires that a subchannel exists and that a connection was requested
// on it, reports it as CONNECTING followed by READY, and waits for the
// resulting picker: the first address must yield a connected picker that
// picks only that address, and every later address must change the
// round-robin list from the first N addresses to the first N+1.
//
// Returns the picker from the final update, or nullptr if any address
// has no subchannel (a test failure is recorded in that case).
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectRoundRobinStartup(
    absl::Span<const absl::string_view> addresses) {
  ExpectConnectingUpdate();
  RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> latest_picker;
  // Number of addresses already moved to READY by earlier iterations.
  size_t num_ready = 0;
  for (const absl::string_view address : addresses) {
    auto* state = FindSubchannel(address);
    EXPECT_NE(state, nullptr);
    // Bail out rather than dereference a missing subchannel.
    if (state == nullptr) return nullptr;
    EXPECT_TRUE(state->ConnectionRequested());
    state->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
    state->SetConnectivityState(GRPC_CHANNEL_READY);
    if (num_ready == 0) {
      // First READY subchannel: the policy should become connected and
      // pick only this address.
      latest_picker = WaitForConnected();
      ExpectRoundRobinPicks(latest_picker.get(), {address});
    } else {
      // Later subchannels: the picker's list grows by one address.
      const auto all_addresses = absl::MakeSpan(addresses);
      latest_picker = WaitForRoundRobinListChange(
          all_addresses.subspan(0, num_ready),
          all_addresses.subspan(0, num_ready + 1));
    }
    ++num_ready;
  }
  return latest_picker;
}

// Requests a picker on picker and expects a Fail result.
// The failing status is passed to check_status.
void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker,
Expand Down Expand Up @@ -964,6 +993,77 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
};

// A subclass to be used for LB policies that start timers.
// Injects a mock EventEngine and provides the necessary framework for
// incrementing time and handling timer callbacks.
class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
 protected:
  // A custom time cache for which InvalidateCache() is a no-op.  This
  // ensures that when the timer callback instantiates its own ExecCtx
  // and therefore its own ScopedTimeCache, it continues to see the time
  // that we are injecting in the test.
  class TestTimeCache final : public Timestamp::ScopedSource {
   public:
    // The initial time is whatever previous()->Now() reports at
    // construction.
    TestTimeCache() : cached_time_(previous()->Now()) {}

    // Always returns the injected time; it changes only via IncrementBy().
    Timestamp Now() override { return cached_time_; }
    void InvalidateCache() override {}

    // Advances the injected time by duration.
    void IncrementBy(Duration duration) { cached_time_ += duration; }

   private:
    Timestamp cached_time_;
  };

  // Installs a MockEventEngine whose default RunAfter() and Cancel()
  // actions record and remove pending callbacks in timer_callbacks_
  // instead of scheduling anything.
  TimeAwareLoadBalancingPolicyTest() {
    auto mock_ee =
        std::make_shared<grpc_event_engine::experimental::MockEventEngine>();
    // RunAfter(): let the subclass validate the requested duration, then
    // stash the callback under a fresh key and hand that key back in the
    // task handle so that Cancel() can find it later.
    auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
                          absl::AnyInvocable<void()> callback) {
      CheckExpectedTimerDuration(duration);
      intptr_t key = next_key_++;
      timer_callbacks_[key] = std::move(callback);
      return grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0};
    };
    ON_CALL(*mock_ee,
            RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
        .WillByDefault(capture);
    // Cancel(): succeeds only if the callback is still pending, in which
    // case it is dropped without being run.
    auto cancel =
        [this](
            grpc_event_engine::experimental::EventEngine::TaskHandle handle) {
          auto it = timer_callbacks_.find(handle.keys[0]);
          if (it == timer_callbacks_.end()) return false;
          timer_callbacks_.erase(it);
          return true;
        };
    ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel);
    // Store in base class, to make it visible to the LB policy.
    event_engine_ = std::move(mock_ee);
  }

  // Flags tests that leave timer callbacks pending at teardown.
  ~TimeAwareLoadBalancingPolicyTest() override {
    EXPECT_TRUE(timer_callbacks_.empty())
        << "WARNING: Test did not run all timer callbacks";
  }

  // Runs the single pending timer callback.  Fails the test (without
  // running anything) unless exactly one non-null callback is pending.
  void RunTimerCallback() {
    ASSERT_EQ(timer_callbacks_.size(), 1UL);
    auto it = timer_callbacks_.begin();
    ASSERT_NE(it->second, nullptr);
    // Take the callback out of the map and remove its entry *before*
    // invoking it.  The callback may re-enter the mock EventEngine: if it
    // cancelled its own handle while the entry was still present, Cancel()
    // would erase the entry, destroying the callable while it is running
    // and leaving `it` dangling for the erase below.  With the entry
    // already gone, such a Cancel() simply returns false, and a timer
    // started from inside the callback is left pending as before.
    absl::AnyInvocable<void()> callback = std::move(it->second);
    timer_callbacks_.erase(it);
    std::move(callback)();
  }

  // Called when the LB policy starts a timer.
  // May be overridden by subclasses.
  virtual void CheckExpectedTimerDuration(
      grpc_event_engine::experimental::EventEngine::Duration) {}

  // Pending timer callbacks, keyed by the first key of the TaskHandle
  // returned from the mocked RunAfter().
  std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
  // Key to assign to the next timer that gets started.
  intptr_t next_key_ = 1;
  TestTimeCache time_cache_;
};

} // namespace testing
} // namespace grpc_core

Expand Down

0 comments on commit 1d10ef0

Please sign in to comment.