Skip to content

Commit bcfa85a

Browse files
authoredMay 28, 2024··
Workflow update client API refactor (#1489)
* Add UpdateLifeCycleStage enum * Update enum naming to WorkflowUpdateStage * Replace UpdateWorkflowWithOptions with UpdateWorkflow
1 parent a6ca6a5 commit bcfa85a

11 files changed

+537
-215
lines changed
 

‎.github/workflows/ci.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,9 @@ jobs:
115115
working-directory: test
116116

117117
features-test:
118-
uses: temporalio/features/.github/workflows/go.yaml@main
118+
uses: temporalio/features/.github/workflows/go.yaml@go-sdk-update-refactor
119119
with:
120120
go-repo-path: ${{github.event.pull_request.head.repo.full_name}}
121121
version: ${{github.event.pull_request.head.ref}}
122+
features-repo-ref: go-sdk-update-refactor
122123
version-is-repo-ref: true

‎client/client.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,25 @@ const (
6464
TaskReachabilityClosedWorkflows = internal.TaskReachabilityClosedWorkflows
6565
)
6666

67+
// WorkflowUpdateStage indicates the stage of an update request.
68+
// NOTE: Experimental
69+
type WorkflowUpdateStage = internal.WorkflowUpdateStage
70+
71+
const (
72+
// WorkflowUpdateStageUnspecified indicates the wait stage was not specified
73+
// NOTE: Experimental
74+
WorkflowUpdateStageUnspecified = internal.WorkflowUpdateStageUnspecified
75+
// WorkflowUpdateStageAdmitted indicates the update is admitted
76+
// NOTE: Experimental
77+
WorkflowUpdateStageAdmitted = internal.WorkflowUpdateStageAdmitted
78+
// WorkflowUpdateStageAccepted indicates the update is accepted
79+
// NOTE: Experimental
80+
WorkflowUpdateStageAccepted = internal.WorkflowUpdateStageAccepted
81+
// WorkflowUpdateStageCompleted indicates the update is completed
82+
// NOTE: Experimental
83+
WorkflowUpdateStageCompleted = internal.WorkflowUpdateStageCompleted
84+
)
85+
6786
const (
6887
// DefaultHostPort is the host:port which is used if not passed with options.
6988
DefaultHostPort = internal.LocalHostPort
@@ -196,10 +215,10 @@ type (
196215
// ScheduleBackfillOptions configure the parameters for backfilling a schedule.
197216
ScheduleBackfillOptions = internal.ScheduleBackfillOptions
198217

199-
// UpdateWorkflowWithOptionsRequest encapsulates the parameters for
218+
// UpdateWorkflowOptions encapsulates the parameters for
200219
// sending an update to a workflow execution.
201-
// WARNING: Worker versioning is currently experimental
202-
UpdateWorkflowWithOptionsRequest = internal.UpdateWorkflowWithOptionsRequest
220+
// NOTE: Experimental
221+
UpdateWorkflowOptions = internal.UpdateWorkflowOptions
203222

204223
// WorkflowUpdateHandle represents a running or completed workflow
205224
// execution update and gives the holder access to the outcome of the same.
@@ -566,24 +585,16 @@ type (
566585
// API. If the check fails, an error is returned.
567586
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
568587

569-
// UpdateWorkflow issues an update request to the specified
570-
// workflow execution and returns the result synchronously. Calling this
571-
// function is equivalent to calling UpdateWorkflowWithOptions with
572-
// the same arguments and indicating that the RPC call should wait for
573-
// completion of the update process.
574-
// NOTE: Experimental
575-
UpdateWorkflow(ctx context.Context, workflowID string, workflowRunID string, updateName string, args ...interface{}) (WorkflowUpdateHandle, error)
576-
577-
// UpdateWorkflowWithOptions issues an update request to the
588+
// UpdateWorkflow issues an update request to the
578589
// specified workflow execution and returns a handle to the update that
579590
// is running in in parallel with the calling thread. Errors returned
580591
// from the server will be exposed through the return value of
581592
// WorkflowUpdateHandle.Get(). Errors that occur before the
582593
// update is requested (e.g. if the required workflow ID field is
583-
// missing from the UpdateWorkflowWithOptionsRequest) are returned
594+
// missing from the UpdateWorkflowOptions) are returned
584595
// directly from this function call.
585596
// NOTE: Experimental
586-
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)
597+
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)
587598

588599
// GetWorkflowUpdateHandle creates a handle to the referenced update
589600
// which can be polled for an outcome. Note that runID is optional and

‎internal/client.go

+3-11
Original file line numberDiff line numberDiff line change
@@ -356,24 +356,16 @@ type (
356356
// API. If the check fails, an error is returned.
357357
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
358358

359-
// UpdateWorkflow issues an update request to the specified
360-
// workflow execution and returns the result synchronously. Calling this
361-
// function is equivalent to calling UpdateWorkflowWithOptions with
362-
// the same arguments and indicating that the RPC call should wait for
363-
// completion of the update process.
364-
// NOTE: Experimental
365-
UpdateWorkflow(ctx context.Context, workflowID string, workflowRunID string, updateName string, args ...interface{}) (WorkflowUpdateHandle, error)
366-
367-
// UpdateWorkflowWithOptions issues an update request to the
359+
// UpdateWorkflow issues an update request to the
368360
// specified workflow execution and returns a handle to the update that
369361
// is running in in parallel with the calling thread. Errors returned
370362
// from the server will be exposed through the return value of
371363
// WorkflowExecutionUpdateHandle.Get(). Errors that occur before the
372364
// update is requested (e.g. if the required workflow ID field is
373-
// missing from the UpdateWorkflowWithOptionsRequest) are returned
365+
// missing from the UpdateWorkflowOptions) are returned
374366
// directly from this function call.
375367
// NOTE: Experimental
376-
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)
368+
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)
377369

378370
// GetWorkflowUpdateHandle creates a handle to the referenced update
379371
// which can be polled for an outcome. Note that runID is optional and

‎internal/interceptor.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ type ClientOutboundInterceptor interface {
335335
// server.
336336
//
337337
// NOTE: Experimental
338-
PollWorkflowUpdate(context.Context, *ClientPollWorkflowUpdateInput) (converter.EncodedValue, error)
338+
PollWorkflowUpdate(context.Context, *ClientPollWorkflowUpdateInput) (*ClientPollWorkflowUpdateOutput, error)
339339

340340
mustEmbedClientOutboundInterceptorBase()
341341
}
@@ -351,7 +351,7 @@ type ClientUpdateWorkflowInput struct {
351351
Args []interface{}
352352
RunID string
353353
FirstExecutionRunID string
354-
WaitPolicy *updatepb.WaitPolicy
354+
WaitForStage WorkflowUpdateStage
355355
}
356356

357357
// ClientPollWorkflowUpdateInput is the input to
@@ -360,6 +360,15 @@ type ClientPollWorkflowUpdateInput struct {
360360
UpdateRef *updatepb.UpdateRef
361361
}
362362

363+
// ClientPollWorkflowUpdateOutput is the output to
364+
// ClientOutboundInterceptor.PollWorkflowUpdate.
365+
type ClientPollWorkflowUpdateOutput struct {
366+
// Result is the result of the update, if it has completed successfully.
367+
Result converter.EncodedValue
368+
// Error is the result of a failed update.
369+
Error error
370+
}
371+
363372
// ScheduleClientCreateInput is the input to
364373
// ClientOutboundInterceptor.CreateSchedule.
365374
type ScheduleClientCreateInput struct {

‎internal/interceptor_base.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ func (c *ClientOutboundInterceptorBase) UpdateWorkflow(
425425
func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate(
426426
ctx context.Context,
427427
in *ClientPollWorkflowUpdateInput,
428-
) (converter.EncodedValue, error) {
428+
) (*ClientPollWorkflowUpdateOutput, error) {
429429
return c.Next.PollWorkflowUpdate(ctx, in)
430430
}
431431

‎internal/internal_update.go

+30
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"reflect"
3131

3232
commonpb "go.temporal.io/api/common/v1"
33+
enumspb "go.temporal.io/api/enums/v1"
3334
historypb "go.temporal.io/api/history/v1"
3435
protocolpb "go.temporal.io/api/protocol/v1"
3536
updatepb "go.temporal.io/api/update/v1"
@@ -39,6 +40,20 @@ import (
3940

4041
type updateState string
4142

43+
// WorkflowUpdateStage indicates the stage of an update request.
44+
type WorkflowUpdateStage int
45+
46+
const (
47+
// WorkflowUpdateStageUnspecified indicates the wait stage was not specified
48+
WorkflowUpdateStageUnspecified WorkflowUpdateStage = iota
49+
// WorkflowUpdateStageAdmitted indicates the update is admitted
50+
WorkflowUpdateStageAdmitted
51+
// WorkflowUpdateStageAccepted indicates the update is accepted
52+
WorkflowUpdateStageAccepted
53+
// WorkflowUpdateStageCompleted indicates the update is completed
54+
WorkflowUpdateStageCompleted
55+
)
56+
4257
const (
4358
updateStateNew updateState = "New"
4459
updateStateRequestInitiated updateState = "RequestScheduled"
@@ -453,3 +468,18 @@ func validateUpdateHandlerFn(fn interface{}) error {
453468
}
454469
return nil
455470
}
471+
472+
func updateLifeCycleStageToProto(l WorkflowUpdateStage) enumspb.UpdateWorkflowExecutionLifecycleStage {
473+
switch l {
474+
case WorkflowUpdateStageUnspecified:
475+
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED
476+
case WorkflowUpdateStageAdmitted:
477+
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED
478+
case WorkflowUpdateStageAccepted:
479+
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
480+
case WorkflowUpdateStageCompleted:
481+
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
482+
default:
483+
panic("unknown update lifecycle stage")
484+
}
485+
}

‎internal/internal_workflow_client.go

+102-76
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ import (
3737

3838
"github.com/pborman/uuid"
3939
"google.golang.org/grpc"
40-
"google.golang.org/grpc/codes"
4140
healthpb "google.golang.org/grpc/health/grpc_health_v1"
42-
"google.golang.org/grpc/status"
4341
"google.golang.org/protobuf/types/known/durationpb"
4442

4543
commonpb "go.temporal.io/api/common/v1"
@@ -753,8 +751,8 @@ func (wc *WorkflowClient) QueryWorkflow(ctx context.Context, workflowID string,
753751
})
754752
}
755753

756-
// UpdateWorkflowWithOptionsRequest is the request to UpdateWorkflowWithOptions
757-
type UpdateWorkflowWithOptionsRequest struct {
754+
// UpdateWorkflowOptions is the request to UpdateWorkflow
755+
type UpdateWorkflowOptions struct {
758756
// UpdateID is an application-layer identifier for the requested update. It
759757
// must be unique within the scope of a Namespace+WorkflowID+RunID.
760758
UpdateID string
@@ -776,13 +774,15 @@ type UpdateWorkflowWithOptionsRequest struct {
776774
// update.
777775
Args []interface{}
778776

777+
// WaitForStage is a required field which specifies which stage to wait until returning.
778+
// See https://docs.temporal.io/workflows#update for more details.
779+
// NOTE: Specifying WorkflowUpdateStageAdmitted is not supported.
780+
WaitForStage WorkflowUpdateStage
781+
779782
// FirstExecutionRunID specifies the RunID expected to identify the first
780783
// run in the workflow execution chain. If this expectation does not match
781784
// then the server will reject the update request with an error.
782785
FirstExecutionRunID string
783-
784-
// How this RPC should block on the server before returning.
785-
WaitPolicy *updatepb.WaitPolicy
786786
}
787787

788788
// WorkflowUpdateHandle is a handle to a workflow execution update process. The
@@ -1036,32 +1036,6 @@ func (wc *WorkflowClient) GetWorkerTaskReachability(ctx context.Context, options
10361036
return converted, nil
10371037
}
10381038

1039-
func (wc *WorkflowClient) UpdateWorkflowWithOptions(
1040-
ctx context.Context,
1041-
req *UpdateWorkflowWithOptionsRequest,
1042-
) (WorkflowUpdateHandle, error) {
1043-
if err := wc.ensureInitialized(ctx); err != nil {
1044-
return nil, err
1045-
}
1046-
// Default update ID
1047-
updateID := req.UpdateID
1048-
if updateID == "" {
1049-
updateID = uuid.New()
1050-
}
1051-
1052-
ctx = contextWithNewHeader(ctx)
1053-
1054-
return wc.interceptor.UpdateWorkflow(ctx, &ClientUpdateWorkflowInput{
1055-
UpdateID: updateID,
1056-
WorkflowID: req.WorkflowID,
1057-
UpdateName: req.UpdateName,
1058-
Args: req.Args,
1059-
RunID: req.RunID,
1060-
FirstExecutionRunID: req.FirstExecutionRunID,
1061-
WaitPolicy: req.WaitPolicy,
1062-
})
1063-
}
1064-
10651039
func (wc *WorkflowClient) GetWorkflowUpdateHandle(ref GetWorkflowUpdateHandleOptions) WorkflowUpdateHandle {
10661040
return &lazyUpdateHandle{
10671041
client: wc,
@@ -1082,7 +1056,7 @@ func (wc *WorkflowClient) GetWorkflowUpdateHandle(ref GetWorkflowUpdateHandleOpt
10821056
func (wc *WorkflowClient) PollWorkflowUpdate(
10831057
ctx context.Context,
10841058
ref *updatepb.UpdateRef,
1085-
) (converter.EncodedValue, error) {
1059+
) (*ClientPollWorkflowUpdateOutput, error) {
10861060
if err := wc.ensureInitialized(ctx); err != nil {
10871061
return nil, err
10881062
}
@@ -1094,22 +1068,35 @@ func (wc *WorkflowClient) PollWorkflowUpdate(
10941068

10951069
func (wc *WorkflowClient) UpdateWorkflow(
10961070
ctx context.Context,
1097-
workflowID string,
1098-
workflowRunID string,
1099-
updateName string,
1100-
args ...interface{},
1071+
opt UpdateWorkflowOptions,
11011072
) (WorkflowUpdateHandle, error) {
11021073
if err := wc.ensureInitialized(ctx); err != nil {
11031074
return nil, err
11041075
}
1076+
// Default update ID
1077+
updateID := opt.UpdateID
1078+
if updateID == "" {
1079+
updateID = uuid.New()
1080+
}
1081+
1082+
if opt.WaitForStage == WorkflowUpdateStageUnspecified {
1083+
return nil, errors.New("WaitForStage must be specified")
1084+
}
1085+
1086+
if opt.WaitForStage == WorkflowUpdateStageAdmitted {
1087+
return nil, errors.New("WaitForStage WorkflowUpdateStageAdmitted is not supported")
1088+
}
11051089

11061090
ctx = contextWithNewHeader(ctx)
11071091

11081092
return wc.interceptor.UpdateWorkflow(ctx, &ClientUpdateWorkflowInput{
1109-
WorkflowID: workflowID,
1110-
UpdateName: updateName,
1111-
UpdateID: uuid.New(),
1112-
Args: args,
1093+
UpdateID: updateID,
1094+
WorkflowID: opt.WorkflowID,
1095+
UpdateName: opt.UpdateName,
1096+
Args: opt.Args,
1097+
RunID: opt.RunID,
1098+
FirstExecutionRunID: opt.FirstExecutionRunID,
1099+
WaitForStage: opt.WaitForStage,
11131100
})
11141101
}
11151102

@@ -1792,31 +1779,65 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
17921779
if err != nil {
17931780
return nil, err
17941781
}
1795-
grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx))
1796-
defer cancel()
1797-
wfexec := &commonpb.WorkflowExecution{
1798-
WorkflowId: in.WorkflowID,
1799-
RunId: in.RunID,
1800-
}
1801-
resp, err := w.client.workflowService.UpdateWorkflowExecution(grpcCtx, &workflowservice.UpdateWorkflowExecutionRequest{
1802-
WaitPolicy: in.WaitPolicy,
1803-
Namespace: w.client.namespace,
1804-
WorkflowExecution: wfexec,
1805-
FirstExecutionRunId: in.FirstExecutionRunID,
1806-
Request: &updatepb.Request{
1807-
Meta: &updatepb.Meta{
1808-
UpdateId: in.UpdateID,
1809-
Identity: w.client.identity,
1810-
},
1811-
Input: &updatepb.Input{
1812-
Header: header,
1813-
Name: in.UpdateName,
1814-
Args: argPayloads,
1815-
},
1816-
},
1817-
})
1818-
if err != nil {
1819-
return nil, err
1782+
desiredLifecycleStage := updateLifeCycleStageToProto(in.WaitForStage)
1783+
var resp *workflowservice.UpdateWorkflowExecutionResponse
1784+
for {
1785+
var err error
1786+
resp, err = func() (*workflowservice.UpdateWorkflowExecutionResponse, error) {
1787+
grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx))
1788+
defer cancel()
1789+
wfexec := &commonpb.WorkflowExecution{
1790+
WorkflowId: in.WorkflowID,
1791+
RunId: in.RunID,
1792+
}
1793+
return w.client.workflowService.UpdateWorkflowExecution(grpcCtx, &workflowservice.UpdateWorkflowExecutionRequest{
1794+
WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: desiredLifecycleStage},
1795+
Namespace: w.client.namespace,
1796+
WorkflowExecution: wfexec,
1797+
FirstExecutionRunId: in.FirstExecutionRunID,
1798+
Request: &updatepb.Request{
1799+
Meta: &updatepb.Meta{
1800+
UpdateId: in.UpdateID,
1801+
Identity: w.client.identity,
1802+
},
1803+
Input: &updatepb.Input{
1804+
Header: header,
1805+
Name: in.UpdateName,
1806+
Args: argPayloads,
1807+
},
1808+
},
1809+
})
1810+
}()
1811+
if err != nil {
1812+
return nil, err
1813+
}
1814+
// Once the update is past admitted we know it is durable
1815+
// Note: old server version may return UNSPECIFIED if the update request
1816+
// did not reach the desired lifecycle stage.
1817+
if resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED &&
1818+
resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED {
1819+
break
1820+
}
1821+
}
1822+
// Here we know the update is at least accepted
1823+
if desiredLifecycleStage == enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED &&
1824+
resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED {
1825+
// TODO(https://github.com/temporalio/features/issues/428) replace with handle wait for stage once implemented
1826+
pollResp, err := w.client.PollWorkflowUpdate(ctx, resp.GetUpdateRef())
1827+
if err != nil {
1828+
return nil, err
1829+
}
1830+
if pollResp.Error != nil {
1831+
return &completedUpdateHandle{
1832+
err: pollResp.Error,
1833+
baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()},
1834+
}, nil
1835+
} else {
1836+
return &completedUpdateHandle{
1837+
value: pollResp.Result,
1838+
baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()},
1839+
}, nil
1840+
}
18201841
}
18211842
switch v := resp.GetOutcome().GetValue().(type) {
18221843
case nil:
@@ -1841,7 +1862,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
18411862
func (w *workflowClientInterceptor) PollWorkflowUpdate(
18421863
parentCtx context.Context,
18431864
in *ClientPollWorkflowUpdateInput,
1844-
) (converter.EncodedValue, error) {
1865+
) (*ClientPollWorkflowUpdateOutput, error) {
18451866
// header, _ = headerPropagated(ctx, w.client.contextPropagators)
18461867
// todo header not in PollWorkflowUpdate
18471868

@@ -1866,19 +1887,21 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate(
18661887
)
18671888
resp, err := w.client.workflowService.PollWorkflowExecutionUpdate(ctx, &pollReq)
18681889
cancel()
1869-
if err == context.DeadlineExceeded ||
1870-
status.Code(err) == codes.DeadlineExceeded ||
1871-
(err == nil && resp.GetOutcome() == nil) {
1890+
if err == nil && resp.GetOutcome() == nil {
18721891
continue
18731892
}
18741893
if err != nil {
18751894
return nil, err
18761895
}
18771896
switch v := resp.GetOutcome().GetValue().(type) {
18781897
case *updatepb.Outcome_Failure:
1879-
return nil, w.client.failureConverter.FailureToError(v.Failure)
1898+
return &ClientPollWorkflowUpdateOutput{
1899+
Error: w.client.failureConverter.FailureToError(v.Failure),
1900+
}, nil
18801901
case *updatepb.Outcome_Success:
1881-
return newEncodedValue(v.Success, w.client.dataConverter), nil
1902+
return &ClientPollWorkflowUpdateOutput{
1903+
Result: newEncodedValue(v.Success, w.client.dataConverter),
1904+
}, nil
18821905
default:
18831906
return nil, fmt.Errorf("unsupported outcome type %T", v)
18841907
}
@@ -1912,11 +1935,14 @@ func (ch *completedUpdateHandle) Get(ctx context.Context, valuePtr interface{})
19121935
}
19131936

19141937
func (luh *lazyUpdateHandle) Get(ctx context.Context, valuePtr interface{}) error {
1915-
enc, err := luh.client.PollWorkflowUpdate(ctx, luh.ref)
1916-
if err != nil || valuePtr == nil {
1938+
resp, err := luh.client.PollWorkflowUpdate(ctx, luh.ref)
1939+
if err != nil {
19171940
return err
19181941
}
1919-
return enc.Get(valuePtr)
1942+
if resp.Error != nil || valuePtr == nil {
1943+
return resp.Error
1944+
}
1945+
return resp.Result.Get(valuePtr)
19201946
}
19211947

19221948
func (q *queryRejectedError) Error() string {

‎internal/internal_workflow_client_test.go

+113-31
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
updatepb "go.temporal.io/api/update/v1"
3636
workflowpb "go.temporal.io/api/workflow/v1"
3737
"google.golang.org/grpc"
38-
"google.golang.org/grpc/codes"
3938
"google.golang.org/grpc/connectivity"
4039

4140
ilog "go.temporal.io/sdk/internal/log"
@@ -49,7 +48,6 @@ import (
4948
"go.temporal.io/api/serviceerror"
5049
"go.temporal.io/api/workflowservice/v1"
5150
"go.temporal.io/api/workflowservicemock/v1"
52-
"google.golang.org/grpc/status"
5351

5452
"go.temporal.io/sdk/converter"
5553
"go.temporal.io/sdk/internal/common/metrics"
@@ -1719,31 +1717,31 @@ func TestUpdate(t *testing.T) {
17191717
}
17201718

17211719
const (
1722-
sync = enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
1723-
async = enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
1720+
sync = WorkflowUpdateStageCompleted
1721+
async = WorkflowUpdateStageAccepted
17241722
)
17251723

17261724
newRequest := func(
17271725
t *testing.T,
1728-
stage enumspb.UpdateWorkflowExecutionLifecycleStage,
1729-
) *UpdateWorkflowWithOptionsRequest {
1726+
stage WorkflowUpdateStage,
1727+
) UpdateWorkflowOptions {
17301728
t.Helper()
1731-
return &UpdateWorkflowWithOptionsRequest{
1732-
UpdateID: fmt.Sprintf("%v-update_id", t.Name()),
1733-
WorkflowID: fmt.Sprintf("%v-workflow_id", t.Name()),
1734-
RunID: fmt.Sprintf("%v-run_id", t.Name()),
1735-
UpdateName: fmt.Sprintf("%v-update_name", t.Name()),
1736-
WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: stage},
1729+
return UpdateWorkflowOptions{
1730+
UpdateID: fmt.Sprintf("%v-update_id", t.Name()),
1731+
WorkflowID: fmt.Sprintf("%v-workflow_id", t.Name()),
1732+
RunID: fmt.Sprintf("%v-run_id", t.Name()),
1733+
UpdateName: fmt.Sprintf("%v-update_name", t.Name()),
1734+
WaitForStage: stage,
17371735
}
17381736
}
17391737

1740-
refFromRequest := func(req *UpdateWorkflowWithOptionsRequest) *updatepb.UpdateRef {
1738+
refFromRequest := func(opt UpdateWorkflowOptions) *updatepb.UpdateRef {
17411739
return &updatepb.UpdateRef{
17421740
WorkflowExecution: &commonpb.WorkflowExecution{
1743-
WorkflowId: req.WorkflowID,
1744-
RunId: req.RunID,
1741+
WorkflowId: opt.WorkflowID,
1742+
RunId: opt.RunID,
17451743
},
1746-
UpdateId: req.UpdateName,
1744+
UpdateId: opt.UpdateName,
17471745
}
17481746
}
17491747

@@ -1763,10 +1761,11 @@ func TestUpdate(t *testing.T) {
17631761
&workflowservice.UpdateWorkflowExecutionResponse{
17641762
UpdateRef: refFromRequest(req),
17651763
Outcome: mustOutcome(t, want),
1764+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
17661765
},
17671766
nil,
17681767
)
1769-
handle, err := client.UpdateWorkflowWithOptions(context.TODO(), req)
1768+
handle, err := client.UpdateWorkflow(context.TODO(), req)
17701769
require.NoError(t, err)
17711770
var got string
17721771
err = handle.Get(context.TODO(), &got)
@@ -1785,10 +1784,11 @@ func TestUpdate(t *testing.T) {
17851784
&workflowservice.UpdateWorkflowExecutionResponse{
17861785
UpdateRef: refFromRequest(req),
17871786
Outcome: mustOutcome(t, want),
1787+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
17881788
},
17891789
nil,
17901790
)
1791-
handle, err := client.UpdateWorkflowWithOptions(context.TODO(), req)
1791+
handle, err := client.UpdateWorkflow(context.TODO(), req)
17921792
require.NoError(t, err)
17931793
var got string
17941794
err = handle.Get(context.TODO(), &got)
@@ -1804,6 +1804,7 @@ func TestUpdate(t *testing.T) {
18041804
&workflowservice.UpdateWorkflowExecutionResponse{
18051805
UpdateRef: refFromRequest(req),
18061806
Outcome: nil, // async invocation - outcome unknown
1807+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
18071808
},
18081809
nil,
18091810
)
@@ -1814,7 +1815,7 @@ func TestUpdate(t *testing.T) {
18141815
},
18151816
nil,
18161817
).Times(2)
1817-
handle, err := client.UpdateWorkflowWithOptions(context.TODO(), req)
1818+
handle, err := client.UpdateWorkflow(context.TODO(), req)
18181819
require.NoError(t, err)
18191820
var got string
18201821
err = handle.Get(context.TODO(), &got)
@@ -1824,7 +1825,7 @@ func TestUpdate(t *testing.T) {
18241825
err = handle.Get(context.TODO(), nil)
18251826
require.NoError(t, err)
18261827
})
1827-
t.Run("async error", func(t *testing.T) {
1828+
t.Run("async delayed accepted", func(t *testing.T) {
18281829
svc, client := init(t)
18291830
want := errors.New("this error was intentional")
18301831
req := newRequest(t, async)
@@ -1833,6 +1834,7 @@ func TestUpdate(t *testing.T) {
18331834
&workflowservice.UpdateWorkflowExecutionResponse{
18341835
UpdateRef: refFromRequest(req),
18351836
Outcome: nil, // async invocation - outcome unknown
1837+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
18361838
},
18371839
nil,
18381840
)
@@ -1843,14 +1845,14 @@ func TestUpdate(t *testing.T) {
18431845
},
18441846
nil,
18451847
)
1846-
handle, err := client.UpdateWorkflowWithOptions(context.TODO(), req)
1848+
handle, err := client.UpdateWorkflow(context.TODO(), req)
18471849
require.NoError(t, err)
18481850
var got string
18491851
err = handle.Get(context.TODO(), &got)
18501852
require.Error(t, err)
18511853
require.ErrorContains(t, err, want.Error())
18521854
})
1853-
t.Run("internal retry on timeout", func(t *testing.T) {
1855+
t.Run("admitted error", func(t *testing.T) {
18541856
svc, client := init(t)
18551857
want := t.Name()
18561858
req := newRequest(t, async)
@@ -1859,27 +1861,34 @@ func TestUpdate(t *testing.T) {
18591861
&workflowservice.UpdateWorkflowExecutionResponse{
18601862
UpdateRef: refFromRequest(req),
18611863
Outcome: nil, // async invocation - outcome unknown
1864+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED,
1865+
},
1866+
nil,
1867+
).
1868+
Return(
1869+
&workflowservice.UpdateWorkflowExecutionResponse{
1870+
UpdateRef: refFromRequest(req),
1871+
Outcome: nil, // async invocation - outcome unknown
1872+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
18621873
},
18631874
nil,
18641875
)
1865-
svc.EXPECT().PollWorkflowExecutionUpdate(gomock.Any(), gomock.Any()).
1866-
Return(nil, status.Error(codes.DeadlineExceeded, codes.DeadlineExceeded.String()))
18671876
svc.EXPECT().PollWorkflowExecutionUpdate(gomock.Any(), gomock.Any()).
18681877
Return(
18691878
&workflowservice.PollWorkflowExecutionUpdateResponse{
18701879
Outcome: mustOutcome(t, want),
18711880
},
18721881
nil,
1873-
)
1874-
1875-
pollCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1876-
defer cancel()
1877-
handle, err := client.UpdateWorkflowWithOptions(pollCtx, req)
1882+
).Times(2)
1883+
handle, err := client.UpdateWorkflow(context.TODO(), req)
18781884
require.NoError(t, err)
18791885
var got string
1880-
err = handle.Get(pollCtx, &got)
1886+
err = handle.Get(context.TODO(), &got)
18811887
require.NoError(t, err)
18821888
require.Equal(t, want, got)
1889+
// Verify that calling Get with nil does not panic
1890+
err = handle.Get(context.TODO(), nil)
1891+
require.NoError(t, err)
18831892
})
18841893
t.Run("internal retry on nil outcome", func(t *testing.T) {
18851894
svc, client := init(t)
@@ -1890,6 +1899,7 @@ func TestUpdate(t *testing.T) {
18901899
&workflowservice.UpdateWorkflowExecutionResponse{
18911900
UpdateRef: refFromRequest(req),
18921901
Outcome: nil, // async invocation - outcome unknown
1902+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
18931903
},
18941904
nil,
18951905
)
@@ -1908,7 +1918,7 @@ func TestUpdate(t *testing.T) {
19081918
nil,
19091919
)
19101920

1911-
handle, err := client.UpdateWorkflowWithOptions(context.TODO(), req)
1921+
handle, err := client.UpdateWorkflow(context.TODO(), req)
19121922
require.NoError(t, err)
19131923
var got string
19141924
err = handle.Get(context.TODO(), &got)
@@ -1967,4 +1977,76 @@ func TestUpdate(t *testing.T) {
19671977
require.Error(t, err)
19681978
require.Equal(t, callerCtx.Err(), err)
19691979
})
1980+
t.Run("sync delayed success", func(t *testing.T) {
1981+
svc, client := init(t)
1982+
want := t.Name()
1983+
req := newRequest(t, sync)
1984+
svc.EXPECT().
1985+
UpdateWorkflowExecution(gomock.Any(), gomock.Any()).
1986+
Return(
1987+
&workflowservice.UpdateWorkflowExecutionResponse{
1988+
UpdateRef: refFromRequest(req),
1989+
Outcome: nil,
1990+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED,
1991+
},
1992+
nil,
1993+
).
1994+
Return(
1995+
&workflowservice.UpdateWorkflowExecutionResponse{
1996+
UpdateRef: refFromRequest(req),
1997+
Outcome: mustOutcome(t, want),
1998+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
1999+
},
2000+
nil,
2001+
)
2002+
handle, err := client.UpdateWorkflow(context.TODO(), req)
2003+
require.NoError(t, err)
2004+
var got string
2005+
err = handle.Get(context.TODO(), &got)
2006+
require.NoError(t, err)
2007+
require.Equal(t, want, got)
2008+
// Verify that calling Get with nil does not panic
2009+
err = handle.Get(context.TODO(), nil)
2010+
require.NoError(t, err)
2011+
})
2012+
t.Run("sync multiple step success", func(t *testing.T) {
2013+
svc, client := init(t)
2014+
want := t.Name()
2015+
req := newRequest(t, sync)
2016+
svc.EXPECT().
2017+
UpdateWorkflowExecution(gomock.Any(), gomock.Any()).
2018+
Return(
2019+
&workflowservice.UpdateWorkflowExecutionResponse{
2020+
UpdateRef: refFromRequest(req),
2021+
Outcome: nil,
2022+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED,
2023+
},
2024+
nil,
2025+
).
2026+
Return(
2027+
&workflowservice.UpdateWorkflowExecutionResponse{
2028+
UpdateRef: refFromRequest(req),
2029+
Outcome: nil,
2030+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
2031+
},
2032+
nil,
2033+
)
2034+
svc.EXPECT().PollWorkflowExecutionUpdate(gomock.Any(), gomock.Any()).
2035+
Return(
2036+
&workflowservice.PollWorkflowExecutionUpdateResponse{
2037+
Outcome: mustOutcome(t, want),
2038+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
2039+
},
2040+
nil,
2041+
).Times(1)
2042+
handle, err := client.UpdateWorkflow(context.TODO(), req)
2043+
require.NoError(t, err)
2044+
var got string
2045+
err = handle.Get(context.TODO(), &got)
2046+
require.NoError(t, err)
2047+
require.Equal(t, want, got)
2048+
// Verify that calling Get with nil does not panic
2049+
err = handle.Get(context.TODO(), nil)
2050+
require.NoError(t, err)
2051+
})
19702052
}

‎mocks/Client.go

+10-42
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎test/integration_test.go

+224-37
Large diffs are not rendered by default.

‎test/workflow_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,21 @@ func (w *Workflows) ActivityRetryOnHBTimeout(ctx workflow.Context) ([]string, er
326326
return []string{"heartbeatAndSleep", "heartbeatAndSleep", "heartbeatAndSleep"}, nil
327327
}
328328

329+
func (w *Workflows) UpdateBasicWorkflow(ctx workflow.Context) error {
330+
err := workflow.SetUpdateHandler(ctx, "update", func(ctx workflow.Context, t time.Duration) (string, error) {
331+
err := workflow.Sleep(ctx, t)
332+
if err != nil {
333+
return "", err
334+
}
335+
return "test", nil
336+
})
337+
if err != nil {
338+
return errors.New("failed to register update handler")
339+
}
340+
workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil)
341+
return nil
342+
}
343+
329344
func (w *Workflows) UpdateCancelableWorkflow(ctx workflow.Context) error {
330345
err := workflow.SetUpdateHandler(ctx, "update", func(ctx workflow.Context) error {
331346
return workflow.Sleep(ctx, time.Hour)
@@ -3058,6 +3073,7 @@ func (w *Workflows) register(worker worker.Worker) {
30583073
worker.RegisterWorkflow(w.UpdateSettingHandlerInHandler)
30593074
worker.RegisterWorkflow(w.UpdateCancelableWorkflow)
30603075
worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate)
3076+
worker.RegisterWorkflow(w.UpdateBasicWorkflow)
30613077
worker.RegisterWorkflow(w.LocalActivityNextRetryDelay)
30623078
worker.RegisterWorkflow(w.QueryTestWorkflow)
30633079

0 commit comments

Comments
 (0)
Please sign in to comment.