
otlptracegrpc, otlpmetricgrpc: Retry for RESOURCE_EXHAUSTED only if RetryInfo is returned #4669

Merged (8 commits) on Oct 30, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -47,6 +47,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
See the "API Implementations" section of the `go.opentelemetry.io/otel/trace` package documentation for more informatoin about how to accomplish this. (#4620)
- `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` no longer depends on `go.opentelemetry.io/otel/exporters/otlp/otlpmetric`. (#4660)
- `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` no longer depends on `go.opentelemetry.io/otel/exporters/otlp/otlpmetric`. (#4660)
- Retry for `RESOURCE_EXHAUSTED` only if RetryInfo is returned in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4669)
- Retry for `RESOURCE_EXHAUSTED` only if RetryInfo is returned in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#4669)

### Fixed

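The two CHANGELOG entries above describe the behavior change in this PR: a `RESOURCE_EXHAUSTED` response is now retried only when the server attaches a `RetryInfo` status detail. As a hedged illustration of the server side of that contract (not code from this PR; the receiver message and delay values are made up), the sketch below shows how a gRPC OTLP receiver could signal that recovery is possible by attaching `RetryInfo` with a suggested delay. Without that detail, the updated exporters treat `RESOURCE_EXHAUSTED` as non-retryable.

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

// resourceExhausted builds a RESOURCE_EXHAUSTED gRPC error. When retryAfter is
// positive it attaches a RetryInfo detail, which is the signal the updated
// exporters now require before retrying this status code.
func resourceExhausted(retryAfter time.Duration) error {
	s := status.New(codes.ResourceExhausted, "receiver memory limit reached")
	if retryAfter > 0 {
		if detailed, err := s.WithDetails(&errdetails.RetryInfo{
			RetryDelay: durationpb.New(retryAfter),
		}); err == nil {
			s = detailed
		}
	}
	return s.Err()
}

func main() {
	// Retried by the exporters after this change: RetryInfo is present.
	fmt.Println(resourceExhausted(250 * time.Millisecond))
	// No longer retried: the server gave no recovery hint.
	fmt.Println(resourceExhausted(0))
}
```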
22 changes: 15 additions & 7 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
@@ -176,28 +176,36 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
return true, throttleDelay(s)
// Additionally handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(s *status.Status) time.Duration {
// throttleDelay reports whether the status includes a RetryInfo detail and,
// if so, returns the explicit throttle duration to wait before retrying.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return t.RetryDelay.AsDuration()
return true, t.RetryDelay.AsDuration()
}
}
return 0
return false, 0
}
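For reference, here is a self-contained sketch that mirrors the decision path added to `retryableGRPCStatus` and `throttleDelay` above. The names `evaluate` and `retryInfoDelay` are illustrative stand-ins, since the exporter's helpers are unexported; the calls in `main` exercise the outcomes the tests below assert: `RESOURCE_EXHAUSTED` without `RetryInfo` is not retried, `RESOURCE_EXHAUSTED` with `RetryInfo` is retried after the requested delay, and the other transient codes stay retryable.

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

// evaluate mirrors the exporter's new retry decision: transient codes are
// retryable (honoring a RetryInfo delay if one is present), while
// RESOURCE_EXHAUSTED is retryable only when the server attached RetryInfo.
func evaluate(err error) (bool, time.Duration) {
	s := status.Convert(err)
	switch s.Code() {
	case codes.Canceled, codes.DeadlineExceeded, codes.Aborted,
		codes.OutOfRange, codes.Unavailable, codes.DataLoss:
		_, d := retryInfoDelay(s)
		return true, d
	case codes.ResourceExhausted:
		return retryInfoDelay(s)
	}
	return false, 0
}

// retryInfoDelay reports whether the status carries a RetryInfo detail and the
// delay it requests.
func retryInfoDelay(s *status.Status) (bool, time.Duration) {
	for _, detail := range s.Details() {
		if t, ok := detail.(*errdetails.RetryInfo); ok {
			return true, t.RetryDelay.AsDuration()
		}
	}
	return false, 0
}

func main() {
	plain := status.New(codes.ResourceExhausted, "no retry info").Err()
	withInfo, _ := status.New(codes.ResourceExhausted, "with retry info").WithDetails(
		&errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)},
	)

	fmt.Println(evaluate(plain))                                            // false 0s
	fmt.Println(evaluate(withInfo.Err()))                                   // true 15ms
	fmt.Println(evaluate(status.New(codes.Unavailable, "transient").Err())) // true 0s
}
```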
44 changes: 33 additions & 11 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
@@ -33,15 +33,17 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestThrottleDuration(t *testing.T) {
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "NoRetryInfo"),
expected: 0,
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
@@ -53,7 +55,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
@@ -63,7 +66,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 0,
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
@@ -76,7 +80,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
@@ -91,13 +96,16 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
@@ -112,7 +120,7 @@ func TestRetryable(t *testing.T) {
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
@@ -129,6 +137,20 @@ func TestRetryable(t *testing.T) {
}
}

func TestRetryableGRPCStatus_ResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

type clientShim struct {
*client
}
22 changes: 15 additions & 7 deletions exporters/otlp/otlptrace/otlptracegrpc/client.go
@@ -260,30 +260,38 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
return true, throttleDelay(s)
// Additionally handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(s *status.Status) time.Duration {
// throttleDelay reports whether the status includes a RetryInfo detail and,
// if so, returns the explicit throttle duration to wait before retrying.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return t.RetryDelay.AsDuration()
return true, t.RetryDelay.AsDuration()
}
}
return 0
return false, 0
}

// MarshalLog is the marshaling function used by the logging system to represent this Client.
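The trace exporter change above is identical to the metric exporter change. As a closing sketch, the loop below shows one way a caller could consume the `(bool, time.Duration)` pair that `retryable` now returns. It is a simplified stand-in, not the exporters' internal retry package; `withRetries`, the attempt limit, and the base backoff are assumptions for the example. The key behaviors it illustrates: a server-provided throttle delay overrides local backoff, and a non-retryable status (such as `RESOURCE_EXHAUSTED` without `RetryInfo`) ends the loop immediately.

```go
package retrysketch

import (
	"context"
	"errors"
	"time"
)

// retryDecision has the shape of the (retryable, throttle) pair the exporter
// helpers changed in this PR now return.
type retryDecision func(err error) (bool, time.Duration)

// withRetries is an illustrative loop, not the exporters' retry package: it
// runs op, asks decide whether the failure is retryable, and waits either the
// server-requested throttle delay or a doubling local backoff before retrying.
func withRetries(ctx context.Context, decide retryDecision, maxAttempts int, op func(context.Context) error) error {
	backoff := 100 * time.Millisecond
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(ctx); err == nil {
			return nil
		}
		ok, throttle := decide(err)
		if !ok {
			// E.g. RESOURCE_EXHAUSTED without RetryInfo: give up immediately.
			return err
		}
		wait := backoff
		if throttle > 0 {
			// A server-provided throttle delay takes precedence.
			wait = throttle
		}
		select {
		case <-time.After(wait):
		case <-ctx.Done():
			return errors.Join(err, ctx.Err())
		}
		backoff *= 2
	}
	return err
}
```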
52 changes: 37 additions & 15 deletions exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go
@@ -27,41 +27,45 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

func TestThrottleDuration(t *testing.T) {
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "no retry info"),
expected: 0,
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "single retry info").WithDetails(
s, err := status.New(c, "SingleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
s, err := status.New(c, "error info").WithDetails(
s, err := status.New(c, "ErrorInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
require.NoError(t, err)
return s
}(),
expected: 0,
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "error and retry info").WithDetails(
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
@@ -70,11 +74,12 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
s, err := status.New(c, "double retry info").WithDetails(
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
@@ -85,13 +90,16 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
Expand All @@ -106,7 +114,7 @@ func TestRetryable(t *testing.T) {
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
@@ -123,6 +131,20 @@ func TestRetryable(t *testing.T) {
}
}

func TestRetryableGRPCStatus_ResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

func TestUnstartedStop(t *testing.T) {
client := NewClient()
assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped)