Skip to content

Commit bcc623d

Browse files
authoredJul 25, 2024··
Add support for managing schedule search attributes when when updating a schedule (#1562)
Add support for defining search attributes when updating schedules
1 parent 9c40461 commit bcc623d

File tree

3 files changed

+166
-13
lines changed

3 files changed

+166
-13
lines changed
 

‎internal/internal_schedule_client.go

+26-8
Original file line numberDiff line numberDiff line change
@@ -294,13 +294,24 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
294294
if err != nil {
295295
return err
296296
}
297+
298+
var newSA *commonpb.SearchAttributes
299+
attributes := newSchedule.TypedSearchAttributes
300+
if attributes != nil {
301+
newSA, err = serializeTypedSearchAttributes(attributes.GetUntypedValues())
302+
if err != nil {
303+
return err
304+
}
305+
}
306+
297307
_, err = scheduleHandle.client.workflowService.UpdateSchedule(grpcCtx, &workflowservice.UpdateScheduleRequest{
298-
Namespace: scheduleHandle.client.namespace,
299-
ScheduleId: scheduleHandle.ID,
300-
Schedule: newSchedulePB,
301-
ConflictToken: nil,
302-
Identity: scheduleHandle.client.identity,
303-
RequestId: uuid.New(),
308+
Namespace: scheduleHandle.client.namespace,
309+
ScheduleId: scheduleHandle.ID,
310+
Schedule: newSchedulePB,
311+
ConflictToken: nil,
312+
Identity: scheduleHandle.client.identity,
313+
RequestId: uuid.New(),
314+
SearchAttributes: newSA,
304315
})
305316
return err
306317
}
@@ -484,6 +495,12 @@ func scheduleDescriptionFromPB(
484495
return nil, err
485496
}
486497

498+
var typedSearchAttributes SearchAttributes
499+
searchAttributes := describeResponse.SearchAttributes
500+
if searchAttributes != nil {
501+
typedSearchAttributes = convertToTypedSearchAttributes(logger, searchAttributes.IndexedFields)
502+
}
503+
487504
return &ScheduleDescription{
488505
Schedule: Schedule{
489506
Action: actionDescription,
@@ -510,8 +527,9 @@ func scheduleDescriptionFromPB(
510527
CreatedAt: describeResponse.Info.GetCreateTime().AsTime(),
511528
LastUpdateAt: describeResponse.Info.GetUpdateTime().AsTime(),
512529
},
513-
Memo: describeResponse.Memo,
514-
SearchAttributes: describeResponse.SearchAttributes,
530+
Memo: describeResponse.Memo,
531+
SearchAttributes: searchAttributes,
532+
TypedSearchAttributes: typedSearchAttributes,
515533
}, nil
516534
}
517535

‎internal/schedule_client.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,19 @@ type (
416416
// Memo - Non-indexed user supplied information.
417417
Memo *commonpb.Memo
418418

419-
// SearchAttributes - Indexed info that can be used in query of List schedules APIs. The key and value type must be registered on Temporal server side.
420-
// Use GetSearchAttributes API to get valid key and corresponding value type.
419+
// SearchAttributes - Additional indexed information used for search and visibility. The key and its value type
420+
// are registered on Temporal server side.
421421
// For supported operations on different server versions see [Visibility].
422422
//
423423
// [Visibility]: https://docs.temporal.io/visibility
424424
SearchAttributes *commonpb.SearchAttributes
425+
426+
// TypedSearchAttributes - Additional indexed information used for search and visibility. The key and its value
427+
// type are registered on Temporal server side.
428+
// For supported operations on different server versions see [Visibility].
429+
//
430+
// [Visibility]: https://docs.temporal.io/visibility
431+
TypedSearchAttributes SearchAttributes
425432
}
426433

427434
// SchedulePolicies describes the current polcies of a schedule.
@@ -476,6 +483,15 @@ type (
476483
ScheduleUpdate struct {
477484
// Schedule - New schedule to replace the existing schedule with
478485
Schedule *Schedule
486+
487+
// TypedSearchAttributes - Optional indexed info that can be used for querying via the List schedules APIs.
488+
// The key and value type must be registered on Temporal server side.
489+
//
490+
// nil: leave any pre-existing assigned search attributes intact
491+
// empty: remove any and all pre-existing assigned search attributes
492+
// attributes present: replace any and all pre-existing assigned search attributes with the defined search
493+
// attributes, i.e. upsert
494+
TypedSearchAttributes *SearchAttributes
479495
}
480496

481497
// ScheduleUpdateInput describes the current state of the schedule to be updated.

‎test/integration_test.go

+122-3
Original file line numberDiff line numberDiff line change
@@ -4980,9 +4980,18 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() {
49804980
err = handle.Delete(ctx)
49814981
ts.NoError(err)
49824982
}()
4983+
4984+
stringKey := temporal.NewSearchAttributeKeyString("CustomStringField")
4985+
keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField")
4986+
sa := temporal.NewSearchAttributes(
4987+
stringKey.ValueSet("CustomStringFieldValue"),
4988+
keywordKey.ValueSet("foo"),
4989+
)
4990+
49834991
updateFunc := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
49844992
return &client.ScheduleUpdate{
4985-
Schedule: &input.Description.Schedule,
4993+
Schedule: &input.Description.Schedule,
4994+
TypedSearchAttributes: &sa,
49864995
}, nil
49874996
}
49884997
description, err := handle.Describe(ctx)
@@ -4993,9 +5002,119 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() {
49935002
})
49945003
ts.NoError(err)
49955004

4996-
description2, err := handle.Describe(ctx)
5005+
ts.EventuallyWithT(func(c *assert.CollectT) {
5006+
d, err := handle.Describe(ctx)
5007+
assert.NoError(c, err)
5008+
assert.Equal(c, description.Schedule, d.Schedule)
5009+
assert.Equal(c, 2, d.TypedSearchAttributes.Size())
5010+
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
5011+
expectedString, _ := sa.GetString(stringKey)
5012+
assert.Equal(c, expectedString, returnedString)
5013+
returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey)
5014+
expectedKeyword, _ := sa.GetKeyword(keywordKey)
5015+
assert.Equal(c, expectedKeyword, returnedKeyword)
5016+
assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields))
5017+
}, time.Second, 100*time.Millisecond)
5018+
5019+
// nil search attributes should leave current search attributes untouched
5020+
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
5021+
return &client.ScheduleUpdate{
5022+
Schedule: &input.Description.Schedule,
5023+
}, nil
5024+
}
5025+
5026+
err = handle.Update(ctx, client.ScheduleUpdateOptions{
5027+
DoUpdate: updateFunc,
5028+
})
5029+
ts.NoError(err)
5030+
5031+
ts.EventuallyWithT(func(c *assert.CollectT) {
5032+
d, err := handle.Describe(ctx)
5033+
assert.NoError(c, err)
5034+
assert.Equal(c, 2, d.TypedSearchAttributes.Size())
5035+
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
5036+
expectedString, _ := sa.GetString(stringKey)
5037+
assert.Equal(c, expectedString, returnedString)
5038+
returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey)
5039+
expectedKeyword, _ := sa.GetKeyword(keywordKey)
5040+
assert.Equal(c, expectedKeyword, returnedKeyword)
5041+
assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields))
5042+
}, time.Second, 100*time.Millisecond)
5043+
5044+
// Updating an attribute without affecting the others
5045+
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
5046+
newSa := temporal.NewSearchAttributes(
5047+
input.Description.TypedSearchAttributes.Copy(),
5048+
stringKey.ValueSet("Changed"),
5049+
)
5050+
return &client.ScheduleUpdate{
5051+
Schedule: &input.Description.Schedule,
5052+
TypedSearchAttributes: &newSa,
5053+
}, nil
5054+
}
5055+
5056+
err = handle.Update(ctx, client.ScheduleUpdateOptions{
5057+
DoUpdate: updateFunc,
5058+
})
5059+
ts.NoError(err)
5060+
5061+
ts.EventuallyWithT(func(c *assert.CollectT) {
5062+
d, err := handle.Describe(ctx)
5063+
assert.NoError(c, err)
5064+
assert.Equal(c, 2, d.TypedSearchAttributes.Size())
5065+
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
5066+
expectedString, _ := temporal.NewSearchAttributes(stringKey.ValueSet("Changed")).GetString(stringKey)
5067+
assert.Equal(c, expectedString, returnedString)
5068+
returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey)
5069+
expectedKeyword, _ := sa.GetKeyword(keywordKey)
5070+
assert.Equal(c, expectedKeyword, returnedKeyword)
5071+
assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields))
5072+
}, time.Second, 100*time.Millisecond)
5073+
5074+
// updating a single search attribute on an existing collection acts as an upsert on the entire collection
5075+
newSa := temporal.NewSearchAttributes(stringKey.ValueSet("Changed"))
5076+
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
5077+
return &client.ScheduleUpdate{
5078+
Schedule: &input.Description.Schedule,
5079+
TypedSearchAttributes: &newSa,
5080+
}, nil
5081+
}
5082+
5083+
err = handle.Update(ctx, client.ScheduleUpdateOptions{
5084+
DoUpdate: updateFunc,
5085+
})
5086+
ts.NoError(err)
5087+
5088+
ts.EventuallyWithT(func(c *assert.CollectT) {
5089+
d, err := handle.Describe(ctx)
5090+
assert.NoError(c, err)
5091+
assert.Equal(c, 1, d.TypedSearchAttributes.Size())
5092+
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
5093+
expectedString, _ := newSa.GetString(stringKey)
5094+
assert.Equal(c, expectedString, returnedString)
5095+
assert.Equal(c, 1, len(d.SearchAttributes.IndexedFields))
5096+
}, time.Second, 100*time.Millisecond)
5097+
5098+
// empty search attributes should remove pre-existing search attributes
5099+
sa = temporal.NewSearchAttributes()
5100+
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
5101+
return &client.ScheduleUpdate{
5102+
Schedule: &input.Description.Schedule,
5103+
TypedSearchAttributes: &sa,
5104+
}, nil
5105+
}
5106+
5107+
err = handle.Update(ctx, client.ScheduleUpdateOptions{
5108+
DoUpdate: updateFunc,
5109+
})
49975110
ts.NoError(err)
4998-
ts.Equal(description.Schedule, description2.Schedule)
5111+
5112+
ts.EventuallyWithT(func(c *assert.CollectT) {
5113+
d, err := handle.Describe(ctx)
5114+
assert.NoError(c, err)
5115+
assert.Nil(c, d.SearchAttributes)
5116+
assert.Empty(c, d.TypedSearchAttributes)
5117+
}, time.Second, 100*time.Millisecond)
49995118
}
50005119

50015120
func (ts *IntegrationTestSuite) TestScheduleUpdateCancelUpdate() {

0 commit comments

Comments
 (0)
Please sign in to comment.