Skip to content

Commit b4e934e

Browse files
authoredOct 8, 2024··
Include updateID and updateName in update logger (#1660)
1 parent b300e50 commit b4e934e

File tree

4 files changed

+96
-18
lines changed

4 files changed

+96
-18
lines changed
 

‎internal/internal_logging_tags.go

+2
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,6 @@ const (
5555
tagNexusService = "NexusService"
5656
tagPanicError = "PanicError"
5757
tagPanicStack = "PanicStack"
58+
tagUpdateID = "UpdateID"
59+
tagUpdateName = "UpdateName"
5860
)

‎internal/workflow.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -1262,7 +1262,14 @@ func GetLogger(ctx Context) log.Logger {
12621262
}
12631263

12641264
func (wc *workflowEnvironmentInterceptor) GetLogger(ctx Context) log.Logger {
1265-
return wc.env.GetLogger()
1265+
logger := wc.env.GetLogger()
1266+
// Add update info to the logger if available
1267+
uc := ctx.Value(updateInfoContextKey)
1268+
if uc == nil {
1269+
return logger
1270+
}
1271+
updateInfo := uc.(*UpdateInfo)
1272+
return log.With(logger, tagUpdateID, updateInfo.ID, tagUpdateName, updateInfo.Name)
12661273
}
12671274

12681275
// GetMetricsHandler returns a metrics handler to be used in workflow's context

‎internal/workflow_testsuite_test.go

+81-16
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,22 @@ func TestAllHandlersFinished(t *testing.T) {
545545
require.Equal(t, 2, result)
546546
}
547547

548+
// parseLogs parses the logs from the buffer and returns the logs as a slice of maps
549+
func parseLogs(t *testing.T, buf *bytes.Buffer) []map[string]any {
550+
var ms []map[string]any
551+
for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) {
552+
if len(line) == 0 {
553+
continue
554+
}
555+
var m map[string]any
556+
err := json.Unmarshal(line, &m)
557+
require.NoError(t, err)
558+
fmt.Println(m)
559+
ms = append(ms, m)
560+
}
561+
return ms
562+
}
563+
548564
func TestWorkflowAllHandlersFinished(t *testing.T) {
549565
// runWf runs a workflow that sends two updates and then signals the workflow to complete
550566
runWf := func(completionType string, buf *bytes.Buffer) (int, error) {
@@ -648,21 +664,6 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
648664
require.NoError(t, env.GetWorkflowResult(&result))
649665
return result, nil
650666
}
651-
// parseLogs parses the logs from the buffer and returns the logs as a slice of maps
652-
parseLogs := func(buf *bytes.Buffer) []map[string]any {
653-
var ms []map[string]any
654-
for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) {
655-
if len(line) == 0 {
656-
continue
657-
}
658-
var m map[string]any
659-
err := json.Unmarshal(line, &m)
660-
require.NoError(t, err)
661-
fmt.Println(m)
662-
ms = append(ms, m)
663-
}
664-
return ms
665-
}
666667
// parseWarnedUpdates parses the warned updates from the logs and returns them as a slice of maps
667668
parseWarnedUpdates := func(updates interface{}) []map[string]interface{} {
668669
var warnedUpdates []map[string]interface{}
@@ -674,7 +675,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
674675
}
675676
// assertExpectedLogs asserts that the logs in the buffer are as expected
676677
assertExpectedLogs := func(t *testing.T, buf *bytes.Buffer, shouldWarn bool) {
677-
logs := parseLogs(buf)
678+
logs := parseLogs(t, buf)
678679
if shouldWarn {
679680
require.Len(t, logs, 1)
680681
require.Equal(t, unhandledUpdateWarningMessage, logs[0]["msg"])
@@ -718,6 +719,70 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
718719
})
719720
}
720721

722+
func TestWorkflowUpdateLogger(t *testing.T) {
723+
var suite WorkflowTestSuite
724+
var buf bytes.Buffer
725+
th := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo})
726+
suite.SetLogger(log.NewStructuredLogger(slog.New(th)))
727+
env := suite.NewTestWorkflowEnvironment()
728+
729+
env.RegisterDelayedCallback(func() {
730+
env.UpdateWorkflow("logging_update", "id_1", &updateCallback{
731+
reject: func(err error) {
732+
require.Fail(t, "update should not be rejected")
733+
},
734+
accept: func() {},
735+
complete: func(interface{}, error) {},
736+
})
737+
}, 0)
738+
739+
env.RegisterDelayedCallback(func() {
740+
env.SignalWorkflow("completion", nil)
741+
}, time.Minute*2)
742+
743+
env.ExecuteWorkflow(func(ctx Context) (int, error) {
744+
var ranUpdates int
745+
err := SetUpdateHandler(ctx, "logging_update", func(ctx Context) error {
746+
ranUpdates++
747+
log := GetLogger(ctx)
748+
log.Info("logging update handler")
749+
return nil
750+
}, UpdateHandlerOptions{
751+
Validator: func(ctx Context) error {
752+
log := GetLogger(ctx)
753+
log.Info("logging update validator")
754+
return nil
755+
},
756+
})
757+
if err != nil {
758+
return 0, err
759+
}
760+
761+
var completeType string
762+
s := NewSelector(ctx)
763+
s.AddReceive(ctx.Done(), func(c ReceiveChannel, more bool) {
764+
completeType = "cancel"
765+
}).AddReceive(GetSignalChannel(ctx, "completion"), func(c ReceiveChannel, more bool) {
766+
c.Receive(ctx, &completeType)
767+
}).Select(ctx)
768+
return ranUpdates, nil
769+
})
770+
771+
require.NoError(t, env.GetWorkflowError())
772+
var result int
773+
require.NoError(t, env.GetWorkflowResult(&result))
774+
// Verify logs
775+
logs := parseLogs(t, &buf)
776+
require.Len(t, logs, 2)
777+
require.Equal(t, logs[0][tagUpdateName], "logging_update")
778+
require.Equal(t, logs[0][tagUpdateID], "id_1")
779+
require.Equal(t, logs[0]["msg"], "logging update validator")
780+
require.Equal(t, logs[1][tagUpdateName], "logging_update")
781+
require.Equal(t, logs[1][tagUpdateID], "id_1")
782+
require.Equal(t, logs[1]["msg"], "logging update handler")
783+
784+
}
785+
721786
func TestWorkflowStartTimeInsideTestWorkflow(t *testing.T) {
722787
var suite WorkflowTestSuite
723788
env := suite.NewTestWorkflowEnvironment()

‎workflow/workflow.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,11 @@ func GetCurrentUpdateInfo(ctx Context) *UpdateInfo {
278278
return internal.GetCurrentUpdateInfo(ctx)
279279
}
280280

281-
// GetLogger returns a logger to be used in workflow's context
281+
// GetLogger returns a logger to be used in workflow's context.
282+
// This logger does not record logs during replay.
283+
//
284+
// The logger may also extract additional fields from the context, such as update info
285+
// if used in an update handler.
282286
func GetLogger(ctx Context) log.Logger {
283287
return internal.GetLogger(ctx)
284288
}

0 commit comments

Comments
 (0)
Please sign in to comment.