Skip to content

Commit

Permalink
[IMPROVED] Stream placement in cluster with N > R (#5079)
Browse files Browse the repository at this point in the history
Select the peer group based on per-peer stream counts in addition to usage data,
and compute HA asset counts in realtime from current assignments instead of
relying on the asynchronously-updated usage stats.

Resolves: #5071 

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 14, 2024
2 parents b96c9bc + cc622b4 commit 056ad18
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
34 changes: 27 additions & 7 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5550,6 +5550,7 @@ func (e *selectPeerError) accumulate(eAdd *selectPeerError) {

// selectPeerGroup will select a group of peers to start a raft group.
// when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped
// js lock should be held.
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int, ignore []string) ([]string, *selectPeerError) {
if cluster == _EMPTY_ || cfg == nil {
return nil, &selectPeerError{misc: true}
Expand All @@ -5571,6 +5572,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
id string
avail uint64
ha int
ns int
}

var nodes []wn
Expand Down Expand Up @@ -5635,6 +5637,22 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
}

// Grab the number of streams and HA assets currently assigned to each peer.
// HAAssets under usage is async, so calculate here in realtime based on assignments.
peerStreams := make(map[string]int, len(peers))
peerHA := make(map[string]int, len(peers))
for _, asa := range cc.streams {
for _, sa := range asa {
isHA := len(sa.Group.Peers) > 1
for _, peer := range sa.Group.Peers {
peerStreams[peer]++
if isHA {
peerHA[peer]++
}
}
}
}

maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets

// An error is a result of multiple individual placement decisions.
Expand Down Expand Up @@ -5697,7 +5715,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}

var available uint64
var ha int
if ni.stats != nil {
switch cfg.Storage {
case MemoryStorage:
Expand All @@ -5717,7 +5734,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
available = uint64(ni.cfg.MaxStore) - used
}
}
ha = ni.stats.HAAssets
}

// Otherwise check if we have enough room if maxBytes set.
Expand Down Expand Up @@ -5749,7 +5765,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
}
// Add to our list of potential nodes.
nodes = append(nodes, wn{p.ID, available, ha})
nodes = append(nodes, wn{p.ID, available, peerHA[p.ID], peerStreams[p.ID]})
}

// If we could not select enough peers, fail.
Expand All @@ -5761,10 +5777,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
return nil, &err
}
// Sort based on available from most to least.
sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })

// If we are placing a replicated stream, let's sort based in haAssets, as that is more important to balance.
// Sort based on available from most to least, breaking ties by number of total streams assigned to the peer.
sort.Slice(nodes, func(i, j int) bool {
if nodes[i].avail == nodes[j].avail {
return nodes[i].ns < nodes[j].ns
}
return nodes[i].avail > nodes[j].avail
})
// If we are placing a replicated stream, let's sort based on HAAssets, as that is more important to balance.
if cfg.Replicas > 1 {
sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha })
}
Expand Down
27 changes: 27 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6519,3 +6519,30 @@ Setup:
}
}
}

// https://github.com/nats-io/nats-server/issues/5071
func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 5)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomNonLeader())
	defer nc.Close()

	const (
		numStreams = 10
		replicas   = 3
		numServers = 5
	)

	// Create the streams; placement should spread replicas evenly.
	for i := 0; i < numStreams; i++ {
		_, err := js.AddStream(&nats.StreamConfig{
			Name:     fmt.Sprintf("TEST:%d", i+1),
			Subjects: []string{fmt.Sprintf("foo.%d.*", i+1)},
			Replicas: replicas,
		})
		require_NoError(t, err)
	}

	// 10 streams * 3 replicas spread across 5 servers means each
	// server should host exactly this many stream assets.
	expectedStreams := numStreams * replicas / numServers
	for _, srv := range c.servers {
		jsz, err := srv.Jsz(nil)
		require_NoError(t, err)
		require_Equal(t, jsz.Streams, expectedStreams)
	}
}

0 comments on commit 056ad18

Please sign in to comment.