Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alts: Read max number of concurrent ALTS handshakes from environment variable. #6267

Merged
merged 30 commits into from Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1852f83
Read max number of concurrent ALTS handshakes from environment variable.
matthewstevenson88 May 10, 2023
6e70e9b
Refactor to use new envconfig file.
matthewstevenson88 May 10, 2023
e3f3dcc
Remove impossible if condition in acquire().
matthewstevenson88 May 10, 2023
f8d609a
Use weighted semaphore.
matthewstevenson88 May 17, 2023
6f61a5a
Add e2e test for concurrent ALTS handshakes.
matthewstevenson88 May 17, 2023
6f2edc6
Merge branch 'grpc:master' into alts-read-envvar
matthewstevenson88 May 17, 2023
e41e933
Merge branch 'master' of https://github.com/matthewstevenson88/grpc-g…
matthewstevenson88 May 26, 2023
e53683d
Separate into client and server semaphores.
matthewstevenson88 May 30, 2023
93e546c
Merge branch 'master' of https://github.com/matthewstevenson88/grpc-g…
matthewstevenson88 May 30, 2023
97177fa
Use TryAcquire instead of Acquire.
matthewstevenson88 May 31, 2023
ad5e053
Attempt to fix go.sum error.
matthewstevenson88 May 31, 2023
a151bb3
Run go mod tidy compat=1.17.
matthewstevenson88 May 31, 2023
b486c31
Update go.mod for examples subdirectory.
matthewstevenson88 May 31, 2023
e8933fd
Run go mod tidy -compat=1.17 on examples subdirectory.
matthewstevenson88 May 31, 2023
803c907
Update go.mod in subdirectories.
matthewstevenson88 May 31, 2023
cbb7ac2
Update go.mod in security/advancedtls/examples.
matthewstevenson88 May 31, 2023
ac93ee8
Missed another go.mod update.
matthewstevenson88 May 31, 2023
c1216c0
Do not upgrade glog because it requires Golang 1.19.
matthewstevenson88 May 31, 2023
d98424f
Fix glog version in examples/go.sum.
matthewstevenson88 May 31, 2023
9d5d1dc
More glog cleanup.
matthewstevenson88 May 31, 2023
ff48d09
Fix glog issue in gcp/observability/go.sum.
matthewstevenson88 May 31, 2023
75b0dc0
Move ALTS env var into envconfig.go.
matthewstevenson88 Jun 7, 2023
caa6ad8
Merge branch 'master' of https://github.com/matthewstevenson88/grpc-g…
matthewstevenson88 Jun 7, 2023
0fec934
Fix go.mod files.
matthewstevenson88 Jun 7, 2023
0ab0b7d
Revert go.sum files.
matthewstevenson88 Jun 7, 2023
f247acb
Merge branch 'master' of https://github.com/matthewstevenson88/grpc-g…
matthewstevenson88 Jun 7, 2023
d046f3a
Revert interop/observability/go.mod change.
matthewstevenson88 Jun 7, 2023
ab17f8d
Run go mod tidy -compat=1.17 on examples/.
matthewstevenson88 Jun 7, 2023
f75369c
Run gofmt.
matthewstevenson88 Jun 7, 2023
8c5bc34
Add comment describing test init function.
matthewstevenson88 Jun 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 65 additions & 18 deletions credentials/alts/alts_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/alts/internal/handshaker"
"google.golang.org/grpc/credentials/alts/internal/handshaker/service"
altsgrpc "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
Expand Down Expand Up @@ -325,26 +326,49 @@ func (s) TestFullHandshake(t *testing.T) {
defer stopServer()

// Ping the server, authenticating with ALTS.
clientCreds := NewClientCreds(&ClientOptions{HandshakerServiceAddress: handshakerAddress})
conn, err := grpc.Dial(serverAddress, grpc.WithTransportCredentials(clientCreds))
if err != nil {
t.Fatalf("grpc.Dial(%v) failed: %v", serverAddress, err)
establishAltsConnection(t, handshakerAddress, serverAddress)

// Close open connections to the fake handshaker service.
if err := service.CloseForTesting(); err != nil {
t.Errorf("service.CloseForTesting() failed: %v", err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestLongTimeout)
defer cancel()
c := testgrpc.NewTestServiceClient(conn)
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
_, err = c.UnaryCall(ctx, &testpb.SimpleRequest{})
if err == nil {
break
}
if code := status.Code(err); code == codes.Unavailable {
// The server is not ready yet. Try again.
continue
}
t.Fatalf("c.UnaryCall() failed: %v", err)
}

// TestConcurrentHandshakes performs a several, concurrent ALTS handshakes
// between a test client and server, where both client and server offload to a
// local, fake handshaker service.
func (s) TestConcurrentHandshakes(t *testing.T) {
// The vmOnGCP global variable MUST be reset to true after the client
// or server credentials have been created, but before the ALTS
// handshake begins. If vmOnGCP is not reset and this test is run
// anywhere except for a GCP VM, then the ALTS handshake will
// immediately fail.
once.Do(func() {})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just go into an init()? Then it doesn't need to be repeated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, done.

vmOnGCP = true

// Set the max number of concurrent handshakes to 3, so that we can
// test the handshaker behavior when handshakes are queued by
// performing more than 3 concurrent handshakes (specifically, 10).
handshaker.ResetConcurrentHandshakeSemaphoreForTesting(3)

// Start the fake handshaker service and the server.
var wait sync.WaitGroup
defer wait.Wait()
stopHandshaker, handshakerAddress := startFakeHandshakerService(t, &wait)
defer stopHandshaker()
stopServer, serverAddress := startServer(t, handshakerAddress, &wait)
defer stopServer()

// Ping the server, authenticating with ALTS.
var waitForConnections sync.WaitGroup
for i := 0; i < 10; i++ {
waitForConnections.Add(1)
go func() {
establishAltsConnection(t, handshakerAddress, serverAddress)
waitForConnections.Done()
}()
}
waitForConnections.Wait()

// Close open connections to the fake handshaker service.
if err := service.CloseForTesting(); err != nil {
Expand All @@ -366,6 +390,29 @@ func versions(minMajor, minMinor, maxMajor, maxMinor uint32) *altspb.RpcProtocol
}
}

func establishAltsConnection(t *testing.T, handshakerAddress, serverAddress string) {
clientCreds := NewClientCreds(&ClientOptions{HandshakerServiceAddress: handshakerAddress})
conn, err := grpc.Dial(serverAddress, grpc.WithTransportCredentials(clientCreds))
if err != nil {
t.Fatalf("grpc.Dial(%v) failed: %v", serverAddress, err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestLongTimeout)
defer cancel()
c := testgrpc.NewTestServiceClient(conn)
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
_, err = c.UnaryCall(ctx, &testpb.SimpleRequest{})
if err == nil {
break
}
if code := status.Code(err); code == codes.Unavailable {
// The server is not ready yet. Try again.
continue
}
t.Fatalf("c.UnaryCall() failed: %v", err)
}
}

func startFakeHandshakerService(t *testing.T, wait *sync.WaitGroup) (stop func(), address string) {
listener, err := testutils.LocalTCPListener()
if err != nil {
Expand Down
57 changes: 16 additions & 41 deletions credentials/alts/internal/handshaker/handshaker.go
Expand Up @@ -25,8 +25,8 @@ import (
"fmt"
"io"
"net"
"sync"

"golang.org/x/sync/semaphore"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -35,15 +35,13 @@ import (
"google.golang.org/grpc/credentials/alts/internal/conn"
altsgrpc "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
"google.golang.org/grpc/internal/envconfig"
)

const (
// The maximum byte size of receive frames.
frameLimit = 64 * 1024 // 64 KB
rekeyRecordProtocolName = "ALTSRP_GCM_AES128_REKEY"
// maxPendingHandshakes represents the maximum number of concurrent
// handshakes.
maxPendingHandshakes = 100
)

var (
Expand All @@ -59,9 +57,9 @@ var (
return conn.NewAES128GCMRekey(s, keyData)
},
}
// control number of concurrent created (but not closed) handshakers.
mu sync.Mutex
concurrentHandshakes = int64(0)
// control number of concurrent created (but not closed) handshakes.
clientHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes))
serverHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes))
// errDropped occurs when maxPendingHandshakes is reached.
errDropped = errors.New("maximum number of concurrent ALTS handshakes is reached")
// errOutOfBound occurs when the handshake service returns a consumed
Expand All @@ -77,30 +75,6 @@ func init() {
}
}

func acquire() bool {
mu.Lock()
// If we need n to be configurable, we can pass it as an argument.
n := int64(1)
success := maxPendingHandshakes-concurrentHandshakes >= n
if success {
concurrentHandshakes += n
}
mu.Unlock()
return success
}

func release() {
mu.Lock()
// If we need n to be configurable, we can pass it as an argument.
n := int64(1)
concurrentHandshakes -= n
if concurrentHandshakes < 0 {
mu.Unlock()
panic("bad release")
}
mu.Unlock()
}

// ClientHandshakerOptions contains the client handshaker options that can
// provided by the caller.
type ClientHandshakerOptions struct {
Expand Down Expand Up @@ -134,10 +108,6 @@ func DefaultServerHandshakerOptions() *ServerHandshakerOptions {
return &ServerHandshakerOptions{}
}

// TODO: add support for future local and remote endpoint in both client options
// and server options (server options struct does not exist now. When
// caller can provide endpoints, it should be created.

// altsHandshaker is used to complete an ALTS handshake between client and
// server. This handshaker talks to the ALTS handshaker service in the metadata
// server.
Expand Down Expand Up @@ -185,10 +155,10 @@ func NewServerHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn,
// ClientHandshake starts and completes a client ALTS handshake for GCP. Once
// done, ClientHandshake returns a secure connection.
func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) {
if !acquire() {
if !clientHandshakes.TryAcquire(1) {
return nil, nil, errDropped
}
defer release()
defer clientHandshakes.Release(1)

if h.side != core.ClientSide {
return nil, nil, errors.New("only handshakers created using NewClientHandshaker can perform a client handshaker")
Expand Down Expand Up @@ -238,10 +208,10 @@ func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credent
// ServerHandshake starts and completes a server ALTS handshake for GCP. Once
// done, ServerHandshake returns a secure connection.
func (h *altsHandshaker) ServerHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) {
if !acquire() {
if !serverHandshakes.TryAcquire(1) {
return nil, nil, errDropped
}
defer release()
defer serverHandshakes.Release(1)

if h.side != core.ServerSide {
return nil, nil, errors.New("only handshakers created using NewServerHandshaker can perform a server handshaker")
Expand All @@ -264,8 +234,6 @@ func (h *altsHandshaker) ServerHandshake(ctx context.Context) (net.Conn, credent
}

// Prepare server parameters.
// TODO: currently only ALTS parameters are provided. Might need to use
// more options in the future.
params := make(map[int32]*altspb.ServerHandshakeParameters)
params[int32(altspb.HandshakeProtocol_ALTS)] = &altspb.ServerHandshakeParameters{
RecordProtocols: recordProtocols,
Expand Down Expand Up @@ -391,3 +359,10 @@ func (h *altsHandshaker) Close() {
h.stream.CloseSend()
}
}

// ResetConcurrentHandshakeSemaphoreForTesting resets the handshake semaphores
// to allow numberOfAllowedHandshakes concurrent handshakes each.
func ResetConcurrentHandshakeSemaphoreForTesting(numberOfAllowedHandshakes int64) {
clientHandshakes = semaphore.NewWeighted(numberOfAllowedHandshakes)
serverHandshakes = semaphore.NewWeighted(numberOfAllowedHandshakes)
}
13 changes: 7 additions & 6 deletions credentials/alts/internal/handshaker/handshaker_test.go
Expand Up @@ -31,6 +31,7 @@ import (
core "google.golang.org/grpc/credentials/alts/internal"
altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
"google.golang.org/grpc/credentials/alts/internal/testutil"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
)

Expand Down Expand Up @@ -134,7 +135,7 @@ func (s) TestClientHandshake(t *testing.T) {
numberOfHandshakes int
}{
{0 * time.Millisecond, 1},
{100 * time.Millisecond, 10 * maxPendingHandshakes},
{100 * time.Millisecond, 10 * int(envconfig.ALTSMaxConcurrentHandshakes)},
} {
errc := make(chan error)
stat.Reset()
Expand Down Expand Up @@ -182,8 +183,8 @@ func (s) TestClientHandshake(t *testing.T) {
}

// Ensure that there are no concurrent calls more than the limit.
if stat.MaxConcurrentCalls > maxPendingHandshakes {
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, maxPendingHandshakes)
if stat.MaxConcurrentCalls > int(envconfig.ALTSMaxConcurrentHandshakes) {
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, envconfig.ALTSMaxConcurrentHandshakes)
}
}
}
Expand All @@ -194,7 +195,7 @@ func (s) TestServerHandshake(t *testing.T) {
numberOfHandshakes int
}{
{0 * time.Millisecond, 1},
{100 * time.Millisecond, 10 * maxPendingHandshakes},
{100 * time.Millisecond, 10 * int(envconfig.ALTSMaxConcurrentHandshakes)},
} {
errc := make(chan error)
stat.Reset()
Expand Down Expand Up @@ -239,8 +240,8 @@ func (s) TestServerHandshake(t *testing.T) {
}

// Ensure that there are no concurrent calls more than the limit.
if stat.MaxConcurrentCalls > maxPendingHandshakes {
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, maxPendingHandshakes)
if stat.MaxConcurrentCalls > int(envconfig.ALTSMaxConcurrentHandshakes) {
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, envconfig.ALTSMaxConcurrentHandshakes)
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions examples/go.mod
Expand Up @@ -3,26 +3,29 @@ module google.golang.org/grpc/examples
go 1.17

require (
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195
github.com/cncf/xds/go v0.0.0-20230428030218-4003588d1b74
github.com/golang/protobuf v1.5.3
golang.org/x/oauth2 v0.7.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.54.0
golang.org/x/oauth2 v0.8.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
)

require (
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute v1.20.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect
github.com/envoyproxy/go-control-plane v0.11.1-0.20230524094728-9239064ad72f // indirect
github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
github.com/envoyproxy/go-control-plane v0.11.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.1 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
)

replace google.golang.org/grpc => ../