Skip to content

Commit b5db2b7

Browse files
authoredAug 16, 2024··
Add TaskQueueStats to DescribeTaskQueueEnhanced (#1553)
* Add TaskQueueStats to DescribeTaskQueueEnhanced * reformat * Address comments * Fix test * Get rid of time.sleep in the test * Attempt to fix test in CI * Fix test failure and add Eager comments * Convert BacklogIncreaseRate to a field * improve tests * improve tests * skip test in 1.24
1 parent bb42a8b commit b5db2b7

File tree

6 files changed

+223
-6
lines changed

6 files changed

+223
-6
lines changed
 

Diff for: ‎.github/workflows/ci.yml

+2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ jobs:
9191
env:
9292
# TODO(bergundy): Remove this flag once server 1.25.0 is out.
9393
DISABLE_NEXUS_TESTS: "1"
94+
# TODO(bergundy): Remove this flag too once server 1.25.0 is out. Thanks Roey! :)
95+
DISABLE_BACKLOG_STATS_TESTS: "1"
9496
working-directory: ./internal/cmd/build
9597

9698
cloud-test:

Diff for: ‎client/client.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,6 @@ type (
336336
TaskQueueReachability = internal.TaskQueueReachability
337337

338338
// DescribeTaskQueueEnhancedOptions is the input to [Client.DescribeTaskQueueEnhanced].
339-
// WARNING: Worker versioning is currently experimental.
340339
DescribeTaskQueueEnhancedOptions = internal.DescribeTaskQueueEnhancedOptions
341340

342341
// TaskQueueVersionSelection is a task queue filter based on versioning.
@@ -345,24 +344,26 @@ type (
345344
TaskQueueVersionSelection = internal.TaskQueueVersionSelection
346345

347346
// TaskQueueDescription is the response to [Client.DescribeTaskQueueEnhanced].
348-
// WARNING: Worker versioning is currently experimental.
349347
TaskQueueDescription = internal.TaskQueueDescription
350348

351349
// TaskQueueVersionInfo includes task queue information per Build ID.
352350
// It is part of [Client.TaskQueueDescription].
353-
// WARNING: Worker versioning is currently experimental.
354351
TaskQueueVersionInfo = internal.TaskQueueVersionInfo
355352

356353
// TaskQueueTypeInfo specifies task queue information per task type and Build ID.
357354
// It is included in [Client.TaskQueueVersionInfo].
358-
// WARNING: Worker versioning is currently experimental.
359355
TaskQueueTypeInfo = internal.TaskQueueTypeInfo
360356

361357
// TaskQueuePollerInfo provides information about a worker/client polling a task queue.
362358
// It is used by [Client.TaskQueueTypeInfo].
363-
// WARNING: Worker versioning is currently experimental.
364359
TaskQueuePollerInfo = internal.TaskQueuePollerInfo
365360

361+
// TaskQueueStats contains statistics about task queue backlog and activity.
362+
//
363+
// For workflow task queue type, this result is partial because tasks sent to sticky queues are not included. Read
364+
// comments above each metric to understand the impact of sticky queue exclusion on that metric accuracy.
365+
TaskQueueStats = internal.TaskQueueStats
366+
366367
// WorkerVersionCapabilities includes a worker's build identifier
367368
// and whether it is choosing to use the versioning feature.
368369
// It is an optional component of [Client.TaskQueuePollerInfo].

Diff for: ‎internal/internal_logging_tags.go

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636
tagWorkflowID = "WorkflowID"
3737
tagWorkflowType = "WorkflowType"
3838
tagWorkerID = "WorkerID"
39+
tagBuildID = "BuildID"
3940
tagWorkerType = "WorkerType"
4041
tagSideEffectID = "SideEffectID"
4142
tagChildWorkflowID = "ChildWorkflowID"

Diff for: ‎internal/internal_versioning_client.go

+75
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ type (
101101
// Include task reachability for the requested versions and all task types
102102
// (task reachability is not reported per task type).
103103
ReportTaskReachability bool
104+
// Include task queue stats for requested task queue types and versions.
105+
ReportStats bool
104106
}
105107

106108
// WorkerVersionCapabilities includes a worker's build identifier
@@ -126,11 +128,68 @@ type (
126128
WorkerVersionCapabilities *WorkerVersionCapabilities
127129
}
128130

131+
// TaskQueueStats contains statistics about task queue backlog and activity.
132+
//
133+
// For workflow task queue type, this result is partial because tasks sent to sticky queues are not included. Read
134+
// comments above each metric to understand the impact of sticky queue exclusion on that metric accuracy.
135+
TaskQueueStats struct {
136+
// The approximate number of tasks backlogged in this task queue. May count expired tasks but eventually
137+
// converges to the right value. Can be relied upon for scaling decisions.
138+
//
139+
// Special note for workflow task queue type: this metric does not count sticky queue tasks. However, because
140+
// those tasks only remain valid for a few seconds, the inaccuracy becomes less significant as the backlog size
141+
// grows.
142+
ApproximateBacklogCount int64
143+
// Approximate age of the oldest task in the backlog based on the creation time of the task at the head of
144+
// the queue. Can be relied upon for scaling decisions.
145+
//
146+
// Special note for workflow task queue type: this metric does not count sticky queue tasks. However, because
147+
// those tasks only remain valid for a few seconds, they should not affect the result when backlog is older than
148+
// few seconds.
149+
ApproximateBacklogAge time.Duration
150+
// Approximate *net* tasks per second added to the backlog, averaging the last 30 seconds. This is calculated as
151+
// `TasksAddRate - TasksDispatchRate`.
152+
// A positive value of `X` means the backlog is growing by about `X` tasks per second. A negative `-X` value means the
153+
// backlog is shrinking by about `X` tasks per second.
154+
//
155+
// Special note for workflow task queue type: this metric does not count sticky queue tasks. However, because
156+
// those tasks only remain valid for a few seconds, the inaccuracy becomes less significant as the backlog size
157+
// or age grow.
158+
BacklogIncreaseRate float32
159+
// Approximate tasks per second added to the task queue, averaging the last 30 seconds. This includes both
160+
// backlogged and sync-matched tasks, but excludes the Eagerly dispatched workflow and activity tasks (see
161+
// documentation for `client.StartWorkflowOptions.EnableEagerStart` and `worker.Options.DisableEagerActivities`.)
162+
//
163+
// The difference between `TasksAddRate` and `TasksDispatchRate` is a reliable metric for the rate at which
164+
// backlog grows/shrinks. See `BacklogIncreaseRate`.
165+
//
166+
// Special note for workflow task queue type: this metric does not count sticky queue tasks. Hence, the reported
167+
// value may be significantly lower than the actual number of workflow tasks added. Note that typically, only
168+
// the first workflow task of each workflow goes to a normal queue, and the rest workflow tasks go to the sticky
169+
// queue associated with a specific worker instance. Activity tasks always go to normal queues so their reported
170+
// rate is accurate.
171+
TasksAddRate float32
172+
// Approximate tasks per second dispatched to workers, averaging the last 30 seconds. This includes both
173+
// backlogged and sync-matched tasks, but excludes the Eagerly dispatched workflow and activity tasks (see
174+
// documentation for `client.StartWorkflowOptions.EnableEagerStart` and `worker.Options.DisableEagerActivities`.)
175+
//
176+
// The difference between `TasksAddRate` and `TasksDispatchRate` is a reliable metric for the rate at which
177+
// backlog grows/shrinks. See `BacklogIncreaseRate`.
178+
//
179+
// Special note for workflow task queue type: this metric does not count sticky queue tasks. Hence, the reported
180+
// value may be significantly lower than the actual number of workflow tasks dispatched. Note that typically, only
181+
// the first workflow task of each workflow goes to a normal queue, and the rest workflow tasks go to the sticky
182+
// queue associated with a specific worker instance. Activity tasks always go to normal queues so their reported
183+
// rate is accurate.
184+
TasksDispatchRate float32
185+
}
186+
129187
// TaskQueueTypeInfo specifies task queue information per task type and Build ID.
130188
// It is included in [TaskQueueVersionInfo].
131189
TaskQueueTypeInfo struct {
132190
// Poller details for this task queue category.
133191
Pollers []TaskQueuePollerInfo
192+
Stats *TaskQueueStats
134193
}
135194

136195
// TaskQueueVersionInfo includes task queue information per Build ID.
@@ -174,6 +233,7 @@ func (o *DescribeTaskQueueEnhancedOptions) validateAndConvertToProto(namespace s
174233
TaskQueueTypes: taskQueueTypes,
175234
ReportPollers: o.ReportPollers,
176235
ReportTaskReachability: o.ReportTaskReachability,
236+
ReportStats: o.ReportStats,
177237
}
178238

179239
return opt, nil
@@ -220,6 +280,21 @@ func taskQueueTypeInfoFromResponse(response *taskqueuepb.TaskQueueTypeInfo) Task
220280

221281
return TaskQueueTypeInfo{
222282
Pollers: pollers,
283+
Stats: statsFromResponse(response.Stats),
284+
}
285+
}
286+
287+
func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats {
288+
if stats == nil {
289+
return nil
290+
}
291+
292+
return &TaskQueueStats{
293+
ApproximateBacklogCount: stats.GetApproximateBacklogCount(),
294+
ApproximateBacklogAge: stats.GetApproximateBacklogAge().AsDuration(),
295+
TasksAddRate: stats.TasksAddRate,
296+
TasksDispatchRate: stats.TasksDispatchRate,
297+
BacklogIncreaseRate: stats.TasksAddRate - stats.TasksDispatchRate,
223298
}
224299
}
225300

Diff for: ‎internal/internal_worker.go

+6
Original file line numberDiff line numberDiff line change
@@ -1701,6 +1701,12 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
17011701
tagTaskQueue, taskQueue,
17021702
tagWorkerID, workerParams.Identity,
17031703
)
1704+
if workerParams.WorkerBuildID != "" {
1705+
// Add worker build ID to the logs if it's set by user
1706+
workerParams.Logger = log.With(workerParams.Logger,
1707+
tagBuildID, workerParams.WorkerBuildID,
1708+
)
1709+
}
17041710

17051711
processTestTags(&options, &workerParams)
17061712

Diff for: ‎test/worker_versioning_test.go

+133-1
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ package test_test
2424

2525
import (
2626
"context"
27+
"math"
2728
"os"
2829
"testing"
2930
"time"
3031

32+
"github.com/stretchr/testify/assert"
3133
"go.temporal.io/api/common/v1"
3234
"go.temporal.io/api/serviceerror"
3335
"go.temporal.io/api/workflowservice/v1"
36+
"go.temporal.io/sdk/internal"
3437

3538
"github.com/pborman/uuid"
3639
"github.com/stretchr/testify/require"
@@ -54,7 +57,7 @@ func TestWorkerVersioningTestSuite(t *testing.T) {
5457
func (ts *WorkerVersioningTestSuite) SetupSuite() {
5558
ts.Assertions = require.New(ts.T())
5659
ts.workflows = &Workflows{}
57-
ts.activities = &Activities{}
60+
ts.activities = newActivities()
5861
ts.NoError(ts.InitConfigAndNamespace())
5962
ts.NoError(ts.InitClient())
6063
}
@@ -800,6 +803,100 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersionsWithRules() {
800803
ts.Equal(client.BuildIDTaskReachability(client.BuildIDTaskReachabilityReachable), taskQueueVersionInfo.TaskReachability)
801804
}
802805

806+
func (ts *WorkerVersioningTestSuite) TestTaskQueueStats() {
807+
if os.Getenv("DISABLE_BACKLOG_STATS_TESTS") != "" {
808+
ts.T().SkipNow()
809+
}
810+
811+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
812+
defer cancel()
813+
814+
fetchAndValidateStats := func(expectedWorkflowStats *client.TaskQueueStats, expectedActivityStats *client.TaskQueueStats) {
815+
taskQueueInfo, err := ts.client.DescribeTaskQueueEnhanced(ctx, client.DescribeTaskQueueEnhancedOptions{
816+
TaskQueue: ts.taskQueueName,
817+
TaskQueueTypes: []client.TaskQueueType{
818+
client.TaskQueueTypeWorkflow,
819+
client.TaskQueueTypeActivity,
820+
},
821+
ReportStats: true,
822+
})
823+
ts.NoError(err)
824+
ts.Equal(1, len(taskQueueInfo.VersionsInfo))
825+
826+
ts.validateTaskQueueStats(expectedWorkflowStats, taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow].Stats)
827+
ts.validateTaskQueueStats(expectedActivityStats, taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeActivity].Stats)
828+
}
829+
830+
// Basic workflow runs two activities
831+
handle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("basic-wf"), ts.workflows.Basic)
832+
ts.NoError(err)
833+
834+
// Wait until the task goes to the TQ
835+
ts.EventuallyWithT(
836+
func(t *assert.CollectT) {
837+
taskQueueInfo, err := ts.client.DescribeTaskQueueEnhanced(ctx, client.DescribeTaskQueueEnhancedOptions{
838+
TaskQueue: ts.taskQueueName,
839+
TaskQueueTypes: []client.TaskQueueType{
840+
client.TaskQueueTypeWorkflow,
841+
},
842+
ReportStats: true,
843+
})
844+
ts.NoError(err)
845+
ts.Equal(1, len(taskQueueInfo.VersionsInfo))
846+
ts.NotNil(taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow])
847+
ts.NotNil(taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow].Stats)
848+
assert.Greater(t, taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow].Stats.ApproximateBacklogCount, int64(0))
849+
},
850+
time.Second, 100*time.Millisecond,
851+
)
852+
853+
// no workers yet, so only workflow should have a backlog
854+
fetchAndValidateStats(
855+
&client.TaskQueueStats{
856+
ApproximateBacklogCount: 1,
857+
ApproximateBacklogAge: time.Millisecond,
858+
BacklogIncreaseRate: 1,
859+
TasksAddRate: 1,
860+
TasksDispatchRate: 0,
861+
},
862+
&client.TaskQueueStats{
863+
ApproximateBacklogCount: 0,
864+
ApproximateBacklogAge: 0,
865+
BacklogIncreaseRate: 0,
866+
TasksAddRate: 0,
867+
TasksDispatchRate: 0,
868+
},
869+
)
870+
871+
// run the worker
872+
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{DisableEagerActivities: true})
873+
ts.workflows.register(worker1)
874+
ts.activities.register(worker1)
875+
ts.NoError(worker1.Start())
876+
defer worker1.Stop()
877+
878+
// Wait for the wf to finish
879+
ts.NoError(handle.Get(ctx, nil))
880+
881+
// backlogs should be empty but the rates should be non-zero
882+
fetchAndValidateStats(
883+
&client.TaskQueueStats{
884+
ApproximateBacklogCount: 0,
885+
ApproximateBacklogAge: 0,
886+
BacklogIncreaseRate: 0,
887+
TasksAddRate: 1,
888+
TasksDispatchRate: 1,
889+
},
890+
&client.TaskQueueStats{
891+
ApproximateBacklogCount: 0,
892+
ApproximateBacklogAge: 0,
893+
BacklogIncreaseRate: 0,
894+
TasksAddRate: 1,
895+
TasksDispatchRate: 1,
896+
},
897+
)
898+
}
899+
803900
func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
804901
// TODO: Unskip this test, it is flaky with server 1.25.0-rc.0
805902
if os.Getenv("DISABLE_SERVER_1_25_TESTS") != "" {
@@ -984,3 +1081,38 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR
9841081
ts.NoError(enval.Get(&lastBuildID))
9851082
ts.Equal("1.1", lastBuildID)
9861083
}
1084+
1085+
// validateTaskQueueStats compares expected vs actual stats.
1086+
// For age and rates, it treats all non-zero values the same.
1087+
// For BacklogIncreaseRate for non-zero expected values we only compare the sign (i.e. backlog grows or shrinks), while
1088+
// zero expected value means "not specified".
1089+
func (ts *WorkerVersioningTestSuite) validateTaskQueueStats(expected *client.TaskQueueStats, actual *internal.TaskQueueStats) {
1090+
if expected == nil {
1091+
ts.Nil(actual)
1092+
return
1093+
}
1094+
ts.NotNil(actual)
1095+
ts.Equal(expected.ApproximateBacklogCount, actual.ApproximateBacklogCount)
1096+
if expected.ApproximateBacklogAge == 0 {
1097+
ts.Equal(time.Duration(0), actual.ApproximateBacklogAge)
1098+
} else {
1099+
ts.Greater(actual.ApproximateBacklogAge, time.Duration(0))
1100+
}
1101+
if expected.TasksAddRate == 0 {
1102+
// TODO: do not accept NaN once the server code is fixed: https://github.com/temporalio/temporal/pull/6404
1103+
ts.True(float32(0) == actual.TasksAddRate || math.IsNaN(float64(actual.TasksAddRate)))
1104+
} else {
1105+
ts.Greater(actual.TasksAddRate, float32(0))
1106+
}
1107+
if expected.TasksDispatchRate == 0 {
1108+
// TODO: do not accept NaN once the server code is fixed: https://github.com/temporalio/temporal/pull/6404
1109+
ts.True(float32(0) == actual.TasksDispatchRate || math.IsNaN(float64(actual.TasksDispatchRate)))
1110+
} else {
1111+
ts.Greater(actual.TasksDispatchRate, float32(0))
1112+
}
1113+
if expected.BacklogIncreaseRate > 0 {
1114+
ts.Greater(actual.BacklogIncreaseRate, float32(0))
1115+
} else if expected.BacklogIncreaseRate < 0 {
1116+
ts.Less(actual.BacklogIncreaseRate, float32(0))
1117+
}
1118+
}

0 commit comments

Comments
 (0)
Please sign in to comment.