
Commit ded70a3 (authored May 9, 2024)

Add support for NextRetryDelay for local activities (#1456)

1 parent 386a6d3 · commit ded70a3

7 files changed (+95, -17 lines)

Diff for: internal/error.go (+7, -4)

@@ -328,10 +328,11 @@ func NewApplicationError(msg string, errType string, nonRetryable bool, cause er
 
 func NewApplicationErrorWithOptions(msg string, errType string, options ApplicationErrorOptions) error {
 	applicationErr := &ApplicationError{
-		msg:          msg,
-		errType:      errType,
-		cause:        options.Cause,
-		nonRetryable: options.NonRetryable,
+		msg:            msg,
+		errType:        errType,
+		cause:          options.Cause,
+		nonRetryable:   options.NonRetryable,
+		nextRetryDelay: options.NextRetryDelay,
 	}
 	// When return error to user, use EncodedValues as details and data is ready to be decoded by calling Get
 	details := options.Details
@@ -582,6 +583,8 @@ func (e *ApplicationError) Unwrap() error {
 	return e.cause
 }
 
+// NextRetryDelay returns the delay to wait before retrying the activity.
+// a zero value means to use the activities retry policy.
 func (e *ApplicationError) NextRetryDelay() time.Duration { return e.nextRetryDelay }
 
 // Error from error interface
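
The new ApplicationErrorOptions.NextRetryDelay field is what application code sets to override the retry policy's backoff before the next attempt, and the NextRetryDelay() accessor is how the SDK reads it back. A minimal sketch of a local activity returning such an error through the public temporal package; the activity name and the "NeedsBackoff" error type are illustrative, not part of this commit:

package sample // illustrative package name

import (
	"context"
	"time"

	"go.temporal.io/sdk/temporal"
)

// FlakyLocalActivity asks the SDK to wait a caller-chosen delay before the
// next retry instead of the delay computed from the retry policy.
func FlakyLocalActivity(_ context.Context, delay time.Duration) error {
	return temporal.NewApplicationErrorWithOptions(
		"resource busy, retry later",
		"NeedsBackoff", // illustrative error type
		temporal.ApplicationErrorOptions{NextRetryDelay: delay},
	)
}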

Diff for: internal/internal_task_handlers.go (+22, -12)

@@ -1254,7 +1254,7 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
 		return false
 	}
 
-	retryBackoff := getRetryBackoff(lar, time.Now(), w.wth.dataConverter)
+	retryBackoff := getRetryBackoff(lar, time.Now())
 	if retryBackoff > 0 && retryBackoff <= w.workflowInfo.WorkflowTaskTimeout {
 		// we need a local retry
 		time.AfterFunc(retryBackoff, func() {
@@ -1279,7 +1279,7 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
 	return false
 }
 
-func getRetryBackoff(lar *localActivityResult, now time.Time, dataConverter converter.DataConverter) time.Duration {
+func getRetryBackoff(lar *localActivityResult, now time.Time) time.Duration {
 	return getRetryBackoffWithNowTime(lar.task.retryPolicy, lar.task.attempt, lar.err, now, lar.task.expireTime)
 }
 
@@ -1291,20 +1291,30 @@ func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, err error, now, e
 	if p.MaximumAttempts > 0 && attempt >= p.MaximumAttempts {
 		return noRetryBackoff // max attempt reached
 	}
+
+	var backoffInterval time.Duration
+	// Extract backoff interval from error if it is a retryable error.
+	// Not using errors.As() since we don't want to explore the whole error chain.
+	if applicationErr, ok := err.(*ApplicationError); ok {
+		backoffInterval = applicationErr.nextRetryDelay
+	}
+	// Calculate next backoff interval if the error did not contain the next backoff interval.
 	// attempt starts from 1
-	backoffInterval := time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1)))
-	if backoffInterval <= 0 {
-		// math.Pow() could overflow
-		if p.MaximumInterval > 0 {
+	if backoffInterval == 0 {
+		backoffInterval = time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1)))
+		if backoffInterval <= 0 {
+			// math.Pow() could overflow
+			if p.MaximumInterval > 0 {
+				backoffInterval = p.MaximumInterval
+			}
+		}
+		if p.MaximumInterval > 0 && backoffInterval > p.MaximumInterval {
+			// cap next interval to MaxInterval
 			backoffInterval = p.MaximumInterval
-		} else {
-			return noRetryBackoff
 		}
 	}
-
-	if p.MaximumInterval > 0 && backoffInterval > p.MaximumInterval {
-		// cap next interval to MaxInterval
-		backoffInterval = p.MaximumInterval
+	if backoffInterval <= 0 {
+		return noRetryBackoff
 	}
 
 	nextScheduleTime := now.Add(backoffInterval)
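
The change above makes a delay carried by the failed attempt's error take precedence over the computed exponential backoff, and the MaximumInterval cap now applies only to the computed value. A simplified, standalone sketch of that precedence, using illustrative names (retryParams, nextDelay) rather than the SDK's internal types, and omitting the max-attempts and expiration checks:

package sample // illustrative package name

import (
	"math"
	"time"
)

// retryParams stands in for the subset of the retry policy used here.
type retryParams struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
}

// nextDelay returns the wait before the next retry (attempt starts at 1),
// preferring a delay supplied by the error over the exponential calculation.
func nextDelay(p retryParams, attempt int32, errDelay time.Duration) time.Duration {
	backoff := errDelay
	if backoff == 0 {
		backoff = time.Duration(float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(attempt-1)))
		if backoff <= 0 && p.MaximumInterval > 0 {
			backoff = p.MaximumInterval // math.Pow() could overflow
		}
		if p.MaximumInterval > 0 && backoff > p.MaximumInterval {
			backoff = p.MaximumInterval // cap only the calculated interval
		}
	}
	return backoff
}

With InitialInterval of 1s, BackoffCoefficient 2.0 and MaximumInterval 10s, nextDelay(p, 3, 0) yields 4s, while nextDelay(p, 3, 30*time.Second) yields the full 30s because the error-supplied delay is not capped.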

Diff for: internal/internal_workflow_testsuite.go (+1, -1)

@@ -1647,7 +1647,7 @@ func (env *testWorkflowEnvironmentImpl) handleLocalActivityResult(result *localA
 		Attempt: 1,
 	}
 	if result.task.retryPolicy != nil && result.err != nil {
-		lar.Backoff = getRetryBackoff(result, env.Now(), env.dataConverter)
+		lar.Backoff = getRetryBackoff(result, env.Now())
 		lar.Attempt = task.attempt
 	}
 	task.callback(lar)

Diff for: internal/internal_workflow_testsuite_test.go (+33)

@@ -2412,6 +2412,39 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowWithLocalActivity() {
 	s.Equal("hello local_activity", result)
 }
 
+func (s *WorkflowTestSuiteUnitTest) Test_WorkflowWithLocalActivityNextDelay() {
+	localActivityFn := func(ctx context.Context, delay time.Duration) error {
+		return NewApplicationErrorWithOptions("test delay", "DelayError", ApplicationErrorOptions{
+			NextRetryDelay: delay,
+		})
+	}
+
+	workflowFn := func(ctx Context) (time.Duration, error) {
+		lao := LocalActivityOptions{
+			ScheduleToCloseTimeout: time.Minute,
+			RetryPolicy: &RetryPolicy{
+				MaximumAttempts: 5,
+			},
+		}
+		ctx = WithLocalActivityOptions(ctx, lao)
+		var result string
+		t1 := Now(ctx)
+		f := ExecuteLocalActivity(ctx, localActivityFn, time.Second)
+		_ = f.Get(ctx, &result)
+		t2 := Now(ctx)
+		return t2.Sub(t1), nil
+	}
+
+	env := s.NewTestWorkflowEnvironment()
+	env.ExecuteWorkflow(workflowFn)
+	s.True(env.IsWorkflowCompleted())
+	s.NoError(env.GetWorkflowError())
+	var result time.Duration
+	err := env.GetWorkflowResult(&result)
+	s.NoError(err)
+	s.Equal(4*time.Second, result)
+}
+
 func (s *WorkflowTestSuiteUnitTest) Test_LocalActivity() {
 	localActivityFn := func(ctx context.Context, name string) (string, error) {
 		return "hello " + name, nil

Diff for: test/activity_test.go (+6)

@@ -86,6 +86,12 @@ func LocalSleep(_ context.Context, delay time.Duration) error {
 	return nil
 }
 
+func ErrorWithNextDelay(_ context.Context, delay time.Duration) error {
+	return temporal.NewApplicationErrorWithOptions("error with next delay", "NextDelay", temporal.ApplicationErrorOptions{
+		NextRetryDelay: delay,
+	})
+}
+
 func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
 	a.append("ActivityToBeCanceled")
 	for {

Diff for: test/integration_test.go (+11)

@@ -29,6 +29,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"math"
 	"math/rand"
 	"os"
 	"strings"
@@ -465,6 +466,16 @@ func (ts *IntegrationTestSuite) TestDeadlockDetectionViaLocalActivity() {
 	ts.True(strings.Contains(applicationErr.Error(), "Potential deadlock detected"))
 }
 
+func (ts *IntegrationTestSuite) TestLocalActivityNextRetryDelay() {
+	var activityExecutionTime time.Duration
+	wfOpts := ts.startWorkflowOptions("test-local-activity-next-retry-delay")
+	wfOpts.WorkflowTaskTimeout = 5 * time.Second
+	err := ts.executeWorkflowWithOption(wfOpts, ts.workflows.LocalActivityNextRetryDelay, &activityExecutionTime)
+	ts.NoError(err)
+	// Check the activity execution time is around 7 seconds
+	ts.LessOrEqual(math.Abs((activityExecutionTime - 7*time.Second).Seconds()), 1.0)
+}
+
 func (ts *IntegrationTestSuite) TestActivityRetryOnError() {
 	var expected []string
 	err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected)

Diff for: test/workflow_test.go (+15)

@@ -101,6 +101,20 @@ func (w *Workflows) DeadlockedWithLocalActivity(ctx workflow.Context) ([]string,
 	return []string{}, nil
 }
 
+func (w *Workflows) LocalActivityNextRetryDelay(ctx workflow.Context) (time.Duration, error) {
+	laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
+		ScheduleToCloseTimeout: time.Minute,
+		RetryPolicy: &temporal.RetryPolicy{
+			MaximumAttempts: 8,
+		},
+	})
+
+	t1 := workflow.Now(ctx)
+	_ = workflow.ExecuteLocalActivity(laCtx, ErrorWithNextDelay, time.Second).Get(laCtx, nil)
+	t2 := workflow.Now(ctx)
+	return t2.Sub(t1), nil
+}
+
 func (w *Workflows) Panicked(ctx workflow.Context) ([]string, error) {
 	panic("simulated")
 }
@@ -3024,6 +3038,7 @@ func (w *Workflows) register(worker worker.Worker) {
 	worker.RegisterWorkflow(w.UpdateSettingHandlerInHandler)
 	worker.RegisterWorkflow(w.UpdateCancelableWorkflow)
 	worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate)
+	worker.RegisterWorkflow(w.LocalActivityNextRetryDelay)
 
 	worker.RegisterWorkflow(w.child)
 	worker.RegisterWorkflow(w.childWithRetryPolicy)
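
The LocalActivityNextRetryDelay workflow added above backs TestLocalActivityNextRetryDelay in test/integration_test.go: with MaximumAttempts set to 8 and every attempt failing with a 1-second NextRetryDelay, the seven retries after the first failure account for the roughly 7-second elapsed time that test asserts.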

0 commit comments