Skip to content

Commit 69bc6c3

Browse files
authoredApr 17, 2024··
Update reapply (#1436)
Handle Update Admitted events
1 parent c4bf074 commit 69bc6c3

11 files changed

+963
-141
lines changed
 

‎internal/internal_event_handlers.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -1245,6 +1245,12 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
12451245
case enumspb.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
12461246
weh.handleUpsertWorkflowSearchAttributes(event)
12471247

1248+
case enumspb.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED:
1249+
weh.handleWorkflowPropertiesModified(event)
1250+
1251+
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
1252+
// No Operation
1253+
12481254
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
12491255
// No Operation
12501256

@@ -1254,9 +1260,6 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
12541260
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
12551261
// No Operation
12561262

1257-
case enumspb.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED:
1258-
weh.handleWorkflowPropertiesModified(event)
1259-
12601263
default:
12611264
if event.WorkerMayIgnore {
12621265
// Do not fail to be forward compatible with new events

‎internal/internal_task_handlers.go

+52-17
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ type (
177177
nextEventID int64 // next expected eventID for sanity
178178
lastEventID int64 // last expected eventID, zero indicates read until end of stream
179179
next []*historypb.HistoryEvent
180+
nextMessages []*protocolpb.Message
180181
nextFlags []sdkFlag
181182
binaryChecksum string
182183
sdkVersion string
@@ -199,7 +200,8 @@ type (
199200
events []*historypb.HistoryEvent
200201
markers []*historypb.HistoryEvent
201202
flags []sdkFlag
202-
msgs []*protocolpb.Message
203+
acceptedMsgs []*protocolpb.Message
204+
admittedMsgs []*protocolpb.Message
203205
binaryChecksum string
204206
sdkVersion string
205207
sdkName string
@@ -349,38 +351,42 @@ func (eh *history) nextTask() (*preparedTask, error) {
349351
return nil, err
350352
}
351353
eh.next = firstTask.events
354+
eh.nextMessages = firstTask.admittedMsgs
352355
eh.nextFlags = firstTask.flags
353356
eh.sdkName = firstTask.sdkName
354357
eh.sdkVersion = firstTask.sdkVersion
355358
}
356359

357360
result := eh.next
361+
requestMessages := eh.nextMessages
358362
checksum := eh.binaryChecksum
359363
sdkFlags := eh.nextFlags
360364
sdkName := eh.sdkName
361365
sdkVersion := eh.sdkVersion
362366

363367
var markers []*historypb.HistoryEvent
364-
var msgs []*protocolpb.Message
368+
var acceptedMsgs []*protocolpb.Message
365369
var buildID *string
366370
if len(result) > 0 {
367371
nextTaskEvents, err := eh.prepareTask()
368372
if err != nil {
369373
return nil, err
370374
}
371375
eh.next = nextTaskEvents.events
376+
eh.nextMessages = nextTaskEvents.admittedMsgs
372377
eh.nextFlags = nextTaskEvents.flags
373378
eh.sdkName = nextTaskEvents.sdkName
374379
eh.sdkVersion = nextTaskEvents.sdkVersion
375380
markers = nextTaskEvents.markers
376-
msgs = nextTaskEvents.msgs
381+
acceptedMsgs = nextTaskEvents.acceptedMsgs
377382
buildID = nextTaskEvents.buildID
378383
}
379384
return &preparedTask{
380385
events: result,
381386
markers: markers,
382387
flags: sdkFlags,
383-
msgs: msgs,
388+
acceptedMsgs: acceptedMsgs,
389+
admittedMsgs: requestMessages,
384390
binaryChecksum: checksum,
385391
sdkName: sdkName,
386392
sdkVersion: sdkVersion,
@@ -481,7 +487,17 @@ OrderEvents:
481487
} else if isPreloadMarkerEvent(event) {
482488
taskEvents.markers = append(taskEvents.markers, event)
483489
} else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil {
484-
taskEvents.msgs = append(taskEvents.msgs, inferMessage(attrs))
490+
taskEvents.acceptedMsgs = append(taskEvents.acceptedMsgs, inferMessageFromAcceptedEvent(attrs))
491+
} else if attrs := event.GetWorkflowExecutionUpdateAdmittedEventAttributes(); attrs != nil {
492+
updateID := attrs.GetRequest().GetMeta().GetUpdateId()
493+
taskEvents.admittedMsgs = append(taskEvents.admittedMsgs, &protocolpb.Message{
494+
Id: updateID + "/request",
495+
ProtocolInstanceId: updateID,
496+
SequencingId: &protocolpb.Message_EventId{
497+
EventId: event.GetEventId(),
498+
},
499+
Body: protocol.MustMarshalAny(attrs.GetRequest()),
500+
})
485501
}
486502
taskEvents.events = append(taskEvents.events, event)
487503
}
@@ -506,7 +522,7 @@ func isPreloadMarkerEvent(event *historypb.HistoryEvent) bool {
506522
return event.GetEventType() == enumspb.EVENT_TYPE_MARKER_RECORDED
507523
}
508524

509-
func inferMessage(attrs *historypb.WorkflowExecutionUpdateAcceptedEventAttributes) *protocolpb.Message {
525+
func inferMessageFromAcceptedEvent(attrs *historypb.WorkflowExecutionUpdateAcceptedEventAttributes) *protocolpb.Message {
510526
return &protocolpb.Message{
511527
Id: attrs.GetAcceptedRequestMessageId(),
512528
ProtocolInstanceId: attrs.GetProtocolInstanceId(),
@@ -1005,19 +1021,36 @@ ProcessEvents:
10051021
}
10061022
reorderedEvents := nextTask.events
10071023
markers := nextTask.markers
1008-
historyMessages := nextTask.msgs
1024+
historyMessages := nextTask.acceptedMsgs
10091025
flags := nextTask.flags
10101026
binaryChecksum := nextTask.binaryChecksum
10111027
nextTaskBuildId := nextTask.buildID
1028+
admittedUpdates := nextTask.admittedMsgs
10121029
// Check if we are replaying so we know if we should use the messages in the WFT or the history
10131030
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
10141031
var msgs *eventMsgIndex
10151032
if isReplay {
1033+
admittedUpdatesByID := make(map[string]*protocolpb.Message, len(admittedUpdates))
1034+
for _, admittedUpdate := range admittedUpdates {
1035+
admittedUpdatesByID[admittedUpdate.GetProtocolInstanceId()] = admittedUpdate
1036+
}
1037+
// Check if we need to replace the update message synthesize from an
1038+
// accepted event with the update message synthesize from an admitted event
1039+
for i, msg := range historyMessages {
1040+
if admittedUpdate, ok := admittedUpdatesByID[msg.GetProtocolInstanceId()]; ok {
1041+
historyMessages[i] = admittedUpdate
1042+
}
1043+
// At this point, all update messages should have a body
1044+
if historyMessages[i].Body == nil {
1045+
return nil, errors.New("missing body for accepted message")
1046+
}
1047+
}
10161048
msgs = indexMessagesByEventID(historyMessages)
10171049

10181050
eventHandler.sdkVersion = nextTask.sdkVersion
10191051
eventHandler.sdkName = nextTask.sdkName
10201052
} else {
1053+
taskMessages = append(taskMessages, admittedUpdates...)
10211054
msgs = indexMessagesByEventID(taskMessages)
10221055
taskMessages = []*protocolpb.Message{}
10231056
if eventHandler.sdkVersion != SDKVersion {
@@ -1405,6 +1438,8 @@ func skipDeterministicCheckForEvent(e *historypb.HistoryEvent, sdkFlags *sdkFlag
14051438
return true
14061439
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
14071440
return true
1441+
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
1442+
return true
14081443
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
14091444
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED,
14101445
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
@@ -1735,16 +1770,16 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
17351770
useCompat := determineInheritBuildIdFlagForCommand(
17361771
contErr.VersioningIntent, workflowContext.workflowInfo.TaskQueueName, contErr.TaskQueueName)
17371772
closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
1738-
WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name},
1739-
Input: contErr.Input,
1740-
TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
1741-
WorkflowRunTimeout: durationpb.New(contErr.WorkflowRunTimeout),
1742-
WorkflowTaskTimeout: durationpb.New(contErr.WorkflowTaskTimeout),
1743-
Header: contErr.Header,
1744-
Memo: workflowContext.workflowInfo.Memo,
1745-
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
1746-
RetryPolicy: convertToPBRetryPolicy(retryPolicy),
1747-
InheritBuildId: useCompat,
1773+
WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name},
1774+
Input: contErr.Input,
1775+
TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
1776+
WorkflowRunTimeout: durationpb.New(contErr.WorkflowRunTimeout),
1777+
WorkflowTaskTimeout: durationpb.New(contErr.WorkflowTaskTimeout),
1778+
Header: contErr.Header,
1779+
Memo: workflowContext.workflowInfo.Memo,
1780+
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
1781+
RetryPolicy: convertToPBRetryPolicy(retryPolicy),
1782+
InheritBuildId: useCompat,
17481783
}}
17491784
} else if workflowContext.err != nil {
17501785
// Workflow failures

‎internal/internal_task_handlers_interfaces_test.go

+9-15
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,8 @@ func (s *PollLayerInterfacesTestSuite) TestMessageCommands() {
309309
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, nextTask.events[0].GetEventType())
310310
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, nextTask.events[1].GetEventType())
311311

312-
s.Equal(1, len(nextTask.msgs))
313-
s.Equal("test", nextTask.msgs[0].GetProtocolInstanceId())
312+
s.Equal(1, len(nextTask.acceptedMsgs))
313+
s.Equal("test", nextTask.acceptedMsgs[0].GetProtocolInstanceId())
314314

315315
nextTask, err = eh.nextTask()
316316
s.NoError(err)
@@ -319,7 +319,7 @@ func (s *PollLayerInterfacesTestSuite) TestMessageCommands() {
319319
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, nextTask.events[1].GetEventType())
320320
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, nextTask.events[2].GetEventType())
321321

322-
s.Equal(0, len(nextTask.msgs))
322+
s.Equal(0, len(nextTask.acceptedMsgs))
323323
}
324324

325325
func (s *PollLayerInterfacesTestSuite) TestEmptyPages() {
@@ -339,16 +339,10 @@ func (s *PollLayerInterfacesTestSuite) TestEmptyPages() {
339339
ScheduledEventId: 5,
340340
StartedEventId: 6,
341341
}),
342-
{
343-
EventId: 8,
344-
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
345-
Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{
346-
WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
347-
ProtocolInstanceId: "test",
348-
AcceptedRequest: &updatepb.Request{},
349-
},
350-
},
351-
},
342+
createTestEventWorkflowExecutionUpdateAccepted(8, &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
343+
ProtocolInstanceId: "test",
344+
AcceptedRequest: &updatepb.Request{},
345+
}),
352346
createTestEventWorkflowTaskScheduled(9, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
353347
createTestEventWorkflowTaskStarted(10),
354348
}
@@ -433,8 +427,8 @@ func (s *PollLayerInterfacesTestSuite) TestEmptyPages() {
433427
s.Equal(expected.events[i].EventType, event.EventType)
434428
}
435429

436-
s.Equal(len(expected.messages), len(nexTask.msgs))
437-
for i, msg := range nexTask.msgs {
430+
s.Equal(len(expected.messages), len(nexTask.acceptedMsgs))
431+
for i, msg := range nexTask.acceptedMsgs {
438432
s.Equal(expected.messages[i].ProtocolInstanceId, msg.ProtocolInstanceId)
439433
}
440434
}

‎internal/internal_task_handlers_test.go

+298-92
Large diffs are not rendered by default.

‎internal/internal_update.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ func (up *updateProtocol) Accept() {
157157
Body: protocol.MustMarshalAny(&updatepb.Acceptance{
158158
AcceptedRequestMessageId: up.requestMsgID,
159159
AcceptedRequestSequencingEventId: up.requestSeqID,
160-
// AcceptedRequest field no longer read by server - will be removed from API soon
161160
}),
162161
}, withExpectedEventPredicate(up.checkAcceptedEvent))
163162
up.state = updateStateAccepted
@@ -224,10 +223,9 @@ func (up *updateProtocol) checkAcceptedEvent(e *historypb.HistoryEvent) bool {
224223
if attrs == nil {
225224
return false
226225
}
227-
return attrs.AcceptedRequest.GetMeta().GetUpdateId() == up.protoInstanceID &&
226+
return attrs.GetProtocolInstanceId() == up.protoInstanceID &&
228227
attrs.AcceptedRequestMessageId == up.requestMsgID &&
229-
attrs.AcceptedRequestSequencingEventId == up.requestSeqID &&
230-
attrs.AcceptedRequest != nil
228+
attrs.AcceptedRequestSequencingEventId == up.requestSeqID
231229
}
232230

233231
// defaultHandler receives the initial invocation of an update during WFT

‎internal/internal_update_test.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ func TestAcceptedEventPredicate(t *testing.T) {
585585
name: "wrong req msg ID",
586586
test: require.False,
587587
attrs: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
588+
ProtocolInstanceId: updateID,
588589
AcceptedRequest: &request,
589590
AcceptedRequestMessageId: "wrong request message ID",
590591
AcceptedRequestSequencingEventId: requestSeqID,
@@ -594,25 +595,18 @@ func TestAcceptedEventPredicate(t *testing.T) {
594595
name: "wrong req seq ID",
595596
test: require.False,
596597
attrs: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
598+
ProtocolInstanceId: updateID,
597599
AcceptedRequest: &request,
598600
AcceptedRequestMessageId: requestMsgID,
599601
AcceptedRequestSequencingEventId: requestSeqID + 10,
600602
},
601603
},
602-
{
603-
name: "missing request",
604-
test: require.False,
605-
attrs: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
606-
AcceptedRequest: nil,
607-
AcceptedRequestMessageId: requestMsgID,
608-
AcceptedRequestSequencingEventId: requestSeqID,
609-
},
610-
},
611604
{
612605
name: "match",
613606
test: require.True,
614607
attrs: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
615608
AcceptedRequest: &request,
609+
ProtocolInstanceId: updateID,
616610
AcceptedRequestMessageId: requestMsgID,
617611
AcceptedRequestSequencingEventId: requestSeqID,
618612
},

‎internal/protocol/util.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var ErrProtoNameNotFound = errors.New("protocol name not found")
3838
// NameFromMessage extracts the name of the protocol to which the supplied
3939
// message belongs.
4040
func NameFromMessage(msg *protocolpb.Message) (string, error) {
41-
bodyType := string(msg.Body.MessageName())
41+
bodyType := string(msg.GetBody().MessageName())
4242
if bodyType == "" {
4343
return "", ErrProtoNameNotFound
4444
}

‎test/replaytests/replay_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,20 @@ func (s *replayTestSuite) TestMultipleUpdates() {
431431
s.NoError(err)
432432
}
433433

434+
func (s *replayTestSuite) TestResetWithUpdateAccepted() {
435+
replayer := worker.NewWorkflowReplayer()
436+
replayer.RegisterWorkflow(CounterWorkflow)
437+
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "update-reset-accepted.json")
438+
s.NoError(err)
439+
}
440+
441+
func (s *replayTestSuite) TestResetWithUpdateRejected() {
442+
replayer := worker.NewWorkflowReplayer()
443+
replayer.RegisterWorkflow(CounterWorkflow)
444+
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "update-reset-rejected.json")
445+
s.NoError(err)
446+
}
447+
434448
type captureConverter struct {
435449
converter.DataConverter
436450
toPayloads []interface{}
+324
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
{
2+
"events": [
3+
{
4+
"eventId": "1",
5+
"eventTime": "2024-03-25T22:18:22.830541Z",
6+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
7+
"taskId": "1050032",
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": {
10+
"name": "CounterWorkflow"
11+
},
12+
"taskQueue": {
13+
"name": "update",
14+
"kind": "TASK_QUEUE_KIND_NORMAL"
15+
},
16+
"workflowExecutionTimeout": "0s",
17+
"workflowRunTimeout": "0s",
18+
"workflowTaskTimeout": "10s",
19+
"originalExecutionRunId": "a178da94-2ac5-4866-b1a5-4c786fa4e29d",
20+
"identity": "75101@Quinn-Klassens-MacBook-Pro.local@",
21+
"firstExecutionRunId": "a178da94-2ac5-4866-b1a5-4c786fa4e29d",
22+
"attempt": 1,
23+
"firstWorkflowTaskBackoff": "0s",
24+
"header": {},
25+
"workflowId": "update-workflow-ID"
26+
}
27+
},
28+
{
29+
"eventId": "2",
30+
"eventTime": "2024-03-25T22:18:22.830576Z",
31+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
32+
"taskId": "1050033",
33+
"workflowTaskScheduledEventAttributes": {
34+
"taskQueue": {
35+
"name": "update",
36+
"kind": "TASK_QUEUE_KIND_NORMAL"
37+
},
38+
"startToCloseTimeout": "10s",
39+
"attempt": 1
40+
}
41+
},
42+
{
43+
"eventId": "3",
44+
"eventTime": "2024-03-25T22:18:22.832619Z",
45+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
46+
"taskId": "1050040",
47+
"workflowTaskStartedEventAttributes": {
48+
"scheduledEventId": "2",
49+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
50+
"requestId": "7812cc67-4c7e-46a8-918c-2c744b88aea0",
51+
"historySizeBytes": "502"
52+
}
53+
},
54+
{
55+
"eventId": "4",
56+
"eventTime": "2024-03-25T22:18:26.641635Z",
57+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_FAILED",
58+
"taskId": "1050069",
59+
"workflowTaskFailedEventAttributes": {
60+
"scheduledEventId": "2",
61+
"startedEventId": "3",
62+
"cause": "WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW",
63+
"failure": {
64+
"message": "<unknown-user>: test",
65+
"resetWorkflowFailureInfo": {}
66+
},
67+
"identity": "history-service",
68+
"baseRunId": "a178da94-2ac5-4866-b1a5-4c786fa4e29d",
69+
"newRunId": "3962f036-4c03-4563-95f7-766ca7aa9dd9"
70+
}
71+
},
72+
{
73+
"eventId": "5",
74+
"eventTime": "2024-03-25T22:18:26.641732Z",
75+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED",
76+
"taskId": "1050070",
77+
"workflowExecutionUpdateAdmittedEventAttributes": {
78+
"request": {
79+
"meta": {
80+
"updateId": "33160fb1-3285-49a6-a655-078dfc30099a",
81+
"identity": "75101@Quinn-Klassens-MacBook-Pro.local@"
82+
},
83+
"input": {
84+
"header": {},
85+
"name": "fetch_and_add",
86+
"args": {
87+
"payloads": [
88+
{
89+
"metadata": {
90+
"encoding": "anNvbi9wbGFpbg=="
91+
},
92+
"data": "MA=="
93+
}
94+
]
95+
}
96+
}
97+
},
98+
"origin": "UPDATE_REQUESTED_EVENT_ORIGIN_REAPPLY"
99+
}
100+
},
101+
{
102+
"eventId": "6",
103+
"eventTime": "2024-03-25T22:18:26.641772Z",
104+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
105+
"taskId": "1050071",
106+
"workflowTaskScheduledEventAttributes": {
107+
"taskQueue": {
108+
"name": "update",
109+
"kind": "TASK_QUEUE_KIND_NORMAL"
110+
},
111+
"startToCloseTimeout": "10s",
112+
"attempt": 1
113+
}
114+
},
115+
{
116+
"eventId": "7",
117+
"eventTime": "2024-03-25T22:18:26.644481Z",
118+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
119+
"taskId": "1050079",
120+
"workflowTaskStartedEventAttributes": {
121+
"scheduledEventId": "6",
122+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
123+
"requestId": "1b4a2c02-27ac-46a6-838a-43fbf6870e4a",
124+
"historySizeBytes": "722"
125+
}
126+
},
127+
{
128+
"eventId": "8",
129+
"eventTime": "2024-03-25T22:18:26.646631Z",
130+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
131+
"taskId": "1050083",
132+
"workflowTaskCompletedEventAttributes": {
133+
"scheduledEventId": "6",
134+
"startedEventId": "7",
135+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
136+
"workerVersion": {
137+
"buildId": "c39a89ac57bee1cb328b9820896586dc"
138+
},
139+
"sdkMetadata": {
140+
"langUsedFlags": [
141+
3,
142+
4
143+
],
144+
"sdkName": "temporal-go",
145+
"sdkVersion": "1.26.0"
146+
},
147+
"meteringMetadata": {}
148+
}
149+
},
150+
{
151+
"eventId": "9",
152+
"eventTime": "2024-03-25T22:18:26.646722Z",
153+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
154+
"taskId": "1050084",
155+
"workflowExecutionUpdateAcceptedEventAttributes": {
156+
"protocolInstanceId": "33160fb1-3285-49a6-a655-078dfc30099a",
157+
"acceptedRequestMessageId": "33160fb1-3285-49a6-a655-078dfc30099a/request",
158+
"acceptedRequestSequencingEventId": "5"
159+
}
160+
},
161+
{
162+
"eventId": "10",
163+
"eventTime": "2024-03-25T22:18:26.646738Z",
164+
"eventType": "EVENT_TYPE_TIMER_STARTED",
165+
"taskId": "1050085",
166+
"timerStartedEventAttributes": {
167+
"timerId": "10",
168+
"startToFireTimeout": "1s",
169+
"workflowTaskCompletedEventId": "8"
170+
}
171+
},
172+
{
173+
"eventId": "11",
174+
"eventTime": "2024-03-25T22:18:27.648712Z",
175+
"eventType": "EVENT_TYPE_TIMER_FIRED",
176+
"taskId": "1050089",
177+
"timerFiredEventAttributes": {
178+
"timerId": "10",
179+
"startedEventId": "10"
180+
}
181+
},
182+
{
183+
"eventId": "12",
184+
"eventTime": "2024-03-25T22:18:27.648735Z",
185+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
186+
"taskId": "1050090",
187+
"workflowTaskScheduledEventAttributes": {
188+
"taskQueue": {
189+
"name": "update",
190+
"kind": "TASK_QUEUE_KIND_NORMAL"
191+
},
192+
"startToCloseTimeout": "10s",
193+
"attempt": 1
194+
}
195+
},
196+
{
197+
"eventId": "13",
198+
"eventTime": "2024-03-25T22:18:27.651571Z",
199+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
200+
"taskId": "1050093",
201+
"workflowTaskStartedEventAttributes": {
202+
"scheduledEventId": "12",
203+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
204+
"requestId": "1c608045-63f5-4ee9-95ad-95cb34b9ab7d",
205+
"historySizeBytes": "1196"
206+
}
207+
},
208+
{
209+
"eventId": "14",
210+
"eventTime": "2024-03-25T22:18:27.653864Z",
211+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
212+
"taskId": "1050097",
213+
"workflowTaskCompletedEventAttributes": {
214+
"scheduledEventId": "12",
215+
"startedEventId": "13",
216+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
217+
"workerVersion": {
218+
"buildId": "c39a89ac57bee1cb328b9820896586dc"
219+
},
220+
"sdkMetadata": {},
221+
"meteringMetadata": {}
222+
}
223+
},
224+
{
225+
"eventId": "15",
226+
"eventTime": "2024-03-25T22:18:27.653912Z",
227+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED",
228+
"taskId": "1050098",
229+
"workflowExecutionUpdateCompletedEventAttributes": {
230+
"meta": {
231+
"updateId": "33160fb1-3285-49a6-a655-078dfc30099a"
232+
},
233+
"acceptedEventId": "9",
234+
"outcome": {
235+
"success": {
236+
"payloads": [
237+
{
238+
"metadata": {
239+
"encoding": "anNvbi9wbGFpbg=="
240+
},
241+
"data": "MA=="
242+
}
243+
]
244+
}
245+
}
246+
}
247+
},
248+
{
249+
"eventId": "16",
250+
"eventTime": "2024-03-25T22:18:48.754878Z",
251+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
252+
"taskId": "1050100",
253+
"workflowExecutionSignaledEventAttributes": {
254+
"signalName": "done",
255+
"input": {},
256+
"identity": "temporal-cli:quinnklassen@Quinn-Klassens-MacBook-Pro.local"
257+
}
258+
},
259+
{
260+
"eventId": "17",
261+
"eventTime": "2024-03-25T22:18:48.754880Z",
262+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
263+
"taskId": "1050101",
264+
"workflowTaskScheduledEventAttributes": {
265+
"taskQueue": {
266+
"name": "update",
267+
"kind": "TASK_QUEUE_KIND_NORMAL"
268+
},
269+
"startToCloseTimeout": "10s",
270+
"attempt": 1
271+
}
272+
},
273+
{
274+
"eventId": "18",
275+
"eventTime": "2024-03-25T22:18:48.756089Z",
276+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
277+
"taskId": "1050104",
278+
"workflowTaskStartedEventAttributes": {
279+
"scheduledEventId": "17",
280+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
281+
"requestId": "cf986b25-0ead-4639-a727-57e3ac88da65",
282+
"historySizeBytes": "1658"
283+
}
284+
},
285+
{
286+
"eventId": "19",
287+
"eventTime": "2024-03-25T22:18:48.758130Z",
288+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
289+
"taskId": "1050108",
290+
"workflowTaskCompletedEventAttributes": {
291+
"scheduledEventId": "17",
292+
"startedEventId": "18",
293+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
294+
"workerVersion": {
295+
"buildId": "c39a89ac57bee1cb328b9820896586dc"
296+
},
297+
"sdkMetadata": {
298+
"sdkName": "temporal-go",
299+
"sdkVersion": "1.26.0"
300+
},
301+
"meteringMetadata": {}
302+
}
303+
},
304+
{
305+
"eventId": "20",
306+
"eventTime": "2024-03-25T22:18:48.758153Z",
307+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
308+
"taskId": "1050109",
309+
"workflowExecutionCompletedEventAttributes": {
310+
"result": {
311+
"payloads": [
312+
{
313+
"metadata": {
314+
"encoding": "anNvbi9wbGFpbg=="
315+
},
316+
"data": "MA=="
317+
}
318+
]
319+
},
320+
"workflowTaskCompletedEventId": "19"
321+
}
322+
}
323+
]
324+
}
+226
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
{
2+
"events": [
3+
{
4+
"eventId": "1",
5+
"eventTime": "2024-03-25T22:18:22.830541Z",
6+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
7+
"taskId": "1050032",
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": {
10+
"name": "CounterWorkflow"
11+
},
12+
"taskQueue": {
13+
"name": "update",
14+
"kind": "TASK_QUEUE_KIND_NORMAL"
15+
},
16+
"workflowExecutionTimeout": "0s",
17+
"workflowRunTimeout": "0s",
18+
"workflowTaskTimeout": "10s",
19+
"originalExecutionRunId": "a178da94-2ac5-4866-b1a5-4c786fa4e29d",
20+
"identity": "75101@Quinn-Klassens-MacBook-Pro.local@",
21+
"firstExecutionRunId": "a178da94-2ac5-4866-b1a5-4c786fa4e29d",
22+
"attempt": 1,
23+
"firstWorkflowTaskBackoff": "0s",
24+
"header": {},
25+
"workflowId": "update-workflow-ID"
26+
}
27+
},
28+
{
29+
"eventId": "2",
30+
"eventTime": "2024-03-25T22:18:22.830576Z",
31+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
32+
"taskId": "1050033",
33+
"workflowTaskScheduledEventAttributes": {
34+
"taskQueue": {
35+
"name": "update",
36+
"kind": "TASK_QUEUE_KIND_NORMAL"
37+
},
38+
"startToCloseTimeout": "10s",
39+
"attempt": 1
40+
}
41+
},
42+
{
43+
"eventId": "3",
44+
"eventTime": "2024-03-25T22:18:22.832619Z",
45+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
46+
"taskId": "1050040",
47+
"workflowTaskStartedEventAttributes": {
48+
"scheduledEventId": "2",
49+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
50+
"requestId": "7812cc67-4c7e-46a8-918c-2c744b88aea0",
51+
"historySizeBytes": "502"
52+
}
53+
},
54+
{
55+
"eventId": "4",
56+
"eventTime": "2024-03-25T22:18:26.641635Z",
57+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_FAILED",
58+
"taskId": "1050069",
59+
"workflowTaskFailedEventAttributes": {
60+
"scheduledEventId": "2",
61+
"startedEventId": "3",
62+
"cause": "WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW",
63+
"failure": {
64+
"message": "<unknown-user>: test",
65+
"resetWorkflowFailureInfo": {}
66+
},
67+
"identity": "history-service",
68+
"baseRunId": "a178da94-2ac5-4866-b1a5-4c786fa4e29d",
69+
"newRunId": "3962f036-4c03-4563-95f7-766ca7aa9dd9"
70+
}
71+
},
72+
{
73+
"eventId": "5",
74+
"eventTime": "2024-03-25T22:18:26.641732Z",
75+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED",
76+
"taskId": "1050070",
77+
"workflowExecutionUpdateAdmittedEventAttributes": {
78+
"request": {
79+
"meta": {
80+
"updateId": "33160fb1-3285-49a6-a655-078dfc30099a",
81+
"identity": "75101@Quinn-Klassens-MacBook-Pro.local@"
82+
},
83+
"input": {
84+
"header": {},
85+
"name": "fetch_and_add",
86+
"args": {
87+
"payloads": [
88+
{
89+
"metadata": {
90+
"encoding": "anNvbi9wbGFpbg=="
91+
},
92+
"data": "MA=="
93+
}
94+
]
95+
}
96+
}
97+
},
98+
"origin": "UPDATE_REQUESTED_EVENT_ORIGIN_REAPPLY"
99+
}
100+
},
101+
{
102+
"eventId": "6",
103+
"eventTime": "2024-03-25T22:18:26.641772Z",
104+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
105+
"taskId": "1050071",
106+
"workflowTaskScheduledEventAttributes": {
107+
"taskQueue": {
108+
"name": "update",
109+
"kind": "TASK_QUEUE_KIND_NORMAL"
110+
},
111+
"startToCloseTimeout": "10s",
112+
"attempt": 1
113+
}
114+
},
115+
{
116+
"eventId": "7",
117+
"eventTime": "2024-03-25T22:18:26.644481Z",
118+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
119+
"taskId": "1050079",
120+
"workflowTaskStartedEventAttributes": {
121+
"scheduledEventId": "6",
122+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
123+
"requestId": "1b4a2c02-27ac-46a6-838a-43fbf6870e4a",
124+
"historySizeBytes": "722"
125+
}
126+
},
127+
{
128+
"eventId": "8",
129+
"eventTime": "2024-03-25T22:18:26.646631Z",
130+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
131+
"taskId": "1050083",
132+
"workflowTaskCompletedEventAttributes": {
133+
"scheduledEventId": "6",
134+
"startedEventId": "7",
135+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
136+
"workerVersion": {
137+
"buildId": "c39a89ac57bee1cb328b9820896586dc"
138+
},
139+
"sdkMetadata": {
140+
"langUsedFlags": [
141+
3,
142+
4
143+
],
144+
"sdkName": "temporal-go",
145+
"sdkVersion": "1.26.0"
146+
},
147+
"meteringMetadata": {}
148+
}
149+
},
150+
{
151+
"eventId": "9",
152+
"eventTime": "2024-03-25T22:18:48.754878Z",
153+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
154+
"taskId": "1050100",
155+
"workflowExecutionSignaledEventAttributes": {
156+
"signalName": "done",
157+
"input": {},
158+
"identity": "temporal-cli:quinnklassen@Quinn-Klassens-MacBook-Pro.local"
159+
}
160+
},
161+
{
162+
"eventId": "10",
163+
"eventTime": "2024-03-25T22:18:48.754880Z",
164+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
165+
"taskId": "1050101",
166+
"workflowTaskScheduledEventAttributes": {
167+
"taskQueue": {
168+
"name": "update",
169+
"kind": "TASK_QUEUE_KIND_NORMAL"
170+
},
171+
"startToCloseTimeout": "10s",
172+
"attempt": 1
173+
}
174+
},
175+
{
176+
"eventId": "11",
177+
"eventTime": "2024-03-25T22:18:48.756089Z",
178+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
179+
"taskId": "1050104",
180+
"workflowTaskStartedEventAttributes": {
181+
"scheduledEventId": "10",
182+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
183+
"requestId": "cf986b25-0ead-4639-a727-57e3ac88da65",
184+
"historySizeBytes": "1658"
185+
}
186+
},
187+
{
188+
"eventId": "12",
189+
"eventTime": "2024-03-25T22:18:48.758130Z",
190+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
191+
"taskId": "1050108",
192+
"workflowTaskCompletedEventAttributes": {
193+
"scheduledEventId": "10",
194+
"startedEventId": "11",
195+
"identity": "74841@Quinn-Klassens-MacBook-Pro.local@",
196+
"workerVersion": {
197+
"buildId": "c39a89ac57bee1cb328b9820896586dc"
198+
},
199+
"sdkMetadata": {
200+
"sdkName": "temporal-go",
201+
"sdkVersion": "1.26.0"
202+
},
203+
"meteringMetadata": {}
204+
}
205+
},
206+
{
207+
"eventId": "13",
208+
"eventTime": "2024-03-25T22:18:48.758153Z",
209+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
210+
"taskId": "1050109",
211+
"workflowExecutionCompletedEventAttributes": {
212+
"result": {
213+
"payloads": [
214+
{
215+
"metadata": {
216+
"encoding": "anNvbi9wbGFpbg=="
217+
},
218+
"data": "MA=="
219+
}
220+
]
221+
},
222+
"workflowTaskCompletedEventId": "12"
223+
}
224+
}
225+
]
226+
}

‎test/replaytests/workflows.go

+28
Original file line numberDiff line numberDiff line change
@@ -546,3 +546,31 @@ func MultipleUpdateWorkflow(ctx workflow.Context) (int, error) {
546546
}
547547
return updatesRan, nil
548548
}
549+
550+
func CounterWorkflow(ctx workflow.Context) (int, error) {
551+
counter := 0
552+
553+
if err := workflow.SetUpdateHandlerWithOptions(
554+
ctx,
555+
"fetch_and_add",
556+
func(ctx workflow.Context, i int) (int, error) {
557+
tmp := counter
558+
counter += i
559+
_ = workflow.Sleep(ctx, 1*time.Second)
560+
return tmp, nil
561+
},
562+
workflow.UpdateHandlerOptions{Validator: nonNegative},
563+
); err != nil {
564+
return 0, err
565+
}
566+
567+
_ = workflow.GetSignalChannel(ctx, "done").Receive(ctx, nil)
568+
return counter, ctx.Err()
569+
}
570+
571+
func nonNegative(ctx workflow.Context, i int) error {
572+
if i < 0 {
573+
return fmt.Errorf("addend must be non-negative (%v)", i)
574+
}
575+
return nil
576+
}

0 commit comments

Comments
 (0)
Please sign in to comment.