Skip to content

Commit fb06909

Browse files
authoredMay 21, 2024··
Send original update request back in response (#1480)
Server will use this value to reconstruct the update if it is lost in certain cases.
1 parent 222d4cf commit fb06909

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed
 

‎internal/internal_update.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type (
9191
updateProtocol struct {
9292
protoInstanceID string
9393
clientIdentity string
94+
initialRequest *updatepb.Request
9495
requestMsgID string
9596
requestSeqID int64
9697
scheduleUpdate func(name string, id string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks)
@@ -134,15 +135,16 @@ func (up *updateProtocol) requireState(action string, valid ...updateState) {
134135
}
135136

136137
func (up *updateProtocol) HandleMessage(msg *protocolpb.Message) error {
137-
var req updatepb.Request
138-
if err := msg.Body.UnmarshalTo(&req); err != nil {
138+
var request updatepb.Request
139+
if err := msg.Body.UnmarshalTo(&request); err != nil {
139140
return err
140141
}
142+
up.initialRequest = &request
141143
up.requireState("update request", updateStateNew)
142144
up.requestMsgID = msg.GetId()
143145
up.requestSeqID = msg.GetEventId()
144-
input := req.GetInput()
145-
up.scheduleUpdate(input.GetName(), req.GetMeta().GetUpdateId(), input.GetArgs(), input.GetHeader(), up)
146+
input := up.initialRequest.GetInput()
147+
up.scheduleUpdate(input.GetName(), up.initialRequest.GetMeta().GetUpdateId(), input.GetArgs(), input.GetHeader(), up)
146148
up.state = updateStateRequestInitiated
147149
return nil
148150
}
@@ -157,8 +159,11 @@ func (up *updateProtocol) Accept() {
157159
Body: protocol.MustMarshalAny(&updatepb.Acceptance{
158160
AcceptedRequestMessageId: up.requestMsgID,
159161
AcceptedRequestSequencingEventId: up.requestSeqID,
162+
AcceptedRequest: up.initialRequest,
160163
}),
161164
}, withExpectedEventPredicate(up.checkAcceptedEvent))
165+
// Stop holding a reference to the initial request to allow it to be GCed
166+
up.initialRequest = nil
162167
up.state = updateStateAccepted
163168
}
164169

@@ -171,8 +176,8 @@ func (up *updateProtocol) Reject(err error) {
171176
Body: protocol.MustMarshalAny(&updatepb.Rejection{
172177
RejectedRequestMessageId: up.requestMsgID,
173178
RejectedRequestSequencingEventId: up.requestSeqID,
179+
RejectedRequest: up.initialRequest,
174180
Failure: up.env.GetFailureConverter().ErrorToFailure(err),
175-
// RejectedRequest field no longer read by server - will be removed from API soon
176181
}),
177182
})
178183
up.state = updateStateCompleted

‎internal/internal_update_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -574,8 +574,8 @@ func TestAcceptedEventPredicate(t *testing.T) {
574574

575575
var acptmsg updatepb.Acceptance
576576
require.NoError(t, env.outbox[0].msg.Body.UnmarshalTo(&acptmsg))
577-
require.Nil(t, acptmsg.AcceptedRequest,
578-
"do not send the original request back - this field will be removed soon")
577+
require.EqualExportedValues(t, &request, acptmsg.AcceptedRequest,
578+
"Sent the original request back in the accepted message")
579579

580580
pred := env.outbox[0].eventPredicate
581581
for _, tc := range [...]struct {

0 commit comments

Comments
 (0)
Please sign in to comment.