Skip to content

Commit

Permalink
delta: support dynamic wildcard subscription in delta-xds (#559)
Browse files Browse the repository at this point in the history
* Support dynamic wildcard subscription in delta-xds

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Jun 28, 2022
1 parent 8c53ec3 commit 6a65c54
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 140 deletions.
19 changes: 8 additions & 11 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,26 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetResourceVersions() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
// Reply only with the requested resources
nextVersionMap = make(map[string]string, len(state.GetResourceVersions()))
for name, prevVersion := range state.GetResourceVersions() {
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
}
nextVersionMap[name] = nextVersion
} else {
// We track non-existent resources for non-wildcard streams until the client explicitly unsubscribes from them.
nextVersionMap[name] = ""
// The version check is to make sure we are only sending an update once right after removal.
// If the client keeps the subscription, we skip the add for every subsequent response.
if prevVersion != "" {
toRemove = append(toRemove, name)
}
} else if found {
toRemove = append(toRemove, name)
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, versionMap[typ]), watches[typ])
}, state, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -221,13 +225,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, stream.NewStreamState(false, map[string]string{names[rsrc.EndpointType][0]: ""}), watchCh)
}, state, watchCh)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
Expand Down
6 changes: 5 additions & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}

for id, watch := range cache.deltaWatches {
if !watch.StreamState.WatchesResources(modified) {
continue
}

res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState)
if res != nil {
delete(cache.deltaWatches, id)
Expand Down Expand Up @@ -391,7 +395,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, state.GetResourceVersions(), cache.getVersion())
cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}
Expand Down
22 changes: 16 additions & 6 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,15 @@ func TestLinearDeltaExistingResources(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"b": "", "c": ""}) // watching b and c - not interested in a
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{})

state = stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state = stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
Expand All @@ -511,13 +513,15 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": hashB})
state := stream.NewStreamState(false, map[string]string{"b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -543,14 +547,16 @@ func TestLinearDeltaResourceUpdate(t *testing.T) {
// There is currently no delta watch
checkVersionMapNotSet(t, c)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)
checkVersionMapSet(t, c)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -577,13 +583,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
err = c.UpdateResource("b", b)
assert.NoError(t, err)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
checkDeltaWatchCount(t, c, 0)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil)

state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w = make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
Expand All @@ -600,7 +608,8 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
c := NewLinearCache(testType)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
w := make(chan DeltaResponse, 1)
checkVersionMapNotSet(t, c)
assert.Equal(t, 0, c.NumResources())
Expand Down Expand Up @@ -745,6 +754,7 @@ func TestLinearMixedWatches(t *testing.T) {
checkVersionMapNotSet(t, c)

deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB})
deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}})
wd := make(chan DeltaResponse, 1)

// Initial update
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (cache *snapshotCache) ClearSnapshot(node string) {

// nameSet creates a map from a string slice to value true.
func nameSet(names []string) map[string]bool {
set := make(map[string]bool)
set := make(map[string]bool, len(names))
for _, name := range names {
set[name] = true
}
Expand Down Expand Up @@ -498,9 +498,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
watchID := cache.nextDeltaWatchID()

if exists {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetResourceVersions(), nodeID, snapshot.GetVersion(t))
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t))
} else {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetResourceVersions(), nodeID)
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID)
}

info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
Expand Down
42 changes: 33 additions & 9 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
if !ok {
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode.
// so we set the initial resource versions if we have any.
// We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe).
// If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard.
// It can still be done by explicitly unsubscribing from "*"
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
watch.Cancel()
}

s.subscribe(req.GetResourceNamesSubscribe(), watch.state.GetResourceVersions())
s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.GetResourceVersions())
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
Expand Down Expand Up @@ -210,17 +213,38 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro
}

// When we subscribe, we just want to make the cache know we are subscribing to a resource.
// Providing a name with an empty version is enough to make that happen.
func (s *server) subscribe(resources []string, sv map[string]string) {
// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on.
func (s *server) subscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
sv[resource] = ""
if resource == "*" {
streamState.SetWildcard(true)
continue
}
sv[resource] = struct{}{}
}
}

// Unsubscriptions remove resources from the stream state to
// indicate to the cache that we don't care about the resource anymore
func (s *server) unsubscribe(resources []string, sv map[string]string) {
// Unsubscriptions remove resources from the stream's subscribed resource list.
// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources.
func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
if resource == "*" {
streamState.SetWildcard(false)
continue
}
if _, ok := sv[resource]; ok && streamState.IsWildcard() {
// The XDS protocol states that:
// * if a watch is currently wildcard
// * a resource is explicitly unsubscribed by name
// Then the control-plane must return in the response whether the resource is removed (if no longer present for this node)
// or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated
// To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either:
// * detect the version change, and return the resource (as an update)
// * detect the resource deletion, and set it as removed in the response
streamState.GetResourceVersions()[resource] = ""
}
delete(sv, resource)
}
}
51 changes: 45 additions & 6 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,54 @@ type DeltaStream interface {

// StreamState will keep track of resource state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the original DeltaRequest was a wildcard LDS/RDS request.
// Indicates whether the delta stream currently has a wildcard watch
wildcard bool

// Provides the list of resources explicitly requested by the client
// This list might be non-empty even when set as wildcard
subscribedResourceNames map[string]struct{}

// ResourceVersions contains a hash of the resource as the value and the resource name as the key.
// This field stores the last state sent to the client.
resourceVersions map[string]string

// knownResourceNames contains resource names that a client has received previously
knownResourceNames map[string]map[string]struct{}

// indicates whether the object has beed modified since its creation
// indicates whether the object has been modified since its creation
first bool
}

// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to
// If the request is set to wildcard it may be empty
// Currently populated only when using delta-xds
func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} {
return s.subscribedResourceNames
}

// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to
// It is decorrelated from the wildcard state of the stream
// Currently used only when using delta-xds
func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) {
s.subscribedResourceNames = subscribedResourceNames
}

// WatchesResources returns whether at least one of the resource provided is currently watch by the stream
// It is currently only applicable to delta-xds
// If the request is wildcard, it will always return true
// Otherwise it will compare the provided resources to the list of resources currently subscribed
func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool {
if s.IsWildcard() {
return true
}
for resourceName := range resourceNames {
if _, ok := s.subscribedResourceNames[resourceName]; ok {
return true
}
}
return false
}

func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
}
Expand All @@ -50,6 +84,10 @@ func (s *StreamState) IsFirst() bool {
return s.first
}

func (s *StreamState) SetWildcard(wildcard bool) {
s.wildcard = wildcard
}

func (s *StreamState) IsWildcard() bool {
return s.wildcard
}
Expand All @@ -73,10 +111,11 @@ func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
// NewStreamState initializes a stream state.
func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
state := StreamState{
wildcard: wildcard,
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
wildcard: wildcard,
subscribedResourceNames: map[string]struct{}{},
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
}

if initialResourceVersions == nil {
Expand Down

0 comments on commit 6a65c54

Please sign in to comment.