Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delta: [#558] Support dynamic wildcard subscription in delta-xds #559

Merged
merged 6 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 8 additions & 11 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,26 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
for name := range state.GetResourceVersions() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
// Reply only with the requested resources
nextVersionMap = make(map[string]string, len(state.GetResourceVersions()))
for name, prevVersion := range state.GetResourceVersions() {
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
}
nextVersionMap[name] = nextVersion
} else {
// We track non-existent resources for non-wildcard streams until the client explicitly unsubscribes from them.
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
nextVersionMap[name] = ""
// The version check is to make sure we are only sending an update once right after removal.
// If the client keeps the subscription, we skip the add for every subsequent response.
if prevVersion != "" {
toRemove = append(toRemove, name)
}
} else if found {
toRemove = append(toRemove, name)
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, versionMap[typ]), watches[typ])
}, state, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -221,13 +225,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, stream.NewStreamState(false, map[string]string{names[rsrc.EndpointType][0]: ""}), watchCh)
}, state, watchCh)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
Expand Down
6 changes: 5 additions & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}

for id, watch := range cache.deltaWatches {
if !watch.StreamState.WatchesResources(modified) {
continue
}

res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState)
if res != nil {
delete(cache.deltaWatches, id)
Expand Down Expand Up @@ -391,7 +395,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, state.GetResourceVersions(), cache.getVersion())
cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}
Expand Down
22 changes: 16 additions & 6 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,15 @@ func TestLinearDeltaExistingResources(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"b": "", "c": ""}) // watching b and c - not interested in a
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{})

state = stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state = stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
Expand All @@ -511,13 +513,15 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": hashB})
state := stream.NewStreamState(false, map[string]string{"b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -543,14 +547,16 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
// There is currently no delta watch
checkVersionMapNotSet(t, c)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
checkVersionMapSet(t, c)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -577,13 +583,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -600,7 +608,8 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
c := NewLinearCache(testType)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
checkVersionMapNotSet(t, c)
assert.Equal(t, 0, c.NumResources())
Expand Down Expand Up @@ -745,6 +754,7 @@ func TestLinearMixedWatches(t *testing.T) {
checkVersionMapNotSet(t, c)

deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
wd := make(chan DeltaResponse, 1)

// Initial update
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (cache *snapshotCache) ClearSnapshot(node string) {

// nameSet creates a map from a string slice to value true.
func nameSet(names []string) map[string]bool {
set := make(map[string]bool)
set := make(map[string]bool, len(names))
for _, name := range names {
set[name] = true
}
Expand Down Expand Up @@ -498,9 +498,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
watchID := cache.nextDeltaWatchID()

if exists {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t))
} else {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetResourceVersions(), nodeID)
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID)
}

info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
Expand Down
39 changes: 30 additions & 9 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
if !ok {
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode.
// so we set the initial resource versions if we have any.
// We also set the stream as wildcard based on its legacy meaning (no resource subscribed).
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
// If the state starts with this legacy mode, adding new resources will not unsubscribe to wildcard.
// It can still be done by unsubscribing from "*"
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
watch.Cancel()
}

s.subscribe(req.GetResourceNamesSubscribe(), watch.state.GetResourceVersions())
s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.GetResourceVersions())
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
Expand Down Expand Up @@ -210,17 +213,35 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro
}

// When we subscribe, we just want to make the cache know we are subscribing to a resource.
// Providing a name with an empty version is enough to make that happen.
func (s *server) subscribe(resources []string, sv map[string]string) {
// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on.
func (s *server) subscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
sv[resource] = ""
if resource == "*" {
streamState.SetWildcard(true)
continue
}
sv[resource] = struct{}{}
}
}

// Unsubscriptions remove resources from the stream state to
// indicate to the cache that we don't care about the resource anymore
func (s *server) unsubscribe(resources []string, sv map[string]string) {
// Unsubscriptions remove resources from the stream subscribed resources.
valerian-roche marked this conversation as resolved.
Show resolved Hide resolved
// If explicitly unsubscribing from wildcard, the stream is updated and is now watching only subscribed resources.
valerian-roche marked this conversation as resolved.
Show resolved Hide resolved
func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
if resource == "*" {
streamState.SetWildcard(false)
continue
}
if _, ok := sv[resource]; ok && streamState.IsWildcard() {
// xds protocol specifically states that if a resource if unsubscribed while a wildcard watch is present,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we clean this comment block up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make it clearer, though it is still so counter-intuitive to me I have a hard time formalizing it

// the control-plane must return a response with either the resource set as removed (if no longer present in the snapshot)
// or with its content
// If the stream is in wildcard mode specifying an empty entry in the resource versions will either send the resource content
// or mark it as removed (done in createDeltaResponse)
streamState.GetResourceVersions()[resource] = ""
}
delete(sv, resource)
}
}
41 changes: 35 additions & 6 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,44 @@ type DeltaStream interface {

// StreamState will keep track of resource state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the original DeltaRequest was a wildcard LDS/RDS request.
// Indicates whether the stream currently has a wildcard watch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this comment explicit to delta since SOTW also uses this StreamState internally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I have a follow-up PR to add it for sotw, will re-update then

wildcard bool

// Provides the list of resources explicitly requested by the client
// This list might be non-empty even when set as wildcard
subscribedResourceNames map[string]struct{}

// ResourceVersions contains a hash of the resource as the value and the resource name as the key.
// This field stores the last state sent to the client.
resourceVersions map[string]string

// knownResourceNames contains resource names that a client has received previously
knownResourceNames map[string]map[string]struct{}

// indicates whether the object has beed modified since its creation
// indicates whether the object has been modified since its creation
first bool
}

func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} {
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
return s.subscribedResourceNames
}

func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) {
s.subscribedResourceNames = subscribedResourceNames
}

func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool {
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
if s.IsWildcard() {
return true
}
for resourceName := range resourceNames {
if _, ok := s.subscribedResourceNames[resourceName]; ok {
return true
}
}
return false
}

func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
}
Expand All @@ -50,6 +74,10 @@ func (s *StreamState) IsFirst() bool {
return s.first
}

func (s *StreamState) SetWildcard(wildcard bool) {
s.wildcard = wildcard
}

func (s *StreamState) IsWildcard() bool {
return s.wildcard
}
Expand All @@ -73,10 +101,11 @@ func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
// NewStreamState initializes a stream state.
func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
state := StreamState{
wildcard: wildcard,
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
wildcard: wildcard,
subscribedResourceNames: map[string]struct{}{},
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
}

if initialResourceVersions == nil {
Expand Down