Skip to content

Commit 9d59447

Browse files
authoredDec 6, 2024··
New Update-With-Start API (#1731)
Instead of using the ExecuteWorkflow client method, update-with-start is invoked via a new client method UpdateWithStartWorkflow. To use this method, first use NewWithStartWorkflowOperation to define the start-workflow operation. A workflow ID conflict policy is required. Then call UpdateWithStartWorkflow, passing it an UpdateWithStartWorkflowOptions containing your WithStartWorkflowOperation, together with an UpdateWorkflowOptions defining the update operation. This will return an UpdateHandle. The WithStartWorkflowOperation exposes a blocking .Get(ctx) method to obtain the workflow run targeted by the update.
1 parent 9c4dde8 commit 9d59447

11 files changed

+679
-396
lines changed
 

‎client/client.go

+26-15
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,11 @@ type (
162162
// StartWorkflowOptions configuration parameters for starting a workflow execution.
163163
StartWorkflowOptions = internal.StartWorkflowOptions
164164

165-
// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
166-
// For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start.
165+
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
166+
// See [Client.NewWithStartWorkflowOperation] and [Client.UpdateWithStartWorkflow].
167167
// NOTE: Experimental
168168
WithStartWorkflowOperation = internal.WithStartWorkflowOperation
169169

170-
// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
171-
// See NewUpdateWithStartWorkflowOperation for details.
172-
// NOTE: Experimental
173-
UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation
174-
175170
// HistoryEventIterator is a iterator which can return history events.
176171
HistoryEventIterator = internal.HistoryEventIterator
177172

@@ -279,6 +274,11 @@ type (
279274
// NOTE: Experimental
280275
UpdateWorkflowOptions = internal.UpdateWorkflowOptions
281276

277+
// UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow.
278+
// See [Client.UpdateWithStartWorkflow] and [Client.NewWithStartWorkflowOperation].
279+
// NOTE: Experimental
280+
UpdateWithStartWorkflowOptions = internal.UpdateWithStartWorkflowOptions
281+
282282
// WorkflowUpdateHandle represents a running or completed workflow
283283
// execution update and gives the holder access to the outcome of the same.
284284
// NOTE: Experimental
@@ -564,6 +564,11 @@ type (
564564
SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
565565
options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error)
566566

567+
// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use with UpdateWithStartWorkflow.
568+
// See [Client.UpdateWithStartWorkflow].
569+
// NOTE: Experimental
570+
NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation
571+
567572
// CancelWorkflow request cancellation of a workflow in execution. Cancellation request closes the channel
568573
// returned by the workflow.Context.Done() of the workflow that is target of the request.
569574
// - workflow ID of the workflow.
@@ -840,6 +845,20 @@ type (
840845
// NOTE: Experimental
841846
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)
842847

848+
// UpdateWithStartWorkflow issues an update-with-start request. A
849+
// WorkflowIDConflictPolicy must be set in the options. If the specified
850+
// workflow execution is not running, then a new workflow execution is
851+
// started and the update is sent in the first workflow task.
852+
// Alternatively if the specified workflow execution is running then, if
853+
// the WorkflowIDConflictPolicy is USE_EXISTING, the update is issued
854+
// against the specified workflow, and if the WorkflowIDConflictPolicy
855+
// is FAIL, an error is returned. The call will block until the update
856+
// has reached the WaitForStage in the options. Note that this means
857+
// that the call will not return successfully until the update has been
858+
// delivered to a worker.
859+
// NOTE: Experimental
860+
UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error)
861+
843862
// GetWorkflowUpdateHandle creates a handle to the referenced update
844863
// which can be polled for an outcome. Note that runID is optional and
845864
// if not specified the most recent runID will be used.
@@ -934,14 +953,6 @@ type MetricsTimer = metrics.Timer
934953
// MetricsNopHandler is a noop handler that does nothing with the metrics.
935954
var MetricsNopHandler = metrics.NopHandler
936955

937-
// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start.
938-
// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options,
939-
// the update result can be obtained.
940-
// NOTE: Experimental
941-
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
942-
return internal.NewUpdateWithStartWorkflowOperation(options)
943-
}
944-
945956
// Dial creates an instance of a workflow client. This will attempt to connect
946957
// to the server eagerly and will return an error if the server is not
947958
// available.

‎interceptor/interceptor.go

+4
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ type ScheduleClientCreateInput = internal.ScheduleClientCreateInput
220220
// ClientOutoundInterceptor.UpdateWorkflow.
221221
type ClientUpdateWorkflowInput = internal.ClientUpdateWorkflowInput
222222

223+
// ClientUpdateWithStartWorkflowInput is input for
224+
// ClientOutboundInterceptor.UpdateWithStartWorkflow.
225+
type ClientUpdateWithStartWorkflowInput = internal.ClientUpdateWithStartWorkflowInput
226+
223227
// Header provides Temporal header information from the context for reading or
224228
// writing during specific interceptor calls.
225229
//

‎interceptor/tracing_interceptor.go

+27
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,33 @@ func (t *tracingClientOutboundInterceptor) UpdateWorkflow(
379379
return val, err
380380
}
381381

382+
func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow(
383+
ctx context.Context,
384+
in *ClientUpdateWithStartWorkflowInput,
385+
) (client.WorkflowUpdateHandle, error) {
386+
// Only add tracing if enabled
387+
if t.root.options.DisableUpdateTracing {
388+
return t.Next.UpdateWithStartWorkflow(ctx, in)
389+
}
390+
// Start span and write to header
391+
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
392+
Operation: "UpdateWithStartWorkflow",
393+
Name: in.UpdateOptions.UpdateName,
394+
Tags: map[string]string{workflowIDTagKey: in.UpdateOptions.WorkflowID, updateIDTagKey: in.UpdateOptions.UpdateID},
395+
ToHeader: true,
396+
Time: time.Now(),
397+
})
398+
if err != nil {
399+
return nil, err
400+
}
401+
var finishOpts TracerFinishSpanOptions
402+
defer span.Finish(&finishOpts)
403+
404+
val, err := t.Next.UpdateWithStartWorkflow(ctx, in)
405+
finishOpts.Error = err
406+
return val, err
407+
}
408+
382409
type tracingActivityOutboundInterceptor struct {
383410
ActivityOutboundInterceptorBase
384411
root *tracingInterceptor

‎internal/client.go

+31-49
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ package internal
2727
import (
2828
"context"
2929
"crypto/tls"
30-
"errors"
3130
"fmt"
3231
"sync/atomic"
3332
"time"
@@ -135,6 +134,10 @@ type (
135134
SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
136135
options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (WorkflowRun, error)
137136

137+
// NewWithStartWorkflowOperation returns a WithStartWorkflowOperation for use in UpdateWithStartWorkflow.
138+
// NOTE: Experimental
139+
NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation
140+
138141
// CancelWorkflow cancels a workflow in execution
139142
// - workflow ID of the workflow.
140143
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
@@ -394,6 +397,17 @@ type (
394397
// NOTE: Experimental
395398
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)
396399

400+
// UpdateWithStartWorkflow issues an update-with-start request. A
401+
// WorkflowIDConflictPolicy must be set. If the specified workflow is
402+
// not running, then a new workflow execution is started and the update
403+
// is sent in the first workflow task. Alternatively if the specified
404+
// workflow is running then, if the WorkflowIDConflictPolicy is
405+
// USE_EXISTING, the update is issued against the specified workflow,
406+
// and if the WorkflowIDConflictPolicy is FAIL, an error is returned.
407+
//
408+
// NOTE: Experimental
409+
UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error)
410+
397411
// GetWorkflowUpdateHandle creates a handle to the referenced update
398412
// which can be polled for an outcome. Note that runID is optional and
399413
// if not specified the most recent runID will be used.
@@ -647,18 +661,6 @@ type (
647661
// Optional: defaulted to Fail.
648662
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
649663

650-
// WithStartOperation - Operation to execute with Workflow Start.
651-
// For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is
652-
// already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the
653-
// operation is executed. If instead the policy is set to Fail (the default), nothing is executed and
654-
// an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored).
655-
// This option will be ignored when used with Client.SignalWithStartWorkflow.
656-
//
657-
// Optional: defaults to nil.
658-
//
659-
// NOTE: Experimental
660-
WithStartOperation WithStartWorkflowOperation
661-
662664
// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
663665
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
664666
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
@@ -751,22 +753,24 @@ type (
751753
links []*commonpb.Link
752754
}
753755

754-
// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
756+
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
757+
// See [NewWithStartWorkflowOperation] and [UpdateWithStartWorkflow].
758+
// NOTE: Experimental
755759
WithStartWorkflowOperation interface {
756-
isWithStartWorkflowOperation()
760+
// Get returns the WorkflowRun that was targeted by the UpdateWithStartWorkflow call.
761+
// This is a blocking API.
762+
Get(ctx context.Context) (WorkflowRun, error)
757763
}
758764

759-
// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
760-
// See NewUpdateWithStartWorkflowOperation for details.
761-
UpdateWithStartWorkflowOperation struct {
762-
input *ClientUpdateWorkflowInput
765+
withStartWorkflowOperationImpl struct {
766+
input *ClientExecuteWorkflowInput
763767
// flag to ensure the operation is only executed once
764768
executed atomic.Bool
765769
// channel to indicate that handle or err is available
766770
doneCh chan struct{}
767-
// handle and err cannot be accessed before doneCh is closed
768-
handle WorkflowUpdateHandle
769-
err error
771+
// workflowRun and err cannot be accessed before doneCh is closed
772+
workflowRun WorkflowRun
773+
err error
770774
}
771775

772776
// RetryPolicy defines the retry policy.
@@ -1059,30 +1063,10 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien
10591063
}, nil
10601064
}
10611065

1062-
// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start.
1063-
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
1064-
res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})}
1065-
1066-
input, err := createUpdateWorkflowInput(options)
1067-
if err != nil {
1068-
res.set(nil, err)
1069-
} else if options.RunID != "" {
1070-
res.set(nil, errors.New("RunID cannot be set because the workflow might not be running"))
1071-
}
1072-
if options.FirstExecutionRunID != "" {
1073-
res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running"))
1074-
} else {
1075-
res.input = input
1076-
}
1077-
1078-
return res
1079-
}
1080-
1081-
// Get blocks until a server response has been received; or the context deadline is exceeded.
1082-
func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) {
1066+
func (op *withStartWorkflowOperationImpl) Get(ctx context.Context) (WorkflowRun, error) {
10831067
select {
10841068
case <-op.doneCh:
1085-
return op.handle, op.err
1069+
return op.workflowRun, op.err
10861070
case <-ctx.Done():
10871071
if !op.executed.Load() {
10881072
return nil, fmt.Errorf("%w: %w", ctx.Err(), fmt.Errorf("operation was not executed"))
@@ -1091,21 +1075,19 @@ func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUp
10911075
}
10921076
}
10931077

1094-
func (op *UpdateWithStartWorkflowOperation) markExecuted() error {
1078+
func (op *withStartWorkflowOperationImpl) markExecuted() error {
10951079
if op.executed.Swap(true) {
10961080
return fmt.Errorf("was already executed")
10971081
}
10981082
return nil
10991083
}
11001084

1101-
func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) {
1102-
op.handle = handle
1085+
func (op *withStartWorkflowOperationImpl) set(workflowRun WorkflowRun, err error) {
1086+
op.workflowRun = workflowRun
11031087
op.err = err
11041088
close(op.doneCh)
11051089
}
11061090

1107-
func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {}
1108-
11091091
// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
11101092
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
11111093
// Initialize root tags

‎internal/interceptor.go

+10
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,11 @@ type ClientOutboundInterceptor interface {
393393
// NOTE: Experimental
394394
UpdateWorkflow(context.Context, *ClientUpdateWorkflowInput) (WorkflowUpdateHandle, error)
395395

396+
// UpdateWithStartWorkflow intercepts client.Client.UpdateWithStartWorkflow.
397+
//
398+
// NOTE: Experimental
399+
UpdateWithStartWorkflow(context.Context, *ClientUpdateWithStartWorkflowInput) (WorkflowUpdateHandle, error)
400+
396401
// PollWorkflowUpdate requests the outcome of a specific update from the
397402
// server.
398403
//
@@ -416,6 +421,11 @@ type ClientUpdateWorkflowInput struct {
416421
WaitForStage WorkflowUpdateStage
417422
}
418423

424+
type ClientUpdateWithStartWorkflowInput struct {
425+
UpdateOptions *UpdateWorkflowOptions
426+
StartWorkflowOperation WithStartWorkflowOperation
427+
}
428+
419429
// ClientPollWorkflowUpdateInput is the input to
420430
// ClientOutboundInterceptor.PollWorkflowUpdate.
421431
type ClientPollWorkflowUpdateInput struct {

‎internal/interceptor_base.go

+7
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,13 @@ func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate(
485485
return c.Next.PollWorkflowUpdate(ctx, in)
486486
}
487487

488+
func (c *ClientOutboundInterceptorBase) UpdateWithStartWorkflow(
489+
ctx context.Context,
490+
in *ClientUpdateWithStartWorkflowInput,
491+
) (WorkflowUpdateHandle, error) {
492+
return c.Next.UpdateWithStartWorkflow(ctx, in)
493+
}
494+
488495
// ExecuteWorkflow implements ClientOutboundInterceptor.ExecuteWorkflow.
489496
func (c *ClientOutboundInterceptorBase) ExecuteWorkflow(
490497
ctx context.Context,

‎internal/internal_workflow_client.go

+208-98
Large diffs are not rendered by default.

‎internal/internal_workflow_client_test.go

+118-107
Original file line numberDiff line numberDiff line change
@@ -1010,49 +1010,41 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() {
10101010
},
10111011
}, nil)
10121012

1013-
updOp := NewUpdateWithStartWorkflowOperation(
1014-
UpdateWorkflowOptions{
1015-
UpdateName: "update",
1016-
WaitForStage: WorkflowUpdateStageCompleted,
1017-
})
1018-
1019-
_, err := s.workflowClient.ExecuteWorkflow(
1020-
context.Background(),
1013+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
10211014
StartWorkflowOptions{
1022-
ID: workflowID,
1023-
TaskQueue: taskqueue,
1024-
WithStartOperation: updOp,
1015+
ID: workflowID,
1016+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1017+
TaskQueue: taskqueue,
10251018
}, workflowType,
10261019
)
1020+
1021+
_, err := s.workflowClient.UpdateWithStartWorkflow(
1022+
context.Background(),
1023+
UpdateWithStartWorkflowOptions{
1024+
UpdateOptions: UpdateWorkflowOptions{
1025+
UpdateName: "update",
1026+
WaitForStage: WorkflowUpdateStageCompleted,
1027+
},
1028+
StartWorkflowOperation: startOp,
1029+
},
1030+
)
10271031
s.NoError(err)
10281032
}
10291033

10301034
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() {
1031-
s.workflowServiceClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).
1032-
Return(&workflowservice.StartWorkflowExecutionResponse{
1033-
RunId: runID,
1034-
}, nil)
1035-
1036-
updOp := NewUpdateWithStartWorkflowOperation(
1037-
UpdateWorkflowOptions{
1038-
UpdateName: "update",
1039-
WaitForStage: WorkflowUpdateStageCompleted,
1040-
})
1041-
1042-
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
1043-
defer cancel()
10441035

1045-
_, err := s.workflowClient.ExecuteWorkflow(
1046-
ctxWithTimeout,
1036+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
10471037
StartWorkflowOptions{
1048-
ID: workflowID,
1049-
TaskQueue: taskqueue,
1050-
// WithStartOperation is not specified!
1038+
ID: workflowID,
1039+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1040+
TaskQueue: taskqueue,
10511041
}, workflowType,
10521042
)
1053-
require.NoError(s.T(), err)
10541043

1055-
_, err = updOp.Get(ctxWithTimeout)
1044+
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
1045+
defer cancel()
1046+
1047+
_, err := startOp.Get(ctxWithTimeout)
10561048
require.EqualError(s.T(), err, "context deadline exceeded: operation was not executed")
10571049
}
10581050

@@ -1092,22 +1084,26 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() {
10921084
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
10931085
DoAndReturn(tt.respFunc)
10941086

1095-
updOp := NewUpdateWithStartWorkflowOperation(
1096-
UpdateWorkflowOptions{
1097-
UpdateName: "update",
1098-
WaitForStage: WorkflowUpdateStageCompleted,
1099-
})
1087+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
1088+
StartWorkflowOptions{
1089+
ID: workflowID,
1090+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1091+
TaskQueue: taskqueue,
1092+
}, workflowType,
1093+
)
11001094

11011095
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
11021096
defer cancel()
11031097

1104-
_, err := s.workflowClient.ExecuteWorkflow(
1098+
_, err := s.workflowClient.UpdateWithStartWorkflow(
11051099
ctxWithTimeout,
1106-
StartWorkflowOptions{
1107-
ID: workflowID,
1108-
TaskQueue: taskqueue,
1109-
WithStartOperation: updOp,
1110-
}, workflowType,
1100+
UpdateWithStartWorkflowOptions{
1101+
UpdateOptions: UpdateWorkflowOptions{
1102+
UpdateName: "update",
1103+
WaitForStage: WorkflowUpdateStageCompleted,
1104+
},
1105+
StartWorkflowOperation: startOp,
1106+
},
11111107
)
11121108

11131109
var expectedErr *WorkflowUpdateServiceTimeoutOrCanceledError
@@ -1122,20 +1118,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError(
11221118
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
11231119
Return(nil, serviceerror.NewInternal("internal error")).Times(1)
11241120

1125-
updOp := NewUpdateWithStartWorkflowOperation(
1126-
UpdateWorkflowOptions{
1127-
UpdateName: "update",
1128-
WaitForStage: WorkflowUpdateStageCompleted,
1129-
})
1130-
1131-
_, err := s.workflowClient.ExecuteWorkflow(
1132-
context.Background(),
1121+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
11331122
StartWorkflowOptions{
1134-
ID: workflowID,
1135-
TaskQueue: taskqueue,
1136-
WithStartOperation: updOp,
1123+
ID: workflowID,
1124+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1125+
TaskQueue: taskqueue,
11371126
}, workflowType,
11381127
)
1128+
1129+
_, err := s.workflowClient.UpdateWithStartWorkflow(
1130+
context.Background(),
1131+
UpdateWithStartWorkflowOptions{
1132+
UpdateOptions: UpdateWorkflowOptions{
1133+
UpdateName: "update",
1134+
WaitForStage: WorkflowUpdateStageCompleted,
1135+
},
1136+
StartWorkflowOperation: startOp,
1137+
},
1138+
)
11391139
s.ErrorContains(err, "internal error")
11401140
}
11411141

@@ -1146,20 +1146,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerResponseCountMism
11461146
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{},
11471147
}, nil).Times(1)
11481148

1149-
updOp := NewUpdateWithStartWorkflowOperation(
1150-
UpdateWorkflowOptions{
1151-
UpdateName: "update",
1152-
WaitForStage: WorkflowUpdateStageCompleted,
1153-
})
1154-
1155-
_, err := s.workflowClient.ExecuteWorkflow(
1156-
context.Background(),
1149+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
11571150
StartWorkflowOptions{
1158-
ID: workflowID,
1159-
TaskQueue: taskqueue,
1160-
WithStartOperation: updOp,
1151+
ID: workflowID,
1152+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1153+
TaskQueue: taskqueue,
11611154
}, workflowType,
11621155
)
1156+
1157+
_, err := s.workflowClient.UpdateWithStartWorkflow(
1158+
context.Background(),
1159+
UpdateWithStartWorkflowOptions{
1160+
UpdateOptions: UpdateWorkflowOptions{
1161+
UpdateName: "update",
1162+
WaitForStage: WorkflowUpdateStageCompleted,
1163+
},
1164+
StartWorkflowOperation: startOp,
1165+
},
1166+
)
11631167
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation results")
11641168
}
11651169

@@ -1168,20 +1172,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerErrorResponseCoun
11681172
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
11691173
Return(nil, serviceerror.NewMultiOperationExecution("Error", []error{})).Times(1)
11701174

1171-
updOp := NewUpdateWithStartWorkflowOperation(
1172-
UpdateWorkflowOptions{
1173-
UpdateName: "update",
1174-
WaitForStage: WorkflowUpdateStageCompleted,
1175-
})
1176-
1177-
_, err := s.workflowClient.ExecuteWorkflow(
1178-
context.Background(),
1175+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
11791176
StartWorkflowOptions{
1180-
ID: workflowID,
1181-
TaskQueue: taskqueue,
1182-
WithStartOperation: updOp,
1177+
ID: workflowID,
1178+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1179+
TaskQueue: taskqueue,
11831180
}, workflowType,
11841181
)
1182+
1183+
_, err := s.workflowClient.UpdateWithStartWorkflow(
1184+
context.Background(),
1185+
UpdateWithStartWorkflowOptions{
1186+
UpdateOptions: UpdateWorkflowOptions{
1187+
UpdateName: "update",
1188+
WaitForStage: WorkflowUpdateStageCompleted,
1189+
},
1190+
StartWorkflowOperation: startOp,
1191+
},
1192+
)
11851193
s.ErrorContains(err, "invalid server response: 0 instead of 2 operation errors")
11861194
}
11871195

@@ -1197,20 +1205,24 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerStartResponseType
11971205
},
11981206
}, nil).Times(1)
11991207

1200-
updOp := NewUpdateWithStartWorkflowOperation(
1201-
UpdateWorkflowOptions{
1202-
UpdateName: "update",
1203-
WaitForStage: WorkflowUpdateStageCompleted,
1204-
})
1205-
1206-
_, err := s.workflowClient.ExecuteWorkflow(
1207-
context.Background(),
1208+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
12081209
StartWorkflowOptions{
1209-
ID: workflowID,
1210-
TaskQueue: taskqueue,
1211-
WithStartOperation: updOp,
1210+
ID: workflowID,
1211+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1212+
TaskQueue: taskqueue,
12121213
}, workflowType,
12131214
)
1215+
1216+
_, err := s.workflowClient.UpdateWithStartWorkflow(
1217+
context.Background(),
1218+
UpdateWithStartWorkflowOptions{
1219+
UpdateOptions: UpdateWorkflowOptions{
1220+
UpdateName: "update",
1221+
WaitForStage: WorkflowUpdateStageCompleted,
1222+
},
1223+
StartWorkflowOperation: startOp,
1224+
},
1225+
)
12141226
s.ErrorContains(err, "invalid server response: StartWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow")
12151227
}
12161228

@@ -1220,28 +1232,36 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_ServerUpdateResponseTyp
12201232
Return(&workflowservice.ExecuteMultiOperationResponse{
12211233
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
12221234
{
1223-
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{},
1235+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{
1236+
StartWorkflow: &workflowservice.StartWorkflowExecutionResponse{
1237+
RunId: "RUN_ID",
1238+
},
1239+
},
12241240
},
12251241
{
12261242
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{}, // wrong!
12271243
},
12281244
},
12291245
}, nil).Times(1)
12301246

1231-
updOp := NewUpdateWithStartWorkflowOperation(
1232-
UpdateWorkflowOptions{
1233-
UpdateName: "update",
1234-
WaitForStage: WorkflowUpdateStageCompleted,
1235-
})
1236-
1237-
_, err := s.workflowClient.ExecuteWorkflow(
1238-
context.Background(),
1247+
startOp := s.workflowClient.NewWithStartWorkflowOperation(
12391248
StartWorkflowOptions{
1240-
ID: workflowID,
1241-
TaskQueue: taskqueue,
1242-
WithStartOperation: updOp,
1249+
ID: workflowID,
1250+
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
1251+
TaskQueue: taskqueue,
12431252
}, workflowType,
12441253
)
1254+
1255+
_, err := s.workflowClient.UpdateWithStartWorkflow(
1256+
context.Background(),
1257+
UpdateWithStartWorkflowOptions{
1258+
UpdateOptions: UpdateWorkflowOptions{
1259+
UpdateName: "update",
1260+
WaitForStage: WorkflowUpdateStageCompleted,
1261+
},
1262+
StartWorkflowOperation: startOp,
1263+
},
1264+
)
12451265
s.ErrorContains(err, "invalid server response: UpdateWorkflow response has the wrong type *workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow")
12461266
}
12471267

@@ -1358,15 +1378,6 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowValidation() {
13581378
context.Background(), "workflow-id-1", "my-signal", "my-signal-value",
13591379
StartWorkflowOptions{ID: "workflow-id-2"}, workflowType)
13601380
s.ErrorContains(err, "workflow ID from options not used")
1361-
1362-
// unsupported WithStartOperation
1363-
_, err = s.client.SignalWithStartWorkflow(
1364-
context.Background(), "workflow-id", "my-signal", "my-signal-value",
1365-
StartWorkflowOptions{
1366-
ID: "workflow-id",
1367-
WithStartOperation: &UpdateWithStartWorkflowOperation{},
1368-
}, workflowType)
1369-
s.ErrorContains(err, "option WithStartOperation is not allowed")
13701381
}
13711382

13721383
func (s *workflowClientTestSuite) TestStartWorkflow() {

‎internal/nexus_operations.go

+9
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,10 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
264264
return run, nil
265265
}
266266

267+
func (t *testSuiteClientForNexusOperations) NewWithStartWorkflowOperation(options StartWorkflowOptions, workflow interface{}, args ...interface{}) WithStartWorkflowOperation {
268+
panic("not implemented in the test environment")
269+
}
270+
267271
// GetSearchAttributes implements Client.
268272
func (t *testSuiteClientForNexusOperations) GetSearchAttributes(ctx context.Context) (*workflowservice.GetSearchAttributesResponse, error) {
269273
panic("not implemented in the test environment")
@@ -379,6 +383,11 @@ func (t *testSuiteClientForNexusOperations) UpdateWorkflow(ctx context.Context,
379383
panic("unimplemented in the test environment")
380384
}
381385

386+
// UpdateWithStartWorkflow implements Client.
387+
func (t *testSuiteClientForNexusOperations) UpdateWithStartWorkflow(ctx context.Context, options UpdateWithStartWorkflowOptions) (WorkflowUpdateHandle, error) {
388+
panic("unimplemented in the test environment")
389+
}
390+
382391
// UpdateWorkerBuildIdCompatibility implements Client.
383392
func (t *testSuiteClientForNexusOperations) UpdateWorkerBuildIdCompatibility(ctx context.Context, options *UpdateWorkerBuildIdCompatibilityOptions) error {
384393
panic("not implemented in the test environment")

‎mocks/Client.go

+53-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎test/integration_test.go

+186-126
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.