Skip to content

Commit 9c40461

Browse files
bergundypdoerner
andauthoredJul 24, 2024··
Add utility to get metrics handler for a Nexus operation (#1559)
This is work done by @pdoerner in #1544 that was lost in the nexus feature branch merge. Co-authored-by: pdoerner <122412190+pdoerner@users.noreply.github.com>
1 parent 0732f3d commit 9c40461

File tree

5 files changed

+59
-7
lines changed

5 files changed

+59
-7
lines changed
 

‎internal/cmd/build/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (b *builder) unitTest() error {
265265
log.Printf("Running unit tests in dirs: %v", testDirs)
266266
for _, testDir := range testDirs {
267267
// Run unit test
268-
args := []string{"go", "test", "-tags", "protolegacy", "-count", "1", "-race", "-v", "-timeout", "10m"}
268+
args := []string{"go", "test", "-tags", "protolegacy", "-count", "1", "-race", "-v", "-timeout", "15m"}
269269
if *runFlag != "" {
270270
args = append(args, "-run", *runFlag)
271271
}

‎internal/internal_nexus_task_handler.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"go.temporal.io/api/common/v1"
3737
nexuspb "go.temporal.io/api/nexus/v1"
3838
"go.temporal.io/api/workflowservice/v1"
39+
3940
"go.temporal.io/sdk/converter"
4041
"go.temporal.io/sdk/internal/common/metrics"
4142
"go.temporal.io/sdk/log"
@@ -109,14 +110,19 @@ func (h *nexusTaskHandler) Execute(task *workflowservice.PollNexusTaskQueueRespo
109110
}
110111

111112
func (h *nexusTaskHandler) execute(task *workflowservice.PollNexusTaskQueueResponse) (*nexuspb.Response, *nexuspb.HandlerError, error) {
113+
metricsHandler, handlerErr := h.metricsHandlerForTask(task)
114+
if handlerErr != nil {
115+
return nil, handlerErr, nil
116+
}
112117
log, handlerErr := h.loggerForTask(task)
113118
if handlerErr != nil {
114119
return nil, handlerErr, nil
115120
}
116121
nctx := &NexusOperationContext{
117-
Client: h.client,
118-
TaskQueue: h.taskQueueName,
119-
Log: log,
122+
Client: h.client,
123+
TaskQueue: h.taskQueueName,
124+
MetricsHandler: metricsHandler,
125+
Log: log,
120126
}
121127
header := nexus.Header(task.GetRequest().GetHeader())
122128
if header == nil {
@@ -317,6 +323,7 @@ func (h *nexusTaskHandler) loggerForTask(response *workflowservice.PollNexusTask
317323
return log.With(h.logger,
318324
tagNexusService, service,
319325
tagNexusOperation, operation,
326+
tagTaskQueue, h.taskQueueName,
320327
), nil
321328
}
322329

‎internal/nexus_operations.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,18 @@ import (
3434
nexuspb "go.temporal.io/api/nexus/v1"
3535
"go.temporal.io/api/operatorservice/v1"
3636
"go.temporal.io/api/workflowservice/v1"
37+
3738
"go.temporal.io/sdk/converter"
39+
"go.temporal.io/sdk/internal/common/metrics"
3840
"go.temporal.io/sdk/log"
3941
)
4042

4143
// NexusOperationContext is an internal only struct that holds fields used by the temporalnexus functions.
4244
type NexusOperationContext struct {
43-
Client Client
44-
TaskQueue string
45-
Log log.Logger
45+
Client Client
46+
TaskQueue string
47+
MetricsHandler metrics.Handler
48+
Log log.Logger
4649
}
4750

4851
type nexusOperationContextKeyType struct{}

‎temporalnexus/operation.go

+21
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,32 @@ import (
4444

4545
"github.com/nexus-rpc/sdk-go/nexus"
4646
"go.temporal.io/api/common/v1"
47+
4748
"go.temporal.io/sdk/client"
4849
"go.temporal.io/sdk/internal"
50+
"go.temporal.io/sdk/internal/common/metrics"
51+
"go.temporal.io/sdk/log"
4952
"go.temporal.io/sdk/workflow"
5053
)
5154

55+
// GetMetricsHandler returns a metrics handler to be used in a Nexus operation's context.
56+
func GetMetricsHandler(ctx context.Context) metrics.Handler {
57+
nctx, ok := internal.NexusOperationContextFromGoContext(ctx)
58+
if !ok {
59+
panic("temporalnexus GetMetricsHandler: Not a valid Nexus context")
60+
}
61+
return nctx.MetricsHandler
62+
}
63+
64+
// GetLogger returns a logger to be used in a Nexus operation's context.
65+
func GetLogger(ctx context.Context) log.Logger {
66+
nctx, ok := internal.NexusOperationContextFromGoContext(ctx)
67+
if !ok {
68+
panic("temporalnexus GetLogger: Not a valid Nexus context")
69+
}
70+
return nctx.Log
71+
}
72+
5273
type syncOperation[I, O any] struct {
5374
nexus.UnimplementedOperation[I, O]
5475

‎test/nexus_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
historypb "go.temporal.io/api/history/v1"
4343
nexuspb "go.temporal.io/api/nexus/v1"
4444
"go.temporal.io/api/operatorservice/v1"
45+
4546
"go.temporal.io/sdk/client"
4647
"go.temporal.io/sdk/internal/common/metrics"
4748
ilog "go.temporal.io/sdk/internal/log"
@@ -320,6 +321,11 @@ func TestSyncOperationFromWorkflow(t *testing.T) {
320321
tc := newTestContext(t, ctx)
321322

322323
op := temporalnexus.NewSyncOperation("op", func(ctx context.Context, c client.Client, outcome string, o nexus.StartOperationOptions) (string, error) {
324+
require.NotPanicsf(t, func() {
325+
temporalnexus.GetMetricsHandler(ctx)
326+
temporalnexus.GetLogger(ctx)
327+
}, "Failed to get metrics handler or logger from operation context.")
328+
323329
switch outcome {
324330
case "successful":
325331
return outcome, nil
@@ -445,6 +451,11 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
445451
}
446452
}
447453
op := temporalnexus.NewWorkflowRunOperation("op", handlerWorkflow, func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
454+
require.NotPanicsf(t, func() {
455+
temporalnexus.GetMetricsHandler(ctx)
456+
temporalnexus.GetLogger(ctx)
457+
}, "Failed to get metrics handler or logger from operation context.")
458+
448459
if action == "fail-to-start" {
449460
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error")
450461
}
@@ -677,6 +688,11 @@ func TestReplay(t *testing.T) {
677688

678689
func TestWorkflowTestSuite_NexusSyncOperation(t *testing.T) {
679690
op := nexus.NewSyncOperation("op", func(ctx context.Context, outcome string, opts nexus.StartOperationOptions) (string, error) {
691+
require.NotPanicsf(t, func() {
692+
temporalnexus.GetMetricsHandler(ctx)
693+
temporalnexus.GetLogger(ctx)
694+
}, "Failed to get metrics handler or logger from operation context.")
695+
680696
switch outcome {
681697
case "ok":
682698
return outcome, nil
@@ -765,6 +781,11 @@ func TestWorkflowTestSuite_WorkflowRunOperation(t *testing.T) {
765781
"op",
766782
handlerWF,
767783
func(ctx context.Context, id string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
784+
require.NotPanicsf(t, func() {
785+
temporalnexus.GetMetricsHandler(ctx)
786+
temporalnexus.GetLogger(ctx)
787+
}, "Failed to get metrics handler or logger from operation context.")
788+
768789
return client.StartWorkflowOptions{ID: opts.RequestID}, nil
769790
})
770791

0 commit comments

Comments
 (0)
Please sign in to comment.