Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otlptracegrpc, otlpmetricgrpc: Retry for RESOURCE_EXHAUSTED only if RetryInfo is returned #4669

Merged
merged 8 commits into from
Oct 30, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` no longer depends on `go.opentelemetry.io/otel/exporters/otlp/otlpmetric`. (#4660)
- Retry for `502 Bad Gateway` and `504 Gateway Timeout` HTTP statuses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4670)
- Retry for `502 Bad Gateway` and `504 Gateway Timeout` HTTP statuses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4670)
- Retry for `RESOURCE_EXHAUSTED` only if RetryInfo is returned in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4669)
pellared marked this conversation as resolved.
Show resolved Hide resolved
- Retry for `RESOURCE_EXHAUSTED` only if RetryInfo is returned in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#4669)

### Fixed

Expand Down
22 changes: 15 additions & 7 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,28 +176,36 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
return true, throttleDelay(s)
// Additionally, handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(s *status.Status) time.Duration {
// throttleDelay reports whether the status contains a RetryInfo detail
// and returns the duration to wait for if an explicit throttle time is included.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return t.RetryDelay.AsDuration()
return true, t.RetryDelay.AsDuration()
}
}
return 0
return false, 0
}
44 changes: 33 additions & 11 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestThrottleDuration(t *testing.T) {
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "NoRetryInfo"),
expected: 0,
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
Expand All @@ -53,7 +55,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
Expand All @@ -63,7 +66,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 0,
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
Expand All @@ -76,7 +80,8 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
Expand All @@ -91,13 +96,16 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
Expand All @@ -112,7 +120,7 @@ func TestRetryable(t *testing.T) {
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
Expand All @@ -129,6 +137,20 @@ func TestRetryable(t *testing.T) {
}
}

func TestRetryableGRPCStatus_ResourceExhaustedWithRetryInfo(t *testing.T) {
pellared marked this conversation as resolved.
Show resolved Hide resolved
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

type clientShim struct {
*client
}
Expand Down
22 changes: 15 additions & 7 deletions exporters/otlp/otlptrace/otlptracegrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,30 +260,38 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}

func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
return true, throttleDelay(s)
// Additionally, handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(s *status.Status) time.Duration {
// throttleDelay reports whether the status contains a RetryInfo detail
// and returns the duration to wait for if an explicit throttle time is included.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return t.RetryDelay.AsDuration()
return true, t.RetryDelay.AsDuration()
}
}
return 0
return false, 0
}

// MarshalLog is the marshaling function used by the logging system to represent this Client.
Expand Down
52 changes: 37 additions & 15 deletions exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,45 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

func TestThrottleDuration(t *testing.T) {
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "no retry info"),
expected: 0,
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "single retry info").WithDetails(
s, err := status.New(c, "SingleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
s, err := status.New(c, "error info").WithDetails(
s, err := status.New(c, "ErrorInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
require.NoError(t, err)
return s
}(),
expected: 0,
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "error and retry info").WithDetails(
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
Expand All @@ -70,11 +74,12 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
s, err := status.New(c, "double retry info").WithDetails(
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
Expand All @@ -85,13 +90,16 @@ func TestThrottleDuration(t *testing.T) {
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
wantOK: true,
wantDuration: 13 * time.Minute,
},
}

for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
Expand All @@ -106,7 +114,7 @@ func TestRetryable(t *testing.T) {
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
Expand All @@ -123,6 +131,20 @@ func TestRetryable(t *testing.T) {
}
}

func TestRetryableGRPCStatus_ResourceExhaustedWithRetryInfo(t *testing.T) {
pellared marked this conversation as resolved.
Show resolved Hide resolved
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)

ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}

func TestUnstartedStop(t *testing.T) {
client := NewClient()
assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped)
Expand Down