Skip to content

Commit

Permalink
Update comments based on review
Browse files Browse the repository at this point in the history
  • Loading branch information
valerian-roche committed Jun 21, 2022
1 parent ac4e306 commit f79d7cc
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
23 changes: 13 additions & 10 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any.
// We also set the stream as wildcard based on its legacy meaning (no resource subscribed).
// If the state starts with this legacy mode, adding new resources will not unsubscribe to wildcard.
// It can still be done by unsubscribing from "*"
// We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe).
// If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard.
// It can still be done by explicitly unsubscribing from "*"
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
watch.Cancel()
Expand Down Expand Up @@ -225,8 +225,8 @@ func (s *server) subscribe(resources []string, streamState *stream.StreamState)
}
}

// Unsubscriptions remove resources from the stream subscribed resources.
// If explicitly unsubscribing from wildcard, the stream is updated and is now watching only subscribed resources.
// Unsubscriptions remove resources from the stream's subscribed resource list.
// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources.
func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) {
sv := streamState.GetSubscribedResourceNames()
for _, resource := range resources {
Expand All @@ -235,11 +235,14 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState
continue
}
if _, ok := sv[resource]; ok && streamState.IsWildcard() {
// xds protocol specifically states that if a resource if unsubscribed while a wildcard watch is present,
// the control-plane must return a response with either the resource set as removed (if no longer present in the snapshot)
// or with its content
// If the stream is in wildcard mode specifying an empty entry in the resource versions will either send the resource content
// or mark it as removed (done in createDeltaResponse)
// The XDS protocol states that:
// * if a watch is currently wildcard
// * a resource is explicitly unsubscribed by name
// Then the control-plane must return in the response whether the resource is removed (if no longer present for this node)
// or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated
// To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either:
// * detect the version change, and return the resource (as an update)
// * detect the resource deletion, and set it as removed in the response
streamState.GetResourceVersions()[resource] = ""
}
delete(sv, resource)
Expand Down
12 changes: 11 additions & 1 deletion pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DeltaStream interface {

// StreamState will keep track of resource state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the stream currently has a wildcard watch
// Indicates whether the delta stream currently has a wildcard watch
wildcard bool

// Provides the list of resources explicitly requested by the client
Expand All @@ -41,14 +41,24 @@ type StreamState struct { // nolint:golint,revive
first bool
}

// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to
// If the request is set to wildcard it may be empty
// Currently populated only when using delta-xds
func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} {
return s.subscribedResourceNames
}

// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to
// It is decorrelated from the wildcard state of the stream
// Currently used only when using delta-xds
func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) {
s.subscribedResourceNames = subscribedResourceNames
}

// WatchesResources returns whether at least one of the resource provided is currently watch by the stream
// It is currently only applicable to delta-xds
// If the request is wildcard, it will always return true
// Otherwise it will compare the provided resources to the list of resources currently subscribed
func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool {
if s.IsWildcard() {
return true
Expand Down

0 comments on commit f79d7cc

Please sign in to comment.