Commit 7304c56
Cherry-pick PRs into v2.10.10 release branch (#5030)
wallyqs committed Feb 2, 2024
2 parents 61f06c2 + 79960bb
Showing 12 changed files with 155 additions and 55 deletions.
30 changes: 25 additions & 5 deletions server/client.go
@@ -4089,6 +4089,12 @@ func getHeader(key string, hdr []byte) []byte {
return value
}

+// For bytes.HasPrefix below.
+var (
+	jsRequestNextPreB = []byte(jsRequestNextPre)
+	jsDirectGetPreB   = []byte(jsDirectGetPre)
+)

// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
@@ -4110,8 +4116,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
-		subj := bytesToString(c.pa.subject)
-		if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) {
+		if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
}
}
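The change above swaps strings.HasPrefix on a converted subject for bytes.HasPrefix on the raw subject bytes, dropping a per-message string conversion from the hot path. A minimal standalone sketch of the pattern (the prefix value here is illustrative, not taken from the server):

package main

import (
	"bytes"
	"fmt"
)

// Precompute the []byte form once so the per-message check
// compares raw bytes without any conversion or allocation.
var jsAPIPrefixB = []byte("$JS.API.")

func isJSAPI(subject []byte) bool {
	return bytes.HasPrefix(subject, jsAPIPrefixB)
}

func main() {
	fmt.Println(isJSAPI([]byte("$JS.API.STREAM.INFO.foo"))) // true
	fmt.Println(isJSAPI([]byte("orders.created")))          // false
}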
@@ -4790,7 +4795,11 @@ func (c *client) processPingTimer() {

var sendPing bool

-	pingInterval := c.srv.getOpts().PingInterval
+	opts := c.srv.getOpts()
+	pingInterval := opts.PingInterval
+	if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
+		pingInterval = opts.Cluster.PingInterval
+	}
pingInterval = adjustPingInterval(c.kind, pingInterval)
now := time.Now()
needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL
@@ -4810,7 +4819,11 @@ func (c *client) processPingTimer() {

if sendPing {
// Check for violation
-	if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
+	maxPingsOut := opts.MaxPingsOut
+	if c.kind == ROUTER && opts.Cluster.MaxPingsOut > 0 {
+		maxPingsOut = opts.Cluster.MaxPingsOut
+	}
+	if c.ping.out+1 > maxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
c.mu.Unlock()
@@ -4847,7 +4860,11 @@ func (c *client) setPingTimer() {
if c.srv == nil {
return
}
-	d := c.srv.getOpts().PingInterval
+	opts := c.srv.getOpts()
+	d := opts.PingInterval
+	if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
+		d = opts.Cluster.PingInterval
+	}
d = adjustPingInterval(c.kind, d)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
@@ -5740,6 +5757,9 @@ func (c *client) setFirstPingTimer() {
opts := s.getOpts()
d := opts.PingInterval

+	if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
+		d = opts.Cluster.PingInterval
+	}
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {
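All three call sites above apply the same precedence rule: for ROUTER connections, the cluster-level value overrides the global one only when it is set (> 0). A standalone sketch of that resolution, with simplified inputs assumed:

package main

import (
	"fmt"
	"time"
)

// effectiveInterval returns the cluster override for routes when set,
// otherwise the global interval.
func effectiveInterval(global, cluster time.Duration, isRoute bool) time.Duration {
	if isRoute && cluster > 0 {
		return cluster
	}
	return global
}

func main() {
	global, cluster := 2*time.Minute, 20*time.Second
	fmt.Println(effectiveInterval(global, cluster, true))  // 20s for routes
	fmt.Println(effectiveInterval(global, cluster, false)) // 2m0s for everything else
}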
38 changes: 32 additions & 6 deletions server/config_check_test.go
@@ -1580,7 +1580,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
-			name: "wrong type for cluter pool size",
+			name: "wrong type for cluster pool size",
config: `
cluster {
port: -1
@@ -1592,7 +1592,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
-			name: "wrong type for cluter accounts",
+			name: "wrong type for cluster accounts",
config: `
cluster {
port: -1
@@ -1604,7 +1604,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
-			name: "wrong type for cluter compression",
+			name: "wrong type for cluster compression",
config: `
cluster {
port: -1
@@ -1616,7 +1616,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
-			name: "wrong type for cluter compression mode",
+			name: "wrong type for cluster compression mode",
config: `
cluster {
port: -1
@@ -1630,7 +1630,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 7,
},
{
-			name: "wrong type for cluter compression rtt thresholds",
+			name: "wrong type for cluster compression rtt thresholds",
config: `
cluster {
port: -1
@@ -1645,7 +1645,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 7,
},
{
-			name: "invalid durations for cluter compression rtt thresholds",
+			name: "invalid durations for cluster compression rtt thresholds",
config: `
cluster {
port: -1
@@ -1659,6 +1659,32 @@ func TestConfigCheck(t *testing.T) {
errorLine: 6,
errorPos: 7,
},
+		{
+			name: "invalid durations for cluster ping interval",
+			config: `
+				cluster {
+					port: -1
+					ping_interval: -1
+					ping_max: 6
+				}
+			`,
+			err:       fmt.Errorf(`invalid use of field "ping_interval": ping_interval should be converted to a duration`),
+			errorLine: 4,
+			errorPos:  6,
+		},
+		{
+			name: "invalid durations for cluster ping interval",
+			config: `
+				cluster {
+					port: -1
+					ping_interval: '2m'
+					ping_max: 6
+				}
+			`,
+			warningErr: fmt.Errorf(`Cluster 'ping_interval' will reset to 30s which is the max for routes`),
+			errorLine:  4,
+			errorPos:   6,
+		},
{
name: "wrong type for leafnodes compression",
config: `
2 changes: 2 additions & 0 deletions server/configs/reload/reload.conf
@@ -35,4 +35,6 @@ cluster {
listen: 127.0.0.1:-1
name: "abc"
no_advertise: true # enable on reload
+  ping_interval: '20s'
+  ping_max: 8
}
8 changes: 3 additions & 5 deletions server/filestore.go
@@ -4846,11 +4846,6 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
}

func (mb *msgBlock) recompressOnDiskIfNeeded() error {
-	// Wait for disk I/O slots to become available. This prevents us from
-	// running away with system resources.
-	<-dios
-	defer func() { dios <- struct{}{} }()

alg := mb.fs.fcfg.Compression
mb.mu.Lock()
defer mb.mu.Unlock()
@@ -4864,7 +4859,10 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// header, in which case we do nothing.
// 2. The block will be uncompressed, in which case we will compress it
// and then write it back out to disk, reencrypting if necessary.
+	<-dios
	origBuf, err := os.ReadFile(origFN)
+	dios <- struct{}{}

if err != nil {
return fmt.Errorf("failed to read original block from disk: %w", err)
}
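The dios channel used above is a counting semaphore built from a buffered channel; the change narrows the critical section so a disk I/O slot is held only for the os.ReadFile call rather than for the whole recompression. A self-contained sketch of that pattern (slot count and function names are illustrative):

package main

import (
	"fmt"
	"os"
)

// A buffered channel acts as a counting semaphore bounding
// concurrent disk I/O. Fill it with the available slots up front.
var dios = make(chan struct{}, 4)

func init() {
	for i := 0; i < cap(dios); i++ {
		dios <- struct{}{}
	}
}

// readFileWithSlot holds a slot only for the duration of the read.
func readFileWithSlot(name string) ([]byte, error) {
	<-dios // acquire
	buf, err := os.ReadFile(name)
	dios <- struct{}{} // release immediately, before any further work
	return buf, err
}

func main() {
	if _, err := readFileWithSlot("does-not-exist"); err != nil {
		fmt.Println("expected error:", err)
	}
}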
3 changes: 2 additions & 1 deletion server/jetstream.go
@@ -654,7 +654,8 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error {

if !a.serviceImportExists(jsAllAPI) {
// Capture si so we can turn on implicit sharing with JetStream layer.
-		si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, _EMPTY_, nil)
+		// Make sure to set "to", otherwise we incur a performance slowdown.
+		si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, jsAllAPI, nil)
if err != nil {
return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
}
18 changes: 6 additions & 12 deletions server/jetstream_cluster.go
@@ -1358,7 +1358,7 @@ func (js *jetStream) monitorCluster() {

case isLeader = <-lch:
// For meta layer synchronize everyone to our state on becoming leader.
-			if isLeader {
+			if isLeader && n.ApplyQ().len() == 0 {
n.SendSnapshot(js.metaSnapshot())
}
// Process the change.
@@ -4392,7 +4392,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
recovering := ca.recovering
js.mu.RUnlock()

-	stopped := false
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
var err error
var acc *Account
@@ -4402,25 +4401,20 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if mset, _ := acc.lookupStream(ca.Stream); mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
err = o.stopWithFlags(true, false, true, wasLeader)
-				stopped = true
}
}
+	} else if ca.Group != nil {
+		// We have a missing account, see if we can cleanup.
+		if sacc := s.SystemAccount(); sacc != nil {
+			os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name))
+		}
}

// Always delete the node if present.
if node != nil {
node.Delete()
}

-	// This is a stop gap cleanup in case
-	// 1) the account does not exist (and mset consumer couldn't be stopped) and/or
-	// 2) node was nil (and couldn't be deleted)
-	if !stopped || node == nil {
-		if sacc := s.SystemAccount(); sacc != nil {
-			os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name))
-		}
-	}

if !wasLeader || ca.Reply == _EMPTY_ {
if !(offline && isMetaLeader) {
return
9 changes: 9 additions & 0 deletions server/opts.go
@@ -80,6 +80,8 @@ type ClusterOpts struct {
PoolSize int `json:"-"`
PinnedAccounts []string `json:"-"`
Compression CompressionOpts `json:"-"`
+	PingInterval time.Duration `json:"-"`
+	MaxPingsOut  int           `json:"-"`

// Not exported (used in tests)
resolver netResolver
@@ -1755,6 +1757,13 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err
*errors = append(*errors, err)
continue
}
+		case "ping_interval":
+			opts.Cluster.PingInterval = parseDuration("ping_interval", tk, mv, errors, warnings)
+			if opts.Cluster.PingInterval > routeMaxPingInterval {
+				*warnings = append(*warnings, &configErr{tk, fmt.Sprintf("Cluster 'ping_interval' will reset to %v which is the max for routes", routeMaxPingInterval)})
+			}
+		case "ping_max":
+			opts.Cluster.MaxPingsOut = int(mv.(int64))
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
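For embedded servers, the same two knobs are exposed through the new ClusterOpts fields; a minimal sketch (assuming the usual nats-server module entry points, values illustrative):

package main

import (
	"time"

	"github.com/nats-io/nats-server/v2/server"
)

func main() {
	opts := &server.Options{}
	opts.Cluster.Name = "abc"
	opts.Cluster.Port = -1
	// Route-specific ping settings; when left at zero the global
	// PingInterval/MaxPingsOut continue to apply to routes.
	opts.Cluster.PingInterval = 20 * time.Second
	opts.Cluster.MaxPingsOut = 8

	s, err := server.NewServer(opts)
	if err != nil {
		panic(err)
	}
	go s.Start()
	defer s.Shutdown()
	if !s.ReadyForConnections(5 * time.Second) {
		panic("server not ready")
	}
}

Note that the parser warns rather than errors when ping_interval exceeds routeMaxPingInterval; per the warning text, the value is reset to the 30s route maximum.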
4 changes: 4 additions & 0 deletions server/raft.go
@@ -3953,6 +3953,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.resetElect(randCampaignTimeout())
}
}

+	// Term might have changed, make sure response has the most current
+	vresp.term = n.term

n.Unlock()

n.sendReply(vr.reply, vresp.encode())
12 changes: 5 additions & 7 deletions server/raft_test.go
@@ -273,15 +273,13 @@ func TestNRGSimpleElection(t *testing.T) {
re := decodeVoteResponse(msg.Data)
require_True(t, re != nil)

-		// The new term hasn't started yet, so the vote responses
-		// should contain the term from before the election. It is
-		// possible that candidates are listening to this to work
-		// out if they are in previous terms.
-		require_Equal(t, re.term, vr.lastTerm)
-		require_Equal(t, re.term, startTerm)

// The vote should have been granted.
require_Equal(t, re.granted, true)

+		// The node granted the vote, therefore the term in the vote
+		// response should have advanced as well.
+		require_Equal(t, re.term, vr.term)
+		require_Equal(t, re.term, startTerm+1)
}

// Everyone in the group should have voted for our candidate
6 changes: 6 additions & 0 deletions server/reload_test.go
@@ -348,6 +348,12 @@ func TestConfigReload(t *testing.T) {
if !updated.Cluster.NoAdvertise {
t.Fatal("Expected NoAdvertise to be true")
}
+	if updated.Cluster.PingInterval != 20*time.Second {
+		t.Fatalf("Cluster PingInterval is incorrect.\nexpected: 20s\ngot: %v", updated.Cluster.PingInterval)
+	}
+	if updated.Cluster.MaxPingsOut != 8 {
+		t.Fatalf("Cluster MaxPingsOut is incorrect.\nexpected: 8\ngot: %v", updated.Cluster.MaxPingsOut)
+	}
if updated.PidFile != "nats-server.pid" {
t.Fatalf("PidFile is incorrect.\nexpected: nats-server.pid\ngot: %s", updated.PidFile)
}
10 changes: 9 additions & 1 deletion server/route.go
@@ -1780,7 +1780,15 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie
// the connection as stale based on the ping interval and max out values,
// but without actually sending pings.
if compressionConfigured {
-		c.ping.tmr = time.AfterFunc(opts.PingInterval*time.Duration(opts.MaxPingsOut+1), func() {
+		pingInterval := opts.PingInterval
+		pingMax := opts.MaxPingsOut
+		if opts.Cluster.PingInterval > 0 {
+			pingInterval = opts.Cluster.PingInterval
+		}
+		if opts.Cluster.MaxPingsOut > 0 {
+			pingMax = opts.Cluster.MaxPingsOut
+		}
+		c.ping.tmr = time.AfterFunc(pingInterval*time.Duration(pingMax+1), func() {
c.mu.Lock()
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
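For scale: with the reload.conf values above (ping_interval '20s', ping_max 8), this timer would flag a compressed route as stale after 20s × (8 + 1) = 180s without traffic.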
