Skip to content

Commit dd28ced

Browse files
authoredJul 29, 2024··
Protect against legacy queries when state is destroyed (#1568)
1 parent 1f0296c commit dd28ced

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed
 

‎internal/internal_task_handlers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -769,14 +769,14 @@ func (wth *workflowTaskHandlerImpl) GetOrCreateWorkflowContext(
769769
if task.Query != nil && !isFullHistory && wth == workflowContext.wth && !workflowContext.IsDestroyed() {
770770
// query task and we have a valid cached state
771771
metricsHandler.Counter(metrics.StickyCacheHit).Inc(1)
772-
} else if history.Events[0].GetEventId() == workflowContext.previousStartedEventID+1 && wth == workflowContext.wth && !workflowContext.IsDestroyed() {
772+
} else if len(history.Events) > 0 && history.Events[0].GetEventId() == workflowContext.previousStartedEventID+1 && wth == workflowContext.wth && !workflowContext.IsDestroyed() {
773773
// non query task and we have a valid cached state
774774
metricsHandler.Counter(metrics.StickyCacheHit).Inc(1)
775775
} else {
776776
// possible another task already destroyed this context.
777777
if !workflowContext.IsDestroyed() {
778778
// non query task and cached state is missing events, we need to discard the cached state and build a new one.
779-
if history.Events[0].GetEventId() != workflowContext.previousStartedEventID+1 {
779+
if len(history.Events) > 0 && history.Events[0].GetEventId() != workflowContext.previousStartedEventID+1 {
780780
wth.logger.Debug("Cached state staled, new task has unexpected events",
781781
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
782782
tagRunID, task.WorkflowExecution.GetRunId(),

‎internal/internal_task_handlers_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,34 @@ func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() {
650650
params.cache.getWorkflowCache().Delete(task.WorkflowExecution.RunId)
651651
}
652652

653+
func (t *TaskHandlersTestSuite) TestStickyLegacyQueryTaskOnEvictedCache() {
654+
taskQueue := "tq1"
655+
testEvents := []*historypb.HistoryEvent{
656+
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
657+
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
658+
createTestEventWorkflowTaskStarted(3),
659+
}
660+
task := createWorkflowTask(testEvents, 0, "HelloWorld_Workflow")
661+
params := t.getTestWorkerExecutionParams()
662+
taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
663+
wftask := workflowTask{task: task}
664+
wfctx := t.mustWorkflowContextImpl(&wftask, taskHandler)
665+
wfctx.Unlock(nil)
666+
wfctx.clearState()
667+
// Now make the task look like a legacy query task on the sticky queue
668+
task.History = &historypb.History{}
669+
task.Query = &querypb.WorkflowQuery{}
670+
wfQueryTask := workflowTask{task: task, historyIterator: &historyIteratorImpl{
671+
iteratorFunc: func(nextToken []byte) (*historypb.History, []byte, error) {
672+
return &historypb.History{Events: testEvents}, nil, nil
673+
},
674+
}}
675+
wfctx = t.mustWorkflowContextImpl(&wfQueryTask, taskHandler)
676+
t.NotNil(wfctx)
677+
// clean up workflow left in cache
678+
params.cache.getWorkflowCache().Delete(task.WorkflowExecution.RunId)
679+
}
680+
653681
func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
654682
// Schedule an activity and see if we complete workflow.
655683
taskQueue := "tq1"

0 commit comments

Comments
 (0)
Please sign in to comment.