Skip to content

Commit e85a098

Browse files
authoredAug 29, 2024··
Update-with-Start operation (#1579)
Adds support for Update-with-Start, using the MultiOperation API (temporalio/api#367).
1 parent 5364a47 commit e85a098

File tree

8 files changed

+779
-115
lines changed

8 files changed

+779
-115
lines changed
 

‎.github/workflows/docker/dynamic-config-custom.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ frontend.enableUpdateWorkflowExecution:
77
- value: true
88
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
99
- value: true
10+
frontend.enableExecuteMultiOperation:
11+
- value: true
1012
system.enableEagerWorkflowStart:
1113
- value: true
1214
frontend.workerVersioningRuleAPIs:

‎client/client.go

+18
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,16 @@ type (
162162
// StartWorkflowOptions configuration parameters for starting a workflow execution.
163163
StartWorkflowOptions = internal.StartWorkflowOptions
164164

165+
// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
166+
// For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start.
167+
// NOTE: Experimental
168+
WithStartWorkflowOperation = internal.WithStartWorkflowOperation
169+
170+
// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
171+
// See NewUpdateWithStartWorkflowOperation for details.
172+
// NOTE: Experimental
173+
UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation
174+
165175
// HistoryEventIterator is a iterator which can return history events.
166176
HistoryEventIterator = internal.HistoryEventIterator
167177

@@ -921,6 +931,14 @@ type MetricsTimer = metrics.Timer
921931
// MetricsNopHandler is a noop handler that does nothing with the metrics.
922932
var MetricsNopHandler = metrics.NopHandler
923933

934+
// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start.
935+
// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options,
936+
// the update result can be obtained.
937+
// NOTE: Experimental
938+
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
939+
return internal.NewUpdateWithStartWorkflowOperation(options)
940+
}
941+
924942
// Dial creates an instance of a workflow client. This will attempt to connect
925943
// to the server eagerly and will return an error if the server is not
926944
// available.

‎internal/client.go

+79-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package internal
2727
import (
2828
"context"
2929
"crypto/tls"
30+
"errors"
3031
"fmt"
3132
"sync/atomic"
3233
"time"
@@ -643,9 +644,23 @@ type (
643644
// Optional: defaulted to Fail.
644645
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
645646

647+
// WithStartOperation - Operation to execute with Workflow Start.
648+
// For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is
649+
// already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the
650+
// operation is executed. If instead the policy is set to Fail (the default), nothing is executed and
651+
// an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored).
652+
// This option will be ignored when used with Client.SignalWithStartWorkflow.
653+
//
654+
// Optional: defaults to nil.
655+
//
656+
// NOTE: Experimental
657+
WithStartOperation WithStartWorkflowOperation
658+
646659
// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
647-
// workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false,
648-
// rather than erroring a WorkflowRun instance representing the current or last run will be returned.
660+
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
661+
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
662+
// the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and
663+
// the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring.
649664
//
650665
// Optional: defaults to false
651666
WorkflowExecutionErrorWhenAlreadyStarted bool
@@ -714,6 +729,24 @@ type (
714729
links []*commonpb.Link
715730
}
716731

732+
// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
733+
WithStartWorkflowOperation interface {
734+
isWithStartWorkflowOperation()
735+
}
736+
737+
// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
738+
// See NewUpdateWithStartWorkflowOperation for details.
739+
UpdateWithStartWorkflowOperation struct {
740+
input *ClientUpdateWorkflowInput
741+
// flag to ensure the operation is only executed once
742+
executed atomic.Bool
743+
// channel to indicate that handle or err is available
744+
doneCh chan struct{}
745+
// handle and err cannot be accessed before doneCh is closed
746+
handle WorkflowUpdateHandle
747+
err error
748+
}
749+
717750
// RetryPolicy defines the retry policy.
718751
// Note that the history of activity with retry policy will be different: the started event will be written down into
719752
// history only when the activity completes or "finally" timeouts/fails. And the started event only records the last
@@ -1004,6 +1037,50 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien
10041037
}, nil
10051038
}
10061039

1040+
// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start.
1041+
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
1042+
res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})}
1043+
1044+
input, err := createUpdateWorkflowInput(options)
1045+
if err != nil {
1046+
res.set(nil, err)
1047+
} else if options.RunID != "" {
1048+
res.set(nil, errors.New("RunID cannot be set because the workflow might not be running"))
1049+
}
1050+
if options.FirstExecutionRunID != "" {
1051+
res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running"))
1052+
} else {
1053+
res.input = input
1054+
}
1055+
1056+
return res
1057+
}
1058+
1059+
// Get blocks until a server response has been received; or the context deadline is exceeded.
1060+
func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) {
1061+
select {
1062+
case <-op.doneCh:
1063+
return op.handle, op.err
1064+
case <-ctx.Done():
1065+
return nil, ctx.Err()
1066+
}
1067+
}
1068+
1069+
func (op *UpdateWithStartWorkflowOperation) markExecuted() error {
1070+
if op.executed.Swap(true) {
1071+
return fmt.Errorf("was already executed")
1072+
}
1073+
return nil
1074+
}
1075+
1076+
func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) {
1077+
op.handle = handle
1078+
op.err = err
1079+
close(op.doneCh)
1080+
}
1081+
1082+
func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {}
1083+
10071084
// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
10081085
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
10091086
// Initialize root tags

‎internal/cmd/build/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ import (
4141

4242
_ "github.com/BurntSushi/toml"
4343
_ "github.com/kisielk/errcheck/errcheck"
44+
_ "honnef.co/go/tools/staticcheck"
45+
4446
"go.temporal.io/sdk/client"
4547
"go.temporal.io/sdk/testsuite"
46-
_ "honnef.co/go/tools/staticcheck"
4748
)
4849

4950
func main() {
@@ -145,6 +146,7 @@ func (b *builder) integrationTest() error {
145146
},
146147
LogLevel: "warn",
147148
ExtraArgs: []string{
149+
"--dynamic-config-value", "frontend.enableExecuteMultiOperation=true",
148150
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
149151
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
150152
"--dynamic-config-value", "frontend.workerVersioningRuleAPIs=true",

‎internal/internal_workflow_client.go

+275-106
Large diffs are not rendered by default.

‎internal/internal_workflow_client_test.go

+142-4
Original file line numberDiff line numberDiff line change
@@ -976,6 +976,134 @@ func (s *workflowRunSuite) TestGetWorkflowNoExtantWorkflowAndNoRunId() {
976976
s.Equal("", workflowRunNoRunID.GetRunID())
977977
}
978978

979+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError() {
980+
s.workflowServiceClient.EXPECT().
981+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
982+
Return(nil, serviceerror.NewInternal("internal error")).Times(1)
983+
984+
updOp := NewUpdateWithStartWorkflowOperation(
985+
UpdateWorkflowOptions{
986+
UpdateName: "update",
987+
WaitForStage: WorkflowUpdateStageCompleted,
988+
})
989+
990+
_, err := s.workflowClient.ExecuteWorkflow(
991+
context.Background(),
992+
StartWorkflowOptions{
993+
ID: workflowID,
994+
TaskQueue: taskqueue,
995+
WithStartOperation: updOp,
996+
}, workflowType,
997+
)
998+
s.ErrorContains(err, "internal error")
999+
}
1000+
1001+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMismatch() {
1002+
s.workflowServiceClient.EXPECT().
1003+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
1004+
Return(&workflowservice.ExecuteMultiOperationResponse{
1005+
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{},
1006+
}, nil).Times(1)
1007+
1008+
updOp := NewUpdateWithStartWorkflowOperation(
1009+
UpdateWorkflowOptions{
1010+
UpdateName: "update",
1011+
WaitForStage: WorkflowUpdateStageCompleted,
1012+
})
1013+
1014+
_, err := s.workflowClient.ExecuteWorkflow(
1015+
context.Background(),
1016+
StartWorkflowOptions{
1017+
ID: workflowID,
1018+
TaskQueue: taskqueue,
1019+
WithStartOperation: updOp,
1020+
}, workflowType,
1021+
)
1022+
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results")
1023+
}
1024+
1025+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCountMismatch() {
1026+
s.workflowServiceClient.EXPECT().
1027+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
1028+
Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1)
1029+
1030+
updOp := NewUpdateWithStartWorkflowOperation(
1031+
UpdateWorkflowOptions{
1032+
UpdateName: "update",
1033+
WaitForStage: WorkflowUpdateStageCompleted,
1034+
})
1035+
1036+
_, err := s.workflowClient.ExecuteWorkflow(
1037+
context.Background(),
1038+
StartWorkflowOptions{
1039+
ID: workflowID,
1040+
TaskQueue: taskqueue,
1041+
WithStartOperation: updOp,
1042+
}, workflowType,
1043+
)
1044+
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors")
1045+
}
1046+
1047+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseTypeMismatch() {
1048+
s.workflowServiceClient.EXPECT().
1049+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
1050+
Return(&workflowservice.ExecuteMultiOperationResponse{
1051+
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
1052+
{
1053+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{}, // wrong!
1054+
},
1055+
nil,
1056+
},
1057+
}, nil).Times(1)
1058+
1059+
updOp := NewUpdateWithStartWorkflowOperation(
1060+
UpdateWorkflowOptions{
1061+
UpdateName: "update",
1062+
WaitForStage: WorkflowUpdateStageCompleted,
1063+
})
1064+
1065+
_, err := s.workflowClient.ExecuteWorkflow(
1066+
context.Background(),
1067+
StartWorkflowOptions{
1068+
ID: workflowID,
1069+
TaskQueue: taskqueue,
1070+
WithStartOperation: updOp,
1071+
}, workflowType,
1072+
)
1073+
s.ErrorContains(err, "invalid server response: StartWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow")
1074+
}
1075+
1076+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTypeMismatch() {
1077+
s.workflowServiceClient.EXPECT().
1078+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
1079+
Return(&workflowservice.ExecuteMultiOperationResponse{
1080+
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
1081+
{
1082+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{},
1083+
},
1084+
{
1085+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, // wrong!
1086+
},
1087+
},
1088+
}, nil).Times(1)
1089+
1090+
updOp := NewUpdateWithStartWorkflowOperation(
1091+
UpdateWorkflowOptions{
1092+
UpdateName: "update",
1093+
WaitForStage: WorkflowUpdateStageCompleted,
1094+
})
1095+
1096+
_, err := s.workflowClient.ExecuteWorkflow(
1097+
context.Background(),
1098+
StartWorkflowOptions{
1099+
ID: workflowID,
1100+
TaskQueue: taskqueue,
1101+
WithStartOperation: updOp,
1102+
}, workflowType,
1103+
)
1104+
s.ErrorContains(err, "invalid server response: UpdateWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow")
1105+
}
1106+
9791107
func getGetWorkflowExecutionHistoryRequest(filterType enumspb.HistoryEventFilterType) *workflowservice.GetWorkflowExecutionHistoryRequest {
9801108
request := &workflowservice.GetWorkflowExecutionHistoryRequest{
9811109
Namespace: DefaultNamespace,
@@ -1083,11 +1211,21 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithContextAwareDat
10831211
s.Equal(startResponse.GetRunId(), resp.GetRunID())
10841212
}
10851213

1086-
func (s *workflowClientTestSuite) TestSignalWithStartWorkflowAmbiguousID() {
1087-
_, err := s.client.SignalWithStartWorkflow(context.Background(), "workflow-id-1", "my-signal", "my-signal-value",
1214+
func (s *workflowClientTestSuite) TestSignalWithStartWorkflowValidation() {
1215+
// ambiguous WorkflowID
1216+
_, err := s.client.SignalWithStartWorkflow(
1217+
context.Background(), "workflow-id-1", "my-signal", "my-signal-value",
10881218
StartWorkflowOptions{ID: "workflow-id-2"}, workflowType)
1089-
s.Error(err)
1090-
s.Contains(err.Error(), "workflow ID from options not used")
1219+
s.ErrorContains(err, "workflow ID from options not used")
1220+
1221+
// unsupported WithStartOperation
1222+
_, err = s.client.SignalWithStartWorkflow(
1223+
context.Background(), "workflow-id", "my-signal", "my-signal-value",
1224+
StartWorkflowOptions{
1225+
ID: "workflow-id",
1226+
WithStartOperation: &UpdateWithStartWorkflowOperation{},
1227+
}, workflowType)
1228+
s.ErrorContains(err, "option WithStartOperation is not allowed")
10911229
}
10921230

10931231
func (s *workflowClientTestSuite) TestStartWorkflow() {

‎test/integration_test.go

+235-2
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,13 @@ import (
3939
"testing"
4040
"time"
4141

42-
"go.opentelemetry.io/otel/baggage"
43-
4442
"github.com/opentracing/opentracing-go"
4543
"github.com/pborman/uuid"
4644
"github.com/stretchr/testify/assert"
4745
"github.com/stretchr/testify/require"
4846
"github.com/stretchr/testify/suite"
4947
"github.com/uber-go/tally/v4"
48+
"go.opentelemetry.io/otel/baggage"
5049
sdktrace "go.opentelemetry.io/otel/sdk/trace"
5150
"go.opentelemetry.io/otel/sdk/trace/tracetest"
5251
"go.opentelemetry.io/otel/trace"
@@ -3956,6 +3955,240 @@ func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() {
39563955
ts.NoError(run.Get(ctx, nil))
39573956
}
39583957

3958+
func (ts *IntegrationTestSuite) TestExecuteWorkflowWithUpdate() {
3959+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
3960+
defer cancel()
3961+
3962+
startOptionsWithOperation := func(op client.WithStartWorkflowOperation) client.StartWorkflowOptions {
3963+
startOptions := ts.startWorkflowOptions("test-update-with-start-" + uuid.New())
3964+
startOptions.EnableEagerStart = false // not allowed to use with update-with-start
3965+
startOptions.WithStartOperation = op
3966+
return startOptions
3967+
}
3968+
3969+
ts.Run("sends update-with-start (no running workflow)", func() {
3970+
updateOp := client.NewUpdateWithStartWorkflowOperation(
3971+
client.UpdateWorkflowOptions{
3972+
UpdateName: "update",
3973+
Args: []any{1},
3974+
WaitForStage: client.WorkflowUpdateStageAccepted,
3975+
})
3976+
3977+
startOptions := startOptionsWithOperation(updateOp)
3978+
run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
3979+
ts.NoError(err)
3980+
3981+
var updateResult int
3982+
updHandle, err := updateOp.Get(ctx)
3983+
ts.NoError(err)
3984+
ts.NoError(updHandle.Get(ctx, &updateResult))
3985+
ts.Equal(1, updateResult)
3986+
3987+
var workflowResult int
3988+
ts.NoError(run.Get(ctx, &workflowResult))
3989+
ts.Equal(1, workflowResult)
3990+
})
3991+
3992+
ts.Run("sends update-with-start (already running workflow)", func() {
3993+
startOptions := startOptionsWithOperation(nil)
3994+
run1, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
3995+
ts.NoError(err)
3996+
3997+
updateOp := client.NewUpdateWithStartWorkflowOperation(
3998+
client.UpdateWorkflowOptions{
3999+
UpdateName: "update",
4000+
Args: []any{1},
4001+
WaitForStage: client.WorkflowUpdateStageCompleted,
4002+
})
4003+
4004+
startOptions.WithStartOperation = updateOp
4005+
startOptions.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
4006+
run2, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4007+
ts.NoError(err)
4008+
ts.Equal(run1.GetRunID(), run2.GetRunID())
4009+
4010+
var updateResult int
4011+
updHandle, err := updateOp.Get(ctx)
4012+
ts.NoError(err)
4013+
ts.NoError(updHandle.Get(ctx, &updateResult))
4014+
ts.Equal(1, updateResult)
4015+
})
4016+
4017+
ts.Run("sends update-with-start but update is rejected", func() {
4018+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4019+
client.UpdateWorkflowOptions{
4020+
UpdateName: "update",
4021+
Args: []any{-1}, // rejected update payload
4022+
WaitForStage: client.WorkflowUpdateStageCompleted,
4023+
})
4024+
4025+
startOptions := startOptionsWithOperation(updateOp)
4026+
run, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4027+
ts.NoError(err)
4028+
ts.NotNil(run)
4029+
4030+
var updateResult int
4031+
updHandle, err := updateOp.Get(ctx)
4032+
ts.NoError(err)
4033+
err = updHandle.Get(ctx, &updateResult)
4034+
ts.ErrorContains(err, "addend must be non-negative")
4035+
})
4036+
4037+
ts.Run("receives update result in separate goroutines", func() {
4038+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4039+
client.UpdateWorkflowOptions{
4040+
UpdateName: "update",
4041+
Args: []any{1},
4042+
WaitForStage: client.WorkflowUpdateStageAccepted,
4043+
})
4044+
4045+
done := make(chan struct{})
4046+
defer func() { <-done }()
4047+
go func() {
4048+
var updateResult int
4049+
updHandle, err := updateOp.Get(ctx)
4050+
ts.NoError(err)
4051+
ts.NoError(updHandle.Get(ctx, &updateResult))
4052+
ts.Equal(1, updateResult)
4053+
done <- struct{}{}
4054+
}()
4055+
4056+
startOptions := startOptionsWithOperation(updateOp)
4057+
_, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4058+
ts.NoError(err)
4059+
4060+
var updateResult int
4061+
updHandle, err := updateOp.Get(ctx)
4062+
ts.NoError(err)
4063+
ts.NoError(updHandle.Get(ctx, &updateResult))
4064+
ts.Equal(1, updateResult)
4065+
})
4066+
4067+
ts.Run("fails when start request is invalid", func() {
4068+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4069+
client.UpdateWorkflowOptions{
4070+
UpdateName: "update",
4071+
WaitForStage: client.WorkflowUpdateStageCompleted,
4072+
})
4073+
4074+
startOptions := startOptionsWithOperation(updateOp)
4075+
startOptions.CronSchedule = "invalid!"
4076+
_, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4077+
ts.Error(err)
4078+
})
4079+
4080+
ts.Run("fails when update operation is invalid", func() {
4081+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4082+
client.UpdateWorkflowOptions{
4083+
// invalid
4084+
})
4085+
4086+
startOptions := startOptionsWithOperation(updateOp)
4087+
_, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4088+
ts.ErrorContains(err, "invalid WithStartOperation: WaitForStage must be specified")
4089+
4090+
updateOp = client.NewUpdateWithStartWorkflowOperation(
4091+
client.UpdateWorkflowOptions{
4092+
RunID: "invalid",
4093+
WaitForStage: client.WorkflowUpdateStageCompleted,
4094+
})
4095+
4096+
startOptions = startOptionsWithOperation(updateOp)
4097+
_, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4098+
ts.ErrorContains(err, "invalid WithStartOperation: RunID cannot be set because the workflow might not be running")
4099+
4100+
updateOp = client.NewUpdateWithStartWorkflowOperation(
4101+
client.UpdateWorkflowOptions{
4102+
FirstExecutionRunID: "invalid",
4103+
WaitForStage: client.WorkflowUpdateStageCompleted,
4104+
})
4105+
4106+
startOptions = startOptionsWithOperation(updateOp)
4107+
_, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4108+
ts.ErrorContains(err, "invalid WithStartOperation: FirstExecutionRunID cannot be set because the workflow might not be running")
4109+
4110+
updateOp = client.NewUpdateWithStartWorkflowOperation(
4111+
client.UpdateWorkflowOptions{
4112+
UpdateName: "", // invalid
4113+
WaitForStage: client.WorkflowUpdateStageCompleted,
4114+
})
4115+
4116+
startOptions = startOptionsWithOperation(updateOp)
4117+
_, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4118+
ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally
4119+
4120+
updateOp = client.NewUpdateWithStartWorkflowOperation(
4121+
client.UpdateWorkflowOptions{
4122+
WorkflowID: "different", // does not match Start's
4123+
UpdateName: "update",
4124+
WaitForStage: client.WorkflowUpdateStageCompleted,
4125+
})
4126+
4127+
startOptions = startOptionsWithOperation(updateOp)
4128+
_, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4129+
ts.ErrorContains(err, "invalid WithStartOperation: ") // omitting server message intentionally
4130+
})
4131+
4132+
ts.Run("fails when workflow is already running", func() {
4133+
startOptions := startOptionsWithOperation(nil)
4134+
_, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4135+
ts.NoError(err)
4136+
4137+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4138+
client.UpdateWorkflowOptions{
4139+
UpdateName: "update",
4140+
Args: []any{1},
4141+
WaitForStage: client.WorkflowUpdateStageCompleted,
4142+
})
4143+
4144+
startOptions.WithStartOperation = updateOp
4145+
// NOTE that WorkflowExecutionErrorWhenAlreadyStarted (defaults to false) has no impact
4146+
_, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4147+
ts.ErrorContains(err, "Workflow execution is already running")
4148+
})
4149+
4150+
ts.Run("fails when executed twice", func() {
4151+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4152+
client.UpdateWorkflowOptions{
4153+
UpdateName: "update",
4154+
Args: []any{1},
4155+
WaitForStage: client.WorkflowUpdateStageCompleted,
4156+
})
4157+
4158+
startOptions := startOptionsWithOperation(updateOp)
4159+
_, err := ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4160+
ts.NoError(err)
4161+
4162+
_, err = ts.client.ExecuteWorkflow(ctx, startOptions, ts.workflows.UpdateEntityWorkflow)
4163+
ts.ErrorContains(err, "invalid WithStartOperation: was already executed")
4164+
})
4165+
4166+
ts.Run("propagates context", func() {
4167+
updateOp := client.NewUpdateWithStartWorkflowOperation(
4168+
client.UpdateWorkflowOptions{
4169+
UpdateName: "update",
4170+
Args: []any{1},
4171+
WaitForStage: client.WorkflowUpdateStageCompleted,
4172+
})
4173+
4174+
var propagatedValues []string
4175+
ctx := context.Background()
4176+
// Propagate values using different context propagators.
4177+
ctx = context.WithValue(ctx, contextKey(testContextKey1), "propagatedValue1")
4178+
ctx = context.WithValue(ctx, contextKey(testContextKey2), "propagatedValue2")
4179+
ctx = context.WithValue(ctx, contextKey(testContextKey3), "non-propagatedValue")
4180+
startOptions := startOptionsWithOperation(updateOp)
4181+
err := ts.executeWorkflowWithContextAndOption(ctx, startOptions, ts.workflows.ContextPropagator, &propagatedValues, true)
4182+
ts.NoError(err)
4183+
4184+
// One copy from workflow and one copy from activity * 2 for child workflow
4185+
ts.EqualValues([]string{
4186+
"propagatedValue1", "propagatedValue2", "activity_propagatedValue1", "activity_propagatedValue2",
4187+
"child_propagatedValue1", "child_propagatedValue2", "child_activity_propagatedValue1", "child_activity_propagatedValue2",
4188+
}, propagatedValues)
4189+
})
4190+
}
4191+
39594192
func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() {
39604193
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
39614194
defer cancel()

‎test/workflow_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,30 @@ func (w *Workflows) UpdateInfoWorkflow(ctx workflow.Context) error {
373373
return nil
374374
}
375375

376+
func (w *Workflows) UpdateEntityWorkflow(ctx workflow.Context) (int, error) {
377+
counter := 0
378+
379+
err := workflow.SetUpdateHandlerWithOptions(ctx, "update", func(ctx workflow.Context, add int) (int, error) {
380+
workflow.Sleep(ctx, 1*time.Second) // force separate WFT for accept and complete
381+
counter += add
382+
return counter, nil
383+
}, workflow.UpdateHandlerOptions{
384+
Validator: func(ctx workflow.Context, i int) error {
385+
if i < 0 {
386+
return fmt.Errorf("addend must be non-negative (%v)", i)
387+
}
388+
return nil
389+
},
390+
})
391+
if err != nil {
392+
return 0, err
393+
}
394+
395+
workflow.Await(ctx, func() bool { return counter >= 1 })
396+
397+
return counter, nil
398+
}
399+
376400
func (w *Workflows) UpdateWithValidatorWorkflow(ctx workflow.Context) error {
377401
workflow.Go(ctx, func(ctx workflow.Context) {
378402
_ = workflow.Sleep(ctx, time.Minute)
@@ -3161,6 +3185,7 @@ func (w *Workflows) register(worker worker.Worker) {
31613185
worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartToCloseTimeout)
31623186
worker.RegisterWorkflow(w.LocalActivityStaleCache)
31633187
worker.RegisterWorkflow(w.UpdateInfoWorkflow)
3188+
worker.RegisterWorkflow(w.UpdateEntityWorkflow)
31643189
worker.RegisterWorkflow(w.SignalWorkflow)
31653190
worker.RegisterWorkflow(w.CronWorkflow)
31663191
worker.RegisterWorkflow(w.ActivityTimeoutsWorkflow)

0 commit comments

Comments
 (0)
Please sign in to comment.