Skip to content

Commit

Permalink
slack-vitess-r15.0.5: backport v20 vtgate syscall/topo-call optim…
Browse files Browse the repository at this point in the history
…izations (#227)

* Load `--grpc_auth_static_client_creds` file once (vitessio#15030)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Filter by keyspace earlier in `tabletgateway`s `WaitForTablets(...)` (vitessio#15347)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Limit concurrent creation of healthcheck gRPC connections (vitessio#15053)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* go mod tidy

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Update MySQL apt package and GPG signature (vitessio#14785)

Signed-off-by: Matt Lord <mattalord@gmail.com>

* remove unrelated workflow files from v20

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

---------

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
timvaillancourt and mattlord committed Mar 15, 2024
1 parent 687889c commit 355006a
Show file tree
Hide file tree
Showing 14 changed files with 323 additions and 113 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -119,6 +119,7 @@ require (
github.com/bndr/gotabulate v1.1.2
github.com/hashicorp/go-version v1.6.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/sync v0.3.0
golang.org/x/tools/cmd/cover v0.1.0-deprecated
modernc.org/sqlite v1.20.3
)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -917,6 +917,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctld.txt
Expand Up @@ -55,6 +55,7 @@ Usage of vtctld:
--grpc_server_initial_window_size int gRPC server initial window size
--grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s)
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
-h, --help display usage and exit
--jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Expand Up @@ -59,6 +59,7 @@ Usage of vtgate:
--grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s)
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
-h, --help display usage and exit
Expand Down
32 changes: 9 additions & 23 deletions go/vt/discovery/healthcheck.go
Expand Up @@ -45,6 +45,7 @@ import (
"time"

"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -88,6 +89,9 @@ var (
// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency = 32

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond
)
Expand Down Expand Up @@ -166,6 +170,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -282,6 +287,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -316,6 +323,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -700,30 +708,8 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets
return hc.waitForTablets(ctx, targets, true)
}

// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces
func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target {
filteredTargets := make([]*query.Target, 0)

// Keep them all if there are no keyspaces to watch
if len(KeyspacesToWatch) == 0 {
return append(filteredTargets, targets...)
}

// Let's remove from the target shards that are not in the keyspaceToWatch list.
for _, target := range targets {
for _, keyspaceToWatch := range keyspaces {
if target.Keyspace == keyspaceToWatch {
filteredTargets = append(filteredTargets, target)
}
}
}
return filteredTargets
}

// waitForTablets is the internal method that polls for tablets.
func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error {
targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets)

for {
// We nil targets as we find them.
allPresent := true
Expand Down Expand Up @@ -800,7 +786,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(), nil
return thc.Connection(hc), nil
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
21 changes: 0 additions & 21 deletions go/vt/discovery/healthcheck_test.go
Expand Up @@ -645,27 +645,6 @@ func TestWaitForAllServingTablets(t *testing.T) {

err = hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace")

targets = []*querypb.Target{

{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

KeyspacesToWatch = []string{tablet.Keyspace}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered")

KeyspacesToWatch = []string{}
}

// TestRemoveTablet tests the behavior when a tablet goes away.
Expand Down
37 changes: 31 additions & 6 deletions go/vt/discovery/tablet_health_check.go
Expand Up @@ -19,6 +19,7 @@ package discovery
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
Expand All @@ -34,12 +35,16 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
var withDialerContextOnce sync.Once

// tabletHealthCheck maintains the health status of a tablet. A map of this
// structure is maintained in HealthCheck.
type tabletHealthCheck struct {
Expand Down Expand Up @@ -123,8 +128,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection()
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(hc)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -137,14 +142,34 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S
return err
}

func (thc *tabletHealthCheck) Connection() queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked()
return thc.connectionLocked(hc)
}

func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer hc.healthCheckDialSem.Release(1)
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)
}
}

func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService {
func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService {
if thc.Conn == nil {
withDialerContextOnce.Do(func() {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})
})
conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -273,7 +298,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}()

// Read stream health responses.
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/grpcclient/client.go
Expand Up @@ -21,6 +21,7 @@ package grpcclient
import (
"context"
"crypto/tls"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -39,6 +40,7 @@ import (
)

var (
grpcDialOptionsMu sync.Mutex
keepaliveTime = 10 * time.Second
keepaliveTimeout = 10 * time.Second
initialConnWindowSize int
Expand Down Expand Up @@ -88,6 +90,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error)

// RegisterGRPCDialOptions registers an implementation of AuthServer.
func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) {
grpcDialOptionsMu.Lock()
defer grpcDialOptionsMu.Unlock()
grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc)
}

Expand Down Expand Up @@ -137,12 +141,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...

newopts = append(newopts, opts...)
var err error
grpcDialOptionsMu.Lock()
for _, grpcDialOptionInitializer := range grpcDialOptions {
newopts, err = grpcDialOptionInitializer(newopts)
if err != nil {
log.Fatalf("There was an error initializing client grpc.DialOption: %v", err)
}
}
grpcDialOptionsMu.Unlock()

newopts = append(newopts, interceptors()...)

Expand Down
87 changes: 75 additions & 12 deletions go/vt/grpcclient/client_auth_static.go
Expand Up @@ -20,24 +20,35 @@ import (
"context"
"encoding/json"
"os"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"vitess.io/vitess/go/vt/servenv"
)

var (
credsFile string // registered as --grpc_auth_static_client_creds in RegisterFlags
// StaticAuthClientCreds implements client interface to be able to WithPerRPCCredentials
_ credentials.PerRPCCredentials = (*StaticAuthClientCreds)(nil)

clientCreds *StaticAuthClientCreds
clientCredsCancel context.CancelFunc
clientCredsErr error
clientCredsMu sync.Mutex
clientCredsSigChan chan os.Signal
)

// StaticAuthClientCreds holder for client credentials
// StaticAuthClientCreds holder for client credentials.
type StaticAuthClientCreds struct {
Username string
Password string
}

// GetRequestMetadata gets the request metadata as a map from StaticAuthClientCreds
// GetRequestMetadata gets the request metadata as a map from StaticAuthClientCreds.
func (c *StaticAuthClientCreds) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{
"username": c.Username,
Expand All @@ -47,30 +58,82 @@ func (c *StaticAuthClientCreds) GetRequestMetadata(context.Context, ...string) (

// RequireTransportSecurity indicates whether the credentials requires transport security.
// Given that people can use this with or without TLS, at the moment we are not enforcing
// transport security
// transport security.
func (c *StaticAuthClientCreds) RequireTransportSecurity() bool {
return false
}

// AppendStaticAuth optionally appends static auth credentials if provided.
func AppendStaticAuth(opts []grpc.DialOption) ([]grpc.DialOption, error) {
if credsFile == "" {
return opts, nil
}
data, err := os.ReadFile(credsFile)
creds, err := getStaticAuthCreds()
if err != nil {
return nil, err
}
clientCreds := &StaticAuthClientCreds{}
err = json.Unmarshal(data, clientCreds)
if creds != nil {
grpcCreds := grpc.WithPerRPCCredentials(creds)
opts = append(opts, grpcCreds)
}
return opts, nil
}

// ResetStaticAuth resets the static auth credentials.
func ResetStaticAuth() {
clientCredsMu.Lock()
defer clientCredsMu.Unlock()
if clientCredsCancel != nil {
clientCredsCancel()
clientCredsCancel = nil
}
clientCreds = nil
clientCredsErr = nil
}

// getStaticAuthCreds returns the static auth creds and error.
func getStaticAuthCreds() (*StaticAuthClientCreds, error) {
clientCredsMu.Lock()
defer clientCredsMu.Unlock()
if credsFile != "" && clientCreds == nil {
var ctx context.Context
ctx, clientCredsCancel = context.WithCancel(context.Background())
go handleClientCredsSignals(ctx)
clientCreds, clientCredsErr = loadStaticAuthCredsFromFile(credsFile)
}
return clientCreds, clientCredsErr
}

// handleClientCredsSignals handles signals to reload client creds.
func handleClientCredsSignals(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-clientCredsSigChan:
if newCreds, err := loadStaticAuthCredsFromFile(credsFile); err == nil {
clientCredsMu.Lock()
clientCreds = newCreds
clientCredsErr = err
clientCredsMu.Unlock()
}
}
}
}

// loadStaticAuthCredsFromFile loads static auth credentials from a file.
func loadStaticAuthCredsFromFile(path string) (*StaticAuthClientCreds, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
creds := grpc.WithPerRPCCredentials(clientCreds)
opts = append(opts, creds)
return opts, nil
creds := &StaticAuthClientCreds{}
err = json.Unmarshal(data, creds)
return creds, err
}

func init() {
servenv.OnInit(func() {
clientCredsSigChan = make(chan os.Signal, 1)
signal.Notify(clientCredsSigChan, syscall.SIGHUP)
_, _ = getStaticAuthCreds() // preload static auth credentials
})
RegisterGRPCDialOptions(AppendStaticAuth)
}

0 comments on commit 355006a

Please sign in to comment.