Cherry-pick PRs into v2.10.10 release branch #5030

Merged (8 commits) on Feb 2, 2024
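The headline change in this release branch is the ability to tune ping settings for cluster (route) connections separately from client connections. For illustration only, a config sketch of the new options; the option names match the reload.conf and opts.go changes in this diff, while the port and values are arbitrary examples:

cluster {
  port: 6222
  # New: route-specific ping settings. Values above 30s are capped,
  # since 30s is the max ping interval for routes.
  ping_interval: '20s'
  ping_max: 8
}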
30 changes: 25 additions & 5 deletions server/client.go
@@ -4089,6 +4089,12 @@ func getHeader(key string, hdr []byte) []byte {
return value
}

// For bytes.HasPrefix below.
var (
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
)

// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
@@ -4110,8 +4116,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
subj := bytesToString(c.pa.subject)
if strings.HasPrefix(subj, jsRequestNextPre) || strings.HasPrefix(subj, jsDirectGetPre) {
if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
}
}
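The change above swaps a string conversion plus strings.HasPrefix for bytes.HasPrefix against prefixes converted to []byte once at package level, so the hot path never touches the subject as a string. A minimal standalone sketch of the pattern; the prefix values below are hypothetical stand-ins, not the actual jsRequestNextPre/jsDirectGetPre constants:

package main

import (
    "bytes"
    "fmt"
)

// Hypothetical stand-ins for jsRequestNextPre / jsDirectGetPre.
const (
    requestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
    directGetPre   = "$JS.API.DIRECT.GET."
)

// Converted to []byte once so the per-message check allocates nothing.
var (
    requestNextPreB = []byte(requestNextPre)
    directGetPreB   = []byte(directGetPre)
)

func isJSAPICall(subject []byte) bool {
    return bytes.HasPrefix(subject, directGetPreB) || bytes.HasPrefix(subject, requestNextPreB)
}

func main() {
    fmt.Println(isJSAPICall([]byte("$JS.API.DIRECT.GET.ORDERS"))) // true
    fmt.Println(isJSAPICall([]byte("foo.bar")))                   // false
}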
@@ -4790,7 +4795,11 @@ func (c *client) processPingTimer() {

var sendPing bool

pingInterval := c.srv.getOpts().PingInterval
opts := c.srv.getOpts()
pingInterval := opts.PingInterval
if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
pingInterval = opts.Cluster.PingInterval
}
pingInterval = adjustPingInterval(c.kind, pingInterval)
now := time.Now()
needRTT := c.rtt == 0 || now.Sub(c.rttStart) > DEFAULT_RTT_MEASUREMENT_INTERVAL
@@ -4810,7 +4819,11 @@ func (c *client) processPingTimer() {

if sendPing {
// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
maxPingsOut := opts.MaxPingsOut
if c.kind == ROUTER && opts.Cluster.MaxPingsOut > 0 {
maxPingsOut = opts.Cluster.MaxPingsOut
}
if c.ping.out+1 > maxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
c.mu.Unlock()
@@ -4847,7 +4860,11 @@ func (c *client) setPingTimer() {
if c.srv == nil {
return
}
d := c.srv.getOpts().PingInterval
opts := c.srv.getOpts()
d := opts.PingInterval
if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
d = opts.Cluster.PingInterval
}
d = adjustPingInterval(c.kind, d)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
@@ -5740,6 +5757,9 @@ func (c *client) setFirstPingTimer() {
opts := s.getOpts()
d := opts.PingInterval

if c.kind == ROUTER && opts.Cluster.PingInterval > 0 {
d = opts.Cluster.PingInterval
}
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {
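All three call sites above apply the same precedence rule: route (cluster) connections use Cluster.PingInterval when it is set, and fall back to the server-wide PingInterval otherwise. A self-contained sketch of that rule, using trimmed-down hypothetical stand-ins for the server's Options types:

package main

import (
    "fmt"
    "time"
)

// Trimmed-down, hypothetical stand-ins for server.Options / ClusterOpts.
type clusterOpts struct{ PingInterval time.Duration }
type options struct {
    PingInterval time.Duration
    Cluster      clusterOpts
}

// effectivePingInterval mirrors the override logic above: the cluster-level
// setting wins for routes, everything else keeps the global interval.
func effectivePingInterval(isRoute bool, o options) time.Duration {
    if isRoute && o.Cluster.PingInterval > 0 {
        return o.Cluster.PingInterval
    }
    return o.PingInterval
}

func main() {
    o := options{
        PingInterval: 2 * time.Minute,
        Cluster:      clusterOpts{PingInterval: 20 * time.Second},
    }
    fmt.Println(effectivePingInterval(true, o))  // 20s for a route
    fmt.Println(effectivePingInterval(false, o)) // 2m0s for a client
}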
38 changes: 32 additions & 6 deletions server/config_check_test.go
@@ -1580,7 +1580,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
name: "wrong type for cluter pool size",
name: "wrong type for cluster pool size",
config: `
cluster {
port: -1
@@ -1592,7 +1592,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
name: "wrong type for cluter accounts",
name: "wrong type for cluster accounts",
config: `
cluster {
port: -1
@@ -1604,7 +1604,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
name: "wrong type for cluter compression",
name: "wrong type for cluster compression",
config: `
cluster {
port: -1
@@ -1616,7 +1616,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 6,
},
{
name: "wrong type for cluter compression mode",
name: "wrong type for cluster compression mode",
config: `
cluster {
port: -1
@@ -1630,7 +1630,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 7,
},
{
name: "wrong type for cluter compression rtt thresholds",
name: "wrong type for cluster compression rtt thresholds",
config: `
cluster {
port: -1
@@ -1645,7 +1645,7 @@ func TestConfigCheck(t *testing.T) {
errorPos: 7,
},
{
name: "invalid durations for cluter compression rtt thresholds",
name: "invalid durations for cluster compression rtt thresholds",
config: `
cluster {
port: -1
@@ -1659,6 +1659,32 @@ func TestConfigCheck(t *testing.T) {
errorLine: 6,
errorPos: 7,
},
{
name: "invalid durations for cluster ping interval",
config: `
cluster {
port: -1
ping_interval: -1
ping_max: 6
}
`,
err: fmt.Errorf(`invalid use of field "ping_interval": ping_interval should be converted to a duration`),
errorLine: 4,
errorPos: 6,
},
{
name: "invalid durations for cluster ping interval",
config: `
cluster {
port: -1
ping_interval: '2m'
ping_max: 6
}
`,
warningErr: fmt.Errorf(`Cluster 'ping_interval' will reset to 30s which is the max for routes`),
errorLine: 4,
errorPos: 6,
},
{
name: "wrong type for leafnodes compression",
config: `
2 changes: 2 additions & 0 deletions server/configs/reload/reload.conf
@@ -35,4 +35,6 @@ cluster {
listen: 127.0.0.1:-1
name: "abc"
no_advertise: true # enable on reload
ping_interval: '20s'
ping_max: 8
}
8 changes: 3 additions & 5 deletions server/filestore.go
@@ -4846,11 +4846,6 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
}

func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// Wait for disk I/O slots to become available. This prevents us from
// running away with system resources.
<-dios
defer func() { dios <- struct{}{} }()

alg := mb.fs.fcfg.Compression
mb.mu.Lock()
defer mb.mu.Unlock()
@@ -4864,7 +4859,10 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// header, in which case we do nothing.
// 2. The block will be uncompressed, in which case we will compress it
// and then write it back out to disk, reencrypting if necessary.
<-dios
origBuf, err := os.ReadFile(origFN)
dios <- struct{}{}

if err != nil {
return fmt.Errorf("failed to read original block from disk: %w", err)
}
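The dios channel used above is a counting semaphore that limits concurrent disk I/O; the fix holds a slot only for the duration of the read instead of for the whole recompression pass. A minimal standalone sketch of the pattern, with an arbitrary slot count:

package main

import (
    "fmt"
    "os"
)

// dios acts as a counting semaphore: each value in the channel is one
// available disk-I/O slot. A capacity of 4 is an arbitrary example.
var dios = make(chan struct{}, 4)

func init() {
    for i := 0; i < cap(dios); i++ {
        dios <- struct{}{}
    }
}

// readFileWithSlot takes a slot only around the actual syscall, so other work
// in the caller does not hold an I/O slot hostage.
func readFileWithSlot(name string) ([]byte, error) {
    <-dios
    defer func() { dios <- struct{}{} }()
    return os.ReadFile(name)
}

func main() {
    if _, err := readFileWithSlot("does-not-exist"); err != nil {
        fmt.Println("read failed as expected:", err)
    }
}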
3 changes: 2 additions & 1 deletion server/jetstream.go
@@ -654,7 +654,8 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error {

if !a.serviceImportExists(jsAllAPI) {
// Capture si so we can turn on implicit sharing with JetStream layer.
si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, _EMPTY_, nil)
// Make sure to set "to", otherwise this will incur a performance slowdown.
si, err := a.addServiceImport(s.SystemAccount(), jsAllAPI, jsAllAPI, nil)
if err != nil {
return fmt.Errorf("Error setting up jetstream service imports for account: %v", err)
}
18 changes: 6 additions & 12 deletions server/jetstream_cluster.go
@@ -1358,7 +1358,7 @@ func (js *jetStream) monitorCluster() {

case isLeader = <-lch:
// For meta layer synchronize everyone to our state on becoming leader.
if isLeader {
if isLeader && n.ApplyQ().len() == 0 {
n.SendSnapshot(js.metaSnapshot())
}
// Process the change.
@@ -4392,7 +4392,6 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
recovering := ca.recovering
js.mu.RUnlock()

stopped := false
var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
var err error
var acc *Account
@@ -4402,25 +4401,20 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if mset, _ := acc.lookupStream(ca.Stream); mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
err = o.stopWithFlags(true, false, true, wasLeader)
stopped = true
}
}
} else if ca.Group != nil {
// We have a missing account, see if we can cleanup.
if sacc := s.SystemAccount(); sacc != nil {
os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name))
}
}

// Always delete the node if present.
if node != nil {
node.Delete()
}

// This is a stop gap cleanup in case
// 1) the account does not exist (and mset consumer couldn't be stopped) and/or
// 2) node was nil (and couldn't be deleted)
if !stopped || node == nil {
if sacc := s.SystemAccount(); sacc != nil {
os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name))
}
}

if !wasLeader || ca.Reply == _EMPTY_ {
if !(offline && isMetaLeader) {
return
9 changes: 9 additions & 0 deletions server/opts.go
@@ -80,6 +80,8 @@ type ClusterOpts struct {
PoolSize int `json:"-"`
PinnedAccounts []string `json:"-"`
Compression CompressionOpts `json:"-"`
PingInterval time.Duration `json:"-"`
MaxPingsOut int `json:"-"`

// Not exported (used in tests)
resolver netResolver
@@ -1755,6 +1757,13 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err
*errors = append(*errors, err)
continue
}
case "ping_interval":
opts.Cluster.PingInterval = parseDuration("ping_interval", tk, mv, errors, warnings)
if opts.Cluster.PingInterval > routeMaxPingInterval {
*warnings = append(*warnings, &configErr{tk, fmt.Sprintf("Cluster 'ping_interval' will reset to %v which is the max for routes", routeMaxPingInterval)})
}
case "ping_max":
opts.Cluster.MaxPingsOut = int(mv.(int64))
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
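The warning above refers to a 30s ceiling for route ping intervals (routeMaxPingInterval): the configured value is parsed as-is, but the effective interval on routes is capped, which is why the config check only warns rather than errors. A sketch of that cap, assuming only the constant's value given in the warning text:

package main

import (
    "fmt"
    "time"
)

// The warning text above names this ceiling for route ping intervals.
const routeMaxPingInterval = 30 * time.Second

// clampRoutePingInterval caps a configured route ping interval at the route maximum.
func clampRoutePingInterval(d time.Duration) time.Duration {
    if d > routeMaxPingInterval {
        return routeMaxPingInterval
    }
    return d
}

func main() {
    fmt.Println(clampRoutePingInterval(2 * time.Minute))  // 30s
    fmt.Println(clampRoutePingInterval(20 * time.Second)) // 20s
}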
4 changes: 4 additions & 0 deletions server/raft.go
@@ -3953,6 +3953,10 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.resetElect(randCampaignTimeout())
}
}

// Term might have changed, make sure response has the most current
vresp.term = n.term

n.Unlock()

n.sendReply(vr.reply, vresp.encode())
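The added lines make the vote response carry the term as it stands after the request has been processed: granting a vote can advance n.term, and the candidate needs that updated value rather than the term the responder held when the response struct was first populated. A small self-contained sketch of the invariant, using simplified hypothetical types rather than the server's raft implementation:

package main

import "fmt"

// Simplified, hypothetical vote handling to illustrate why the response term
// is assigned after the request has been processed.
type voteResponse struct {
    term    uint64
    granted bool
}

func processVoteRequest(currentTerm, candidateTerm uint64) (uint64, voteResponse) {
    granted := candidateTerm >= currentTerm
    if granted && candidateTerm > currentTerm {
        currentTerm = candidateTerm // the responder adopts the candidate's newer term
    }
    // The response must reflect the possibly-updated current term.
    return currentTerm, voteResponse{term: currentTerm, granted: granted}
}

func main() {
    term, resp := processVoteRequest(3, 4)
    fmt.Println(term, resp.granted, resp.term) // 4 true 4
}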
12 changes: 5 additions & 7 deletions server/raft_test.go
@@ -273,15 +273,13 @@ func TestNRGSimpleElection(t *testing.T) {
re := decodeVoteResponse(msg.Data)
require_True(t, re != nil)

// The new term hasn't started yet, so the vote responses
// should contain the term from before the election. It is
// possible that candidates are listening to this to work
// out if they are in previous terms.
require_Equal(t, re.term, vr.lastTerm)
require_Equal(t, re.term, startTerm)

// The vote should have been granted.
require_Equal(t, re.granted, true)

// The node granted the vote, therefore the term in the vote
// response should have advanced as well.
require_Equal(t, re.term, vr.term)
require_Equal(t, re.term, startTerm+1)
}

// Everyone in the group should have voted for our candidate
6 changes: 6 additions & 0 deletions server/reload_test.go
@@ -348,6 +348,12 @@ func TestConfigReload(t *testing.T) {
if !updated.Cluster.NoAdvertise {
t.Fatal("Expected NoAdvertise to be true")
}
if updated.Cluster.PingInterval != 20*time.Second {
t.Fatalf("Cluster PingInterval is incorrect.\nexpected: 20s\ngot: %v", updated.Cluster.PingInterval)
}
if updated.Cluster.MaxPingsOut != 8 {
t.Fatalf("Cluster MaxPingsOut is incorrect.\nexpected: 6\ngot: %v", updated.Cluster.MaxPingsOut)
}
if updated.PidFile != "nats-server.pid" {
t.Fatalf("PidFile is incorrect.\nexpected: nats-server.pid\ngot: %s", updated.PidFile)
}
10 changes: 9 additions & 1 deletion server/route.go
@@ -1780,7 +1780,15 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie
// the connection as stale based on the ping interval and max out values,
// but without actually sending pings.
if compressionConfigured {
c.ping.tmr = time.AfterFunc(opts.PingInterval*time.Duration(opts.MaxPingsOut+1), func() {
pingInterval := opts.PingInterval
pingMax := opts.MaxPingsOut
if opts.Cluster.PingInterval > 0 {
pingInterval = opts.Cluster.PingInterval
}
if opts.Cluster.MaxPingsOut > 0 {
pingMax = opts.Cluster.MaxPingsOut
}
c.ping.tmr = time.AfterFunc(pingInterval*time.Duration(pingMax+1), func() {
c.mu.Lock()
c.Debugf("Stale Client Connection - Closing")
c.enqueueProto([]byte(fmt.Sprintf(errProto, "Stale Connection")))
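For reference, the timer above fires after pingInterval * (pingMax + 1) with no traffic, i.e. one full interval beyond the point at which the allowed number of outstanding pings would have been exceeded. A tiny sketch of the computation:

package main

import (
    "fmt"
    "time"
)

// staleRouteTimeout mirrors the duration passed to time.AfterFunc above.
func staleRouteTimeout(pingInterval time.Duration, maxPingsOut int) time.Duration {
    return pingInterval * time.Duration(maxPingsOut+1)
}

func main() {
    fmt.Println(staleRouteTimeout(20*time.Second, 8)) // 3m0s
}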