Skip to content

Commit 1fe6141

Browse files
authoredAug 21, 2024··
Support updates in tracing interceptor (#1595)
Support updates in tracing interceptor
1 parent edc3c6c commit 1fe6141

File tree

8 files changed

+290
-46
lines changed

8 files changed

+290
-46
lines changed
 

Diff for: ‎contrib/datadog/tracing/interceptor.go

+4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type TracerOptions struct {
4747
// DisableQueryTracing can be set to disable query tracing.
4848
DisableQueryTracing bool
4949

50+
// DisableUpdateTracing can be set to disable update tracing.
51+
DisableUpdateTracing bool
52+
5053
// OnFinish sets finish options.
5154
// If unset, this will use [tracer.WithError]
5255
// in case [interceptor.TracerFinishSpanOptions.Error] is non-nil and not [workflow.IsContinueAsNewError].
@@ -78,6 +81,7 @@ func NewTracer(opts TracerOptions) interceptor.Tracer {
7881
opts: TracerOptions{
7982
DisableSignalTracing: opts.DisableSignalTracing,
8083
DisableQueryTracing: opts.DisableQueryTracing,
84+
DisableUpdateTracing: opts.DisableUpdateTracing,
8185
OnFinish: opts.OnFinish,
8286
},
8387
}

Diff for: ‎contrib/datadog/tracing/interceptor_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ func TestSpanName(t *testing.T) {
7979
}
8080
interceptortest.RunTestWorkflow(t, testTracer)
8181
// Ensure the naming scheme follows "temporal.${operation}"
82-
require.Equal(t, "temporal.RunWorkflow", testTracer.FinishedSpans()[0].Name)
82+
require.Equal(t, "temporal.ValidateUpdate", testTracer.FinishedSpans()[0].Name)
83+
require.Equal(t, "temporal.HandleUpdate", testTracer.FinishedSpans()[1].Name)
84+
require.Equal(t, "temporal.RunWorkflow", testTracer.FinishedSpans()[2].Name)
8385

8486
}
8587
func Test_tracerImpl_genSpanID(t1 *testing.T) {

Diff for: ‎contrib/opentelemetry/tracing_interceptor.go

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package opentelemetry
2626
import (
2727
"context"
2828
"fmt"
29+
2930
"go.opentelemetry.io/otel"
3031
"go.opentelemetry.io/otel/attribute"
3132
"go.opentelemetry.io/otel/baggage"
@@ -56,6 +57,9 @@ type TracerOptions struct {
5657
// DisableQueryTracing can be set to disable query tracing.
5758
DisableQueryTracing bool
5859

60+
// DisableUpdateTracing can be set to disable update tracing.
61+
DisableUpdateTracing bool
62+
5963
// DisableBaggage can be set to disable baggage propagation.
6064
DisableBaggage bool
6165

@@ -138,6 +142,7 @@ func (t *tracer) Options() interceptor.TracerOptions {
138142
HeaderKey: t.options.HeaderKey,
139143
DisableSignalTracing: t.options.DisableSignalTracing,
140144
DisableQueryTracing: t.options.DisableQueryTracing,
145+
DisableUpdateTracing: t.options.DisableUpdateTracing,
141146
AllowInvalidParentSpans: t.options.AllowInvalidParentSpans,
142147
}
143148
}

Diff for: ‎interceptor/tracing_interceptor.go

+103
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const (
4040
workflowIDTagKey = "temporalWorkflowID"
4141
runIDTagKey = "temporalRunID"
4242
activityIDTagKey = "temporalActivityID"
43+
updateIDTagKey = "temporalUpdateID"
4344
)
4445

4546
// Tracer is an interface for tracing implementations as used by
@@ -113,6 +114,9 @@ type TracerOptions struct {
113114
// DisableQueryTracing can be set to disable query tracing.
114115
DisableQueryTracing bool
115116

117+
// DisableUpdateTracing can be set to disable update tracing.
118+
DisableUpdateTracing bool
119+
116120
// AllowInvalidParentSpans will swallow errors interpreting parent
117121
// spans from headers. Useful when migrating from one tracing library
118122
// to another, while workflows/activities may be in progress.
@@ -348,6 +352,33 @@ func (t *tracingClientOutboundInterceptor) QueryWorkflow(
348352
return val, err
349353
}
350354

355+
func (t *tracingClientOutboundInterceptor) UpdateWorkflow(
356+
ctx context.Context,
357+
in *ClientUpdateWorkflowInput,
358+
) (client.WorkflowUpdateHandle, error) {
359+
// Only add tracing if enabled
360+
if t.root.options.DisableUpdateTracing {
361+
return t.Next.UpdateWorkflow(ctx, in)
362+
}
363+
// Start span and write to header
364+
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
365+
Operation: "UpdateWorkflow",
366+
Name: in.UpdateName,
367+
Tags: map[string]string{workflowIDTagKey: in.WorkflowID},
368+
ToHeader: true,
369+
Time: time.Now(),
370+
})
371+
if err != nil {
372+
return nil, err
373+
}
374+
var finishOpts TracerFinishSpanOptions
375+
defer span.Finish(&finishOpts)
376+
377+
val, err := t.Next.UpdateWorkflow(ctx, in)
378+
finishOpts.Error = err
379+
return val, err
380+
}
381+
351382
type tracingActivityOutboundInterceptor struct {
352383
ActivityOutboundInterceptorBase
353384
root *tracingInterceptor
@@ -515,6 +546,78 @@ func (t *tracingWorkflowInboundInterceptor) HandleQuery(
515546
return val, err
516547
}
517548

549+
func (t *tracingWorkflowInboundInterceptor) ValidateUpdate(
550+
ctx workflow.Context,
551+
in *UpdateInput,
552+
) error {
553+
// Only add tracing if enabled and not replaying
554+
if t.root.options.DisableUpdateTracing {
555+
return t.Next.ValidateUpdate(ctx, in)
556+
}
557+
// Start span reading from header
558+
info := workflow.GetInfo(ctx)
559+
currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx)
560+
span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{
561+
Operation: "ValidateUpdate",
562+
Name: in.Name,
563+
Tags: map[string]string{
564+
workflowIDTagKey: info.WorkflowExecution.ID,
565+
runIDTagKey: info.WorkflowExecution.RunID,
566+
updateIDTagKey: currentUpdateInfo.ID,
567+
},
568+
FromHeader: true,
569+
Time: time.Now(),
570+
// We intentionally do not set IdempotencyKey here because validation is not run on
571+
// replay. When the tracing interceptor's span counter is reset between workflow
572+
// replays, the validator will not be processed which could result in impotency key
573+
// collisions with other requests.
574+
})
575+
if err != nil {
576+
return err
577+
}
578+
var finishOpts TracerFinishSpanOptions
579+
defer span.Finish(&finishOpts)
580+
581+
err = t.Next.ValidateUpdate(ctx, in)
582+
finishOpts.Error = err
583+
return err
584+
}
585+
586+
func (t *tracingWorkflowInboundInterceptor) ExecuteUpdate(
587+
ctx workflow.Context,
588+
in *UpdateInput,
589+
) (interface{}, error) {
590+
// Only add tracing if enabled and not replaying
591+
if t.root.options.DisableUpdateTracing {
592+
return t.Next.ExecuteUpdate(ctx, in)
593+
}
594+
// Start span reading from header
595+
info := workflow.GetInfo(ctx)
596+
currentUpdateInfo := workflow.GetCurrentUpdateInfo(ctx)
597+
span, ctx, err := t.root.startSpanFromWorkflowContext(ctx, &TracerStartSpanOptions{
598+
// Using operation name "HandleUpdate" to match other SDKs and by consistence with other operations
599+
Operation: "HandleUpdate",
600+
Name: in.Name,
601+
Tags: map[string]string{
602+
workflowIDTagKey: info.WorkflowExecution.ID,
603+
runIDTagKey: info.WorkflowExecution.RunID,
604+
updateIDTagKey: currentUpdateInfo.ID,
605+
},
606+
FromHeader: true,
607+
Time: time.Now(),
608+
IdempotencyKey: t.newIdempotencyKey(),
609+
})
610+
if err != nil {
611+
return nil, err
612+
}
613+
var finishOpts TracerFinishSpanOptions
614+
defer span.Finish(&finishOpts)
615+
616+
val, err := t.Next.ExecuteUpdate(ctx, in)
617+
finishOpts.Error = err
618+
return val, err
619+
}
620+
518621
type tracingWorkflowOutboundInterceptor struct {
519622
WorkflowOutboundInterceptorBase
520623
root *tracingInterceptor

Diff for: ‎internal/interceptortest/tracing.go

+52
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,24 @@ import (
3939

4040
var testWorkflowStartTime = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC)
4141

42+
type testUpdateCallbacks struct {
43+
AcceptImpl func()
44+
RejectImpl func(err error)
45+
CompleteImpl func(success interface{}, err error)
46+
}
47+
48+
// Accept implements internal.UpdateCallbacks.
49+
func (t *testUpdateCallbacks) Accept() {
50+
}
51+
52+
// Complete implements internal.UpdateCallbacks.
53+
func (t *testUpdateCallbacks) Complete(success interface{}, err error) {
54+
}
55+
56+
// Reject implements internal.UpdateCallbacks.
57+
func (t *testUpdateCallbacks) Reject(err error) {
58+
}
59+
4260
// TestTracer is an interceptor.Tracer that returns finished spans.
4361
type TestTracer interface {
4462
interceptor.Tracer
@@ -73,6 +91,18 @@ func RunTestWorkflow(t *testing.T, tracer interceptor.Tracer) {
7391

7492
env.SetStartTime(testWorkflowStartTime)
7593

94+
// Send an update
95+
env.RegisterDelayedCallback(func() {
96+
env.UpdateWorkflow("testUpdate", "updateID", &testUpdateCallbacks{
97+
RejectImpl: func(err error) {
98+
},
99+
AcceptImpl: func() {
100+
},
101+
CompleteImpl: func(interface{}, error) {
102+
},
103+
})
104+
}, 0*time.Second)
105+
76106
// Exec
77107
env.ExecuteWorkflow(testWorkflow)
78108

@@ -115,6 +145,12 @@ func RunTestWorkflowWithError(t *testing.T, tracer interceptor.Tracer) {
115145
func AssertSpanPropagation(t *testing.T, tracer TestTracer) {
116146

117147
require.Equal(t, []*SpanInfo{
148+
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "ValidateUpdate", Name: "testUpdate"})),
149+
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "HandleUpdate", Name: "testUpdate"}),
150+
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "StartActivity", Name: "testActivity"}),
151+
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunActivity", Name: "testActivity"}))),
152+
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "StartActivity", Name: "testActivityLocal"}),
153+
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunActivity", Name: "testActivityLocal"})))),
118154
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunWorkflow", Name: "testWorkflow"}),
119155
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "StartActivity", Name: "testActivity"}),
120156
Span(tracer.SpanName(&interceptor.TracerStartSpanOptions{Operation: "RunActivity", Name: "testActivity"}))),
@@ -137,6 +173,22 @@ func testWorkflowWithError(_ workflow.Context) error {
137173
}
138174

139175
func testWorkflow(ctx workflow.Context) ([]string, error) {
176+
var updateRan bool
177+
err := workflow.SetUpdateHandler(ctx, "testUpdate", func(ctx workflow.Context) (string, error) {
178+
defer func() { updateRan = true }()
179+
_, err := workflowInternal(ctx, false)
180+
if err != nil {
181+
return "", err
182+
}
183+
return "updateID", nil
184+
})
185+
if err != nil {
186+
return nil, err
187+
}
188+
err = workflow.Await(ctx, func() bool { return updateRan })
189+
if err != nil {
190+
return nil, err
191+
}
140192
// Run code
141193
ret, err := workflowInternal(ctx, false)
142194
if err != nil {

Diff for: ‎test/activity_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func (a *Activities) ExternalSignalsAndQueries(ctx context.Context) error {
366366
// Signal with start
367367
workflowOpts := client.StartWorkflowOptions{TaskQueue: activity.GetInfo(ctx).TaskQueue}
368368
run, err := a.client.SignalWithStartWorkflow(ctx, "test-external-signals-and-queries", "start-signal",
369-
"signal-value", workflowOpts, new(Workflows).SignalsAndQueries, false, false)
369+
"signal-value", workflowOpts, new(Workflows).SignalsQueriesAndUpdate, false, false)
370370
if err != nil {
371371
return err
372372
}

Diff for: ‎test/integration_test.go

+79-19
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,9 @@ func (ts *IntegrationTestSuite) SetupTest() {
178178
sdktrace.WithSpanProcessor(ts.openTelemetrySpanRecorder)).Tracer("")
179179
interceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{
180180
Tracer: ts.openTelemetryTracer,
181-
DisableSignalTracing: strings.HasSuffix(ts.T().Name(), "WithoutSignalsAndQueries"),
182-
DisableQueryTracing: strings.HasSuffix(ts.T().Name(), "WithoutSignalsAndQueries"),
181+
DisableSignalTracing: strings.HasSuffix(ts.T().Name(), "WithoutMessages"),
182+
DisableQueryTracing: strings.HasSuffix(ts.T().Name(), "WithoutMessages"),
183+
DisableUpdateTracing: strings.HasSuffix(ts.T().Name(), "WithoutMessages"),
183184
DisableBaggage: strings.HasSuffix(ts.T().Name(), "WithDisableBaggageOption"),
184185
})
185186
ts.NoError(err)
@@ -2533,19 +2534,19 @@ func (ts *IntegrationTestSuite) TestOpenTelemetryTracing() {
25332534
ts.testOpenTelemetryTracing(true)
25342535
}
25352536

2536-
func (ts *IntegrationTestSuite) TestOpenTelemetryTracingWithoutSignalsAndQueries() {
2537+
func (ts *IntegrationTestSuite) TestOpenTelemetryTracingWithoutMessages() {
25372538
ts.testOpenTelemetryTracing(false)
25382539
}
25392540

2540-
func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeaders bool) {
2541+
func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withMessages bool) {
25412542
ctx, cancel := context.WithCancel(context.Background())
25422543
defer cancel()
25432544
// Start a top-level span
25442545
ctx, rootSpan := ts.openTelemetryTracer.Start(ctx, "root-span")
25452546

25462547
// Signal with start
25472548
run, err := ts.client.SignalWithStartWorkflow(ctx, "test-interceptor-open-telemetry", "start-signal",
2548-
nil, ts.startWorkflowOptions("test-interceptor-open-telemetry"), ts.workflows.SignalsAndQueries, true, true)
2549+
nil, ts.startWorkflowOptions("test-interceptor-open-telemetry"), ts.workflows.SignalsQueriesAndUpdate, true, true)
25492550
ts.NoError(err)
25502551

25512552
// Query
@@ -2555,6 +2556,16 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
25552556
ts.NoError(val.Get(&queryResp))
25562557
ts.Equal("query-response", queryResp)
25572558

2559+
// Update
2560+
handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
2561+
WorkflowID: run.GetID(),
2562+
RunID: run.GetRunID(),
2563+
UpdateName: "workflow-update",
2564+
WaitForStage: client.WorkflowUpdateStageCompleted,
2565+
})
2566+
ts.NoError(err)
2567+
ts.NoError(handle.Get(ctx, nil))
2568+
25582569
// Finish signal
25592570
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish-signal", nil))
25602571
ts.NoError(run.Get(ctx, nil))
@@ -2566,15 +2577,18 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
25662577
// Span builder
25672578
span := func(name string, children ...*interceptortest.SpanInfo) *interceptortest.SpanInfo {
25682579
// If without signal-and-query headers, filter out those children in place
2569-
if !withSignalAndQueryHeaders {
2580+
if !withMessages {
25702581
n := 0
25712582
for _, child := range children {
2572-
isSignalOrQuery := strings.HasPrefix(child.Name, "SignalWorkflow:") ||
2583+
isMessage := strings.HasPrefix(child.Name, "SignalWorkflow:") ||
25732584
strings.HasPrefix(child.Name, "SignalChildWorkflow:") ||
25742585
strings.HasPrefix(child.Name, "HandleSignal:") ||
25752586
strings.HasPrefix(child.Name, "QueryWorkflow:") ||
2576-
strings.HasPrefix(child.Name, "HandleQuery:")
2577-
if !isSignalOrQuery {
2587+
strings.HasPrefix(child.Name, "HandleQuery:") ||
2588+
strings.HasPrefix(child.Name, "UpdateWorkflow:") ||
2589+
strings.HasPrefix(child.Name, "ValidateUpdate:") ||
2590+
strings.HasPrefix(child.Name, "HandleUpdate:")
2591+
if !isMessage {
25782592
children[n] = child
25792593
n++
25802594
}
@@ -2588,19 +2602,19 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
25882602
actual := interceptortest.Span("root-span")
25892603
ts.addOpenTelemetryChildren(rootSpan.SpanContext().SpanID(), actual, spans)
25902604
expected := span("root-span",
2591-
span("SignalWithStartWorkflow:SignalsAndQueries",
2605+
span("SignalWithStartWorkflow:SignalsQueriesAndUpdate",
25922606
span("HandleSignal:start-signal"),
2593-
span("RunWorkflow:SignalsAndQueries",
2607+
span("RunWorkflow:SignalsQueriesAndUpdate",
25942608
// Child workflow exec
2595-
span("StartChildWorkflow:SignalsAndQueries",
2596-
span("RunWorkflow:SignalsAndQueries",
2609+
span("StartChildWorkflow:SignalsQueriesAndUpdate",
2610+
span("RunWorkflow:SignalsQueriesAndUpdate",
25972611
// Activity inside child workflow
25982612
span("StartActivity:ExternalSignalsAndQueries",
25992613
span("RunActivity:ExternalSignalsAndQueries",
26002614
// Signal and query inside activity
2601-
span("SignalWithStartWorkflow:SignalsAndQueries",
2615+
span("SignalWithStartWorkflow:SignalsQueriesAndUpdate",
26022616
span("HandleSignal:start-signal"),
2603-
span("RunWorkflow:SignalsAndQueries"),
2617+
span("RunWorkflow:SignalsQueriesAndUpdate"),
26042618
),
26052619
span("QueryWorkflow:workflow-query",
26062620
span("HandleQuery:workflow-query"),
@@ -2621,9 +2635,9 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
26212635
// Activity in top-level
26222636
span("StartActivity:ExternalSignalsAndQueries",
26232637
span("RunActivity:ExternalSignalsAndQueries",
2624-
span("SignalWithStartWorkflow:SignalsAndQueries",
2638+
span("SignalWithStartWorkflow:SignalsQueriesAndUpdate",
26252639
span("HandleSignal:start-signal"),
2626-
span("RunWorkflow:SignalsAndQueries"),
2640+
span("RunWorkflow:SignalsQueriesAndUpdate"),
26272641
),
26282642
span("QueryWorkflow:workflow-query",
26292643
span("HandleQuery:workflow-query"),
@@ -2635,15 +2649,61 @@ func (ts *IntegrationTestSuite) testOpenTelemetryTracing(withSignalAndQueryHeade
26352649
),
26362650
),
26372651
),
2638-
// Top-level query and signal
2652+
// Top-level query signal, and update
26392653
span("QueryWorkflow:workflow-query",
26402654
span("HandleQuery:workflow-query"),
26412655
),
2656+
span("UpdateWorkflow:workflow-update",
2657+
span("ValidateUpdate:workflow-update"),
2658+
span("HandleUpdate:workflow-update",
2659+
// Child workflow exec
2660+
span("StartChildWorkflow:SignalsQueriesAndUpdate",
2661+
span("RunWorkflow:SignalsQueriesAndUpdate",
2662+
// Activity inside child workflow
2663+
span("StartActivity:ExternalSignalsAndQueries",
2664+
span("RunActivity:ExternalSignalsAndQueries",
2665+
// Signal and query inside activity
2666+
span("SignalWithStartWorkflow:SignalsQueriesAndUpdate",
2667+
span("HandleSignal:start-signal"),
2668+
span("RunWorkflow:SignalsQueriesAndUpdate"),
2669+
),
2670+
span("QueryWorkflow:workflow-query",
2671+
span("HandleQuery:workflow-query"),
2672+
),
2673+
span("SignalWorkflow:finish-signal",
2674+
span("HandleSignal:finish-signal"),
2675+
),
2676+
),
2677+
),
2678+
),
2679+
),
2680+
span("SignalChildWorkflow:start-signal",
2681+
span("HandleSignal:start-signal"),
2682+
),
2683+
span("SignalChildWorkflow:finish-signal",
2684+
span("HandleSignal:finish-signal"),
2685+
),
2686+
// Activity in top-level
2687+
span("StartActivity:ExternalSignalsAndQueries",
2688+
span("RunActivity:ExternalSignalsAndQueries",
2689+
span("SignalWithStartWorkflow:SignalsQueriesAndUpdate",
2690+
span("HandleSignal:start-signal"),
2691+
span("RunWorkflow:SignalsQueriesAndUpdate"),
2692+
),
2693+
span("QueryWorkflow:workflow-query",
2694+
span("HandleQuery:workflow-query"),
2695+
),
2696+
span("SignalWorkflow:finish-signal",
2697+
span("HandleSignal:finish-signal"),
2698+
),
2699+
),
2700+
),
2701+
),
2702+
),
26422703
span("SignalWorkflow:finish-signal",
26432704
span("HandleSignal:finish-signal"),
26442705
),
26452706
)
2646-
26472707
ts.Equal(expected, actual)
26482708
}
26492709

Diff for: ‎test/workflow_test.go

+43-25
Original file line numberDiff line numberDiff line change
@@ -2280,40 +2280,58 @@ func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error {
22802280
return nil
22812281
}
22822282

2283-
func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActivity bool) error {
2283+
func (w *Workflows) SignalsQueriesAndUpdate(ctx workflow.Context, execChild, execActivity bool) error {
2284+
execOperations := func(ctx workflow.Context) error {
2285+
// Run child if requested
2286+
if execChild {
2287+
fut := workflow.ExecuteChildWorkflow(ctx, w.SignalsQueriesAndUpdate, false, true)
2288+
// Signal child twice
2289+
if err := fut.SignalChildWorkflow(ctx, "start-signal", nil).Get(ctx, nil); err != nil {
2290+
return fmt.Errorf("failed signaling child with start: %w", err)
2291+
} else if err = fut.SignalChildWorkflow(ctx, "finish-signal", nil).Get(ctx, nil); err != nil {
2292+
return fmt.Errorf("failed signaling child with finish: %w", err)
2293+
}
2294+
// Wait for done
2295+
if err := fut.Get(ctx, nil); err != nil {
2296+
return fmt.Errorf("child failed: %w", err)
2297+
}
2298+
}
2299+
2300+
// Run activity if requested
2301+
if execActivity {
2302+
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
2303+
var a Activities
2304+
if err := workflow.ExecuteActivity(ctx, a.ExternalSignalsAndQueries).Get(ctx, nil); err != nil {
2305+
return fmt.Errorf("activity failed: %w", err)
2306+
}
2307+
}
2308+
return nil
2309+
}
22842310
// Add query handler
22852311
err := workflow.SetQueryHandler(ctx, "workflow-query", func() (string, error) { return "query-response", nil })
22862312
if err != nil {
22872313
return fmt.Errorf("failed setting query handler: %w", err)
22882314
}
2315+
// Add update handler
2316+
err = workflow.SetUpdateHandler(ctx, "workflow-update", func(ctx workflow.Context) (string, error) {
2317+
err := execOperations(ctx)
2318+
if err != nil {
2319+
return "", fmt.Errorf("failed executing operations: %w", err)
2320+
}
2321+
return "update-response", nil
2322+
})
2323+
if err != nil {
2324+
return fmt.Errorf("failed setting update handler: %w", err)
2325+
}
22892326

22902327
// Wait for signal on start
22912328
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, nil)
22922329

2293-
// Run child if requested
2294-
if execChild {
2295-
fut := workflow.ExecuteChildWorkflow(ctx, w.SignalsAndQueries, false, true)
2296-
// Signal child twice
2297-
if err := fut.SignalChildWorkflow(ctx, "start-signal", nil).Get(ctx, nil); err != nil {
2298-
return fmt.Errorf("failed signaling child with start: %w", err)
2299-
} else if err = fut.SignalChildWorkflow(ctx, "finish-signal", nil).Get(ctx, nil); err != nil {
2300-
return fmt.Errorf("failed signaling child with finish: %w", err)
2301-
}
2302-
// Wait for done
2303-
if err := fut.Get(ctx, nil); err != nil {
2304-
return fmt.Errorf("child failed: %w", err)
2305-
}
2306-
}
2307-
2308-
// Run activity if requested
2309-
if execActivity {
2310-
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
2311-
var a Activities
2312-
if err := workflow.ExecuteActivity(ctx, a.ExternalSignalsAndQueries).Get(ctx, nil); err != nil {
2313-
return fmt.Errorf("activity failed: %w", err)
2314-
}
2330+
// Run some operations
2331+
err = execOperations(ctx)
2332+
if err != nil {
2333+
return err
23152334
}
2316-
23172335
// Wait for finish signal
23182336
workflow.GetSignalChannel(ctx, "finish-signal").Receive(ctx, nil)
23192337
return nil
@@ -3153,7 +3171,7 @@ func (w *Workflows) register(worker worker.Worker) {
31533171
worker.RegisterWorkflow(w.InterceptorCalls)
31543172
worker.RegisterWorkflow(w.WaitSignalToStart)
31553173
worker.RegisterWorkflow(w.BuildIDWorkflow)
3156-
worker.RegisterWorkflow(w.SignalsAndQueries)
3174+
worker.RegisterWorkflow(w.SignalsQueriesAndUpdate)
31573175
worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage)
31583176
worker.RegisterWorkflow(w.AdvancedPostCancellation)
31593177
worker.RegisterWorkflow(w.AdvancedPostCancellationChildWithDone)

0 commit comments

Comments
 (0)
Please sign in to comment.