Skip to content

Commit ea60ad5

Browse files
authoredOct 3, 2024··
Evict the workflow from cache if their is a panic in the SDK (#1654)
Evict the workflow from cache if their is a panic in the SDK
1 parent 772bc83 commit ea60ad5

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed
 

‎internal/internal_task_pollers.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error {
364364
}
365365
}
366366

367-
func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
367+
func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) (retErr error) {
368368
if task.task == nil {
369369
// We didn't have task, poll might have timeout.
370370
traceLog(func() {
@@ -385,6 +385,20 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error {
385385
}
386386
var taskErr error
387387
defer func() {
388+
// If we panic during processing the workflow task, we need to unlock the workflow context with an error to discard it.
389+
if p := recover(); p != nil {
390+
topLine := fmt.Sprintf("workflow task for %s [panic]:", wtp.taskQueueName)
391+
st := getStackTraceRaw(topLine, 7, 0)
392+
wtp.logger.Error("Workflow task processing panic.",
393+
tagWorkflowID, task.task.WorkflowExecution.GetWorkflowId(),
394+
tagRunID, task.task.WorkflowExecution.GetRunId(),
395+
tagWorkerType, task.task.GetWorkflowType().Name,
396+
tagAttempt, task.task.Attempt,
397+
tagPanicError, fmt.Sprintf("%v", p),
398+
tagPanicStack, st)
399+
taskErr = newPanicError(p, st)
400+
retErr = taskErr
401+
}
388402
wfctx.Unlock(taskErr)
389403
}()
390404

‎internal/internal_task_pollers_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -377,3 +377,58 @@ func TestWFTReset(t *testing.T) {
377377
cachedExecution = cache.getWorkflowContext(runID)
378378
require.True(t, originalCachedExecution == cachedExecution)
379379
}
380+
381+
type panickingTaskHandler struct {
382+
WorkflowTaskHandler
383+
}
384+
385+
func (wth *panickingTaskHandler) ProcessWorkflowTask(
386+
task *workflowTask,
387+
wfctx *workflowExecutionContextImpl,
388+
hb workflowTaskHeartbeatFunc,
389+
) (interface{}, error) {
390+
panic("panickingTaskHandler")
391+
}
392+
393+
func TestWFTPanicInTaskHandler(t *testing.T) {
394+
cache := NewWorkerCache()
395+
params := workerExecutionParameters{cache: cache}
396+
ensureRequiredParams(&params)
397+
wfType := commonpb.WorkflowType{Name: t.Name() + "-workflow-type"}
398+
reg := newRegistry()
399+
reg.RegisterWorkflowWithOptions(func(ctx Context) error {
400+
return nil
401+
}, RegisterWorkflowOptions{
402+
Name: wfType.Name,
403+
})
404+
var (
405+
taskQueue = taskqueuepb.TaskQueue{Name: t.Name() + "task-queue"}
406+
startedAttrs = historypb.WorkflowExecutionStartedEventAttributes{
407+
TaskQueue: &taskQueue,
408+
}
409+
startedEvent = createTestEventWorkflowExecutionStarted(1, &startedAttrs)
410+
history = historypb.History{Events: []*historypb.HistoryEvent{startedEvent}}
411+
runID = t.Name() + "-run-id"
412+
wfID = t.Name() + "-workflow-id"
413+
wfe = commonpb.WorkflowExecution{RunId: runID, WorkflowId: wfID}
414+
ctrl = gomock.NewController(t)
415+
client = workflowservicemock.NewMockWorkflowServiceClient(ctrl)
416+
innerTaskHandler = newWorkflowTaskHandler(params, nil, newRegistry())
417+
taskHandler = &panickingTaskHandler{WorkflowTaskHandler: innerTaskHandler}
418+
contextManager = taskHandler
419+
codec = binary.LittleEndian
420+
pollResp0 = workflowservice.PollWorkflowTaskQueueResponse{
421+
Attempt: 1,
422+
WorkflowExecution: &wfe,
423+
WorkflowType: &wfType,
424+
History: &history,
425+
TaskToken: codec.AppendUint32(nil, 0),
426+
}
427+
task0 = workflowTask{task: &pollResp0}
428+
)
429+
430+
poller := newWorkflowTaskPoller(taskHandler, contextManager, client, params)
431+
require.Error(t, poller.processWorkflowTask(&task0))
432+
// Workflow should not be in cache
433+
require.Nil(t, cache.getWorkflowContext(runID))
434+
}

0 commit comments

Comments
 (0)
Please sign in to comment.