Skip to content

Commit 7432064

Browse files
authoredOct 15, 2024··
Add regression test to validate temporal_workflow_task_execution_failed on replay (#1669)
1 parent e503995 commit 7432064

File tree

2 files changed

+63
-0
lines changed

2 files changed

+63
-0
lines changed
 

‎test/integration_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -4458,6 +4458,56 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
44584458
ts.True(taskFailedMetric >= 1)
44594459
}
44604460

4461+
func (ts *IntegrationTestSuite) TestNonDeterminismFailureCauseReplay() {
4462+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4463+
defer cancel()
4464+
4465+
fetchMetrics := func() (localMetric int64) {
4466+
for _, counter := range ts.metricsHandler.Counters() {
4467+
counter := counter
4468+
if counter.Name == "temporal_workflow_task_execution_failed" && counter.Tags["failure_reason"] == "NonDeterminismError" {
4469+
localMetric = counter.Value()
4470+
}
4471+
}
4472+
return
4473+
}
4474+
4475+
// Confirm no metrics to start
4476+
taskFailedMetric := fetchMetrics()
4477+
ts.Zero(taskFailedMetric)
4478+
4479+
// Start workflow
4480+
forcedNonDeterminismCounter = 0
4481+
run, err := ts.client.ExecuteWorkflow(
4482+
ctx,
4483+
ts.startWorkflowOptions("test-non-determinism-failure-cause-replay-"+uuid.New()),
4484+
ts.workflows.NonDeterminismReplay,
4485+
)
4486+
4487+
ts.NoError(err)
4488+
defer func() { _ = ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "", nil) }()
4489+
ts.NoError(run.Get(ctx, nil))
4490+
4491+
// Now, stop the worker and start a new one
4492+
ts.worker.Stop()
4493+
ts.workerStopped = true
4494+
nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{})
4495+
ts.registerWorkflowsAndActivities(nextWorker)
4496+
ts.NoError(nextWorker.Start())
4497+
defer nextWorker.Stop()
4498+
4499+
// Increase the determinism counter and send a tick to trigger replay
4500+
// non-determinism
4501+
forcedNonDeterminismCounter++
4502+
fmt.Println("Querying workflow")
4503+
_, err = ts.client.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), client.QueryTypeStackTrace, nil)
4504+
ts.Error(err)
4505+
ts.Equal("context deadline exceeded", err.Error())
4506+
4507+
taskFailedMetric = fetchMetrics()
4508+
ts.True(taskFailedMetric >= 1)
4509+
}
4510+
44614511
func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional() {
44624512
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
44634513
defer cancel()

‎test/workflow_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -2784,6 +2784,18 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif
27842784
return
27852785
}
27862786

2787+
func (w *Workflows) NonDeterminismReplay(ctx workflow.Context) error {
2788+
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
2789+
var a Activities
2790+
var err error
2791+
if forcedNonDeterminismCounter == 0 {
2792+
err = workflow.ExecuteActivity(ctx, a.Sleep, 1*time.Millisecond).Get(ctx, nil)
2793+
} else {
2794+
err = workflow.Sleep(ctx, 1*time.Millisecond)
2795+
}
2796+
return err
2797+
}
2798+
27872799
func (w *Workflows) ScheduleTypedSearchAttributesWorkflow(ctx workflow.Context) (string, error) {
27882800
attributes := workflow.GetTypedSearchAttributes(ctx)
27892801

@@ -3259,6 +3271,7 @@ func (w *Workflows) register(worker worker.Worker) {
32593271
worker.RegisterWorkflow(w.SignalCounter)
32603272
worker.RegisterWorkflow(w.PanicOnSignal)
32613273
worker.RegisterWorkflow(w.ForcedNonDeterminism)
3274+
worker.RegisterWorkflow(w.NonDeterminismReplay)
32623275
worker.RegisterWorkflow(w.MutableSideEffect)
32633276
worker.RegisterWorkflow(w.HistoryLengths)
32643277
worker.RegisterWorkflow(w.HeartbeatSpecificCount)

0 commit comments

Comments
 (0)
Please sign in to comment.