[IMPROVED] Stream placement in cluster with N > R #5079

Merged · 1 commit · Feb 14, 2024
34 changes: 27 additions & 7 deletions server/jetstream_cluster.go
@@ -5550,6 +5550,7 @@ func (e *selectPeerError) accumulate(eAdd *selectPeerError) {

 // selectPeerGroup will select a group of peers to start a raft group.
 // when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped
+// js lock should be held.
 func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int, ignore []string) ([]string, *selectPeerError) {
 	if cluster == _EMPTY_ || cfg == nil {
 		return nil, &selectPeerError{misc: true}
@@ -5571,6 +5572,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 		id    string
 		avail uint64
 		ha    int
+		ns    int
 	}

 	var nodes []wn
@@ -5635,6 +5637,22 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 		}
 	}

+	// Grab the number of streams and HA assets currently assigned to each peer.
+	// HAAssets in the usage stats are updated asynchronously, so calculate here in real time based on current assignments.
+	peerStreams := make(map[string]int, len(peers))
+	peerHA := make(map[string]int, len(peers))
+	for _, asa := range cc.streams {
+		for _, sa := range asa {
+			isHA := len(sa.Group.Peers) > 1
+			for _, peer := range sa.Group.Peers {
+				peerStreams[peer]++
+				if isHA {
+					peerHA[peer]++
+				}
+			}
+		}
+	}
+
 	maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets

 	// An error is a result of multiple individual placement decisions.
Expand Down Expand Up @@ -5697,7 +5715,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}

var available uint64
var ha int
if ni.stats != nil {
switch cfg.Storage {
case MemoryStorage:
@@ -5717,7 +5734,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 					available = uint64(ni.cfg.MaxStore) - used
 				}
 			}
-			ha = ni.stats.HAAssets
 		}

 		// Otherwise check if we have enough room if maxBytes set.
@@ -5749,7 +5765,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 			}
 		}
 		// Add to our list of potential nodes.
-		nodes = append(nodes, wn{p.ID, available, ha})
+		nodes = append(nodes, wn{p.ID, available, peerHA[p.ID], peerStreams[p.ID]})
 	}

 	// If we could not select enough peers, fail.
@@ -5761,10 +5777,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
 		}
 		return nil, &err
 	}
-	// Sort based on available from most to least.
-	sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })
-
-	// If we are placing a replicated stream, let's sort based in haAssets, as that is more important to balance.
+	// Sort based on available from most to least, breaking ties by number of total streams assigned to the peer.
+	sort.Slice(nodes, func(i, j int) bool {
+		if nodes[i].avail == nodes[j].avail {
+			return nodes[i].ns < nodes[j].ns
+		}
+		return nodes[i].avail > nodes[j].avail
+	})
+	// If we are placing a replicated stream, let's sort based on HAAssets, as that is more important to balance.
 	if cfg.Replicas > 1 {
 		sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha })
 	}
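The core of the change is the two-stage ordering above: sort by available storage with ties broken by assigned stream count, then, for replicated streams, stable-sort by HA-asset count. A minimal standalone sketch of that ordering (the sample peer values here are illustrative assumptions, not the server's real data) shows why it fixes the N > R case from issue #5071, where five fresh peers with identical available storage previously sorted into a constant order and the same three always won:

```go
package main

import (
	"fmt"
	"sort"
)

// wn mirrors the weighted-node tuple in the diff: per-peer available
// storage, HA-asset count (ha), and total assigned streams (ns).
type wn struct {
	id    string
	avail uint64
	ha    int
	ns    int
}

func main() {
	// Five peers with identical available storage (illustrative values).
	nodes := []wn{
		{"s1", 1 << 30, 2, 4}, {"s2", 1 << 30, 1, 2}, {"s3", 1 << 30, 0, 0},
		{"s4", 1 << 30, 1, 1}, {"s5", 1 << 30, 2, 3},
	}
	// Sort by available storage (most first); ties go to the peer with
	// fewer streams already assigned.
	sort.Slice(nodes, func(i, j int) bool {
		if nodes[i].avail == nodes[j].avail {
			return nodes[i].ns < nodes[j].ns
		}
		return nodes[i].avail > nodes[j].avail
	})
	// For replicated streams, stable-sort so peers with fewer HA assets
	// come first, preserving the stream-count tie-break among equals.
	replicas := 3
	if replicas > 1 {
		sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha })
	}
	for _, n := range nodes {
		fmt.Printf("%s ha=%d ns=%d\n", n.id, n.ha, n.ns)
	}
	// Prints s3 (ha=0), s4, s2 (ha=1), s5, s1 (ha=2): the least loaded
	// peers head the list for the next R=3 placement.
}
```

Because the counts are recomputed from `cc.streams` on every call, each placement sees the assignments made by the previous one, so successive R3 streams rotate across all N servers instead of stacking on the first R.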
27 changes: 27 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -6519,3 +6519,30 @@ Setup:
 		}
 	}
 }
+
+// https://github.com/nats-io/nats-server/issues/5071
+func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 5)
+	defer c.shutdown()
+
+	s := c.randomNonLeader()
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	for i := 1; i <= 10; i++ {
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:     fmt.Sprintf("TEST:%d", i),
+			Subjects: []string{fmt.Sprintf("foo.%d.*", i)},
+			Replicas: 3,
+		})
+		require_NoError(t, err)
+	}
+
+	// 10 streams * 3 replicas spread over 5 servers = 6 streams per server.
+	expectedStreams := 10 * 3 / 5
+	for _, s := range c.servers {
+		jsz, err := s.Jsz(nil)
+		require_NoError(t, err)
+		require_Equal(t, jsz.Streams, expectedStreams)
+	}
+}
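The test's expectation is pure counting, and it only holds if placement is perfectly even, which is exactly what the new tie-break is meant to guarantee. A sketch of the arithmetic (assuming even spread):

```go
package main

import "fmt"

func main() {
	// 10 streams * 3 replicas = 30 replica assignments; spread evenly
	// across 5 servers, each server should report 30 / 5 = 6 streams.
	streams, replicas, servers := 10, 3, 5
	fmt.Println(streams * replicas / servers) // 6
}
```

Under the old avail-only sort, a fresh cluster could instead report 10 streams on each of 3 servers and 0 on the other 2, which is the imbalance the `require_Equal` assertion above would catch.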