Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(bigtable): more text proxy tune-up #6977

Merged
merged 8 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
141 changes: 88 additions & 53 deletions bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"context"
"crypto/x509"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -346,6 +347,27 @@ func statusFromError(err error) *statpb.Status {
return st
}

// parseTableID extracts a table ID from a table name.
// For example, a table ID is in the format projects/<project>/instances/<instance>/tables/<tableID>
//
// Note that this function does not check all variants and edge cases. It assumes
// that the test suite used with the test proxy sends *generally* correct requests.
func parseTableID(tableName string) (tableID string, _ error) {
paths := strings.Split(tableName, "/")

if len(paths) < 6 {
return "", errors.New("table resource name does not have the correct format")
}

tableID = paths[len(paths)-1]
var err error
if tableID == "" {
err = errors.New("cannot read tableID from table name")
}

return tableID, err
}

// testClient contains a bigtable.Client object, cancel functions for the calls
// made using the client, an appProfileID (optionally), and a
// perOperationTimeout (optionally).
Expand Down Expand Up @@ -507,15 +529,15 @@ type goTestProxyServer struct {

// client retrieves a testClient from the clientIDs map. You must lock clientsLock before calling
// this method.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this comment into an error message?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You must lock clientsLock before calling this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. That makes sense, but is out-of-scope for this review. I'll consider your suggestion for future improvements.

func (s *goTestProxyServer) client(clientID string) (*testClient, bool) {
func (s *goTestProxyServer) client(clientID string) (*testClient, error) {
client, ok := s.clientIDs[clientID]
if !ok {
return nil, false
return nil, fmt.Errorf("client ID %s does not exist", clientID)
}
if !client.isOpen {
return nil, false
return nil, fmt.Errorf("client ID %s is closed to new requests", clientID)
}
return client, true
return client, nil
}

// CreateClient responds to the CreateClient RPC. This method adds a new client
Expand Down Expand Up @@ -571,10 +593,9 @@ func (s *goTestProxyServer) CloseClient(ctx context.Context, req *pb.CloseClient
s.clientsLock.Lock()
defer s.clientsLock.Unlock()

btc, exists := s.client(clientID)
if !exists {
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
btc, err := s.client(clientID)
if err != nil {
return nil, err
}
btc.isOpen = false

Expand Down Expand Up @@ -608,16 +629,17 @@ func (s *goTestProxyServer) RemoveClient(ctx context.Context, req *pb.RemoveClie
// data for a single row in the Table.
func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest) (*pb.RowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
if err != nil {
return nil, err
}
s.clientsLock.RUnlock()

if !exists {
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
tid, err := parseTableID(req.TableName)
if err != nil {
return nil, err
}

tName := req.TableName
t := btc.c.Open(tName)
t := btc.c.Open(tid)

res := &pb.RowResult{
Status: &statpb.Status{
Expand Down Expand Up @@ -652,13 +674,11 @@ func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest)
// data for a set of rows, a range of rows, or the entire table.
func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsRequest) (*pb.RowsResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("bad client ID: %v\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -669,7 +689,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques

}

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)

rowPbs := rrq.Rows
rs := rowSetFromProto(rowPbs)
Expand All @@ -687,7 +711,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
var c int32
var rowsPb []*btpb.Row
lim := req.GetCancelAfterRows()
err := t.ReadRows(ctx, rs, func(r bigtable.Row) bool {
err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {

c++
if c == lim {
Expand Down Expand Up @@ -728,12 +752,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
// changes (or deletions) to a single row in a table.
func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequest) (*pb.MutateRowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -744,7 +767,11 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
mPbs := rrq.Mutations
m := mutationFromProto(mPbs)

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
row := rrq.RowKey

res := &pb.MutateRowResult{
Expand All @@ -756,7 +783,7 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
ctx, cancel := btc.timeout(ctx)
defer cancel()

err := t.Apply(ctx, string(row), m)
err = t.Apply(ctx, string(row), m)
if err != nil {
res.Status = statusFromError(err)
return res, nil
Expand All @@ -769,13 +796,11 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
// series of changes or deletions to multiple rows in a single call.
func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRowsRequest) (*pb.MutateRowsResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid client ID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -785,7 +810,11 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo
}

mrs := rrq.Entries
t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)

keys := make([]string, len(mrs))
muts := make([]*bigtable.Mutation, len(mrs))
Expand Down Expand Up @@ -840,13 +869,11 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo
// one mutation if a condition is true and another mutation if it is false.
func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.CheckAndMutateRowRequest) (*pb.CheckAndMutateRowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid ClientID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -873,7 +900,11 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
},
}

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
rowKey := string(rrq.RowKey)

var matched bool
Expand All @@ -882,7 +913,7 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
ctx, cancel := btc.timeout(ctx)
defer cancel()

err := t.Apply(ctx, rowKey, c, ao)
err = t.Apply(ctx, rowKey, c, ao)
if err != nil {
log.Printf("received error from Table.Apply: %v", err)
res.Status = statusFromError(err)
Expand All @@ -900,13 +931,11 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
// of the keys available in a table.
func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRowKeysRequest) (*pb.SampleRowKeysResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid client ID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand All @@ -924,7 +953,11 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow
ctx, cancel := btc.timeout(ctx)
defer cancel()

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
keys, err := t.SampleRowKeys(ctx)
if err != nil {
log.Printf("received error from Table.SampleRowKeys(): %v\n", err)
Expand All @@ -949,13 +982,11 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow
// applies a non-idempotent change to a row.
func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.ReadModifyWriteRowRequest) (*pb.RowResult, error) {
s.clientsLock.RLock()
btc, exists := s.client(req.ClientId)
btc, err := s.client(req.ClientId)
s.clientsLock.RUnlock()

if !exists {
log.Printf("received invalid client ID: %s\n", req.ClientId)
return nil, stat.Error(codes.InvalidArgument,
fmt.Sprintf("%s: ClientID does not exist", logLabel))
if err != nil {
return nil, err
}

rrq := req.GetRequest()
Expand Down Expand Up @@ -984,7 +1015,11 @@ func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.Read
},
}

t := btc.c.Open(rrq.TableName)
tid, err := parseTableID(rrq.TableName)
if err != nil {
return nil, err
}
t := btc.c.Open(tid)
k := string(rrq.RowKey)

ctx, cancel := btc.timeout(ctx)
Expand Down
9 changes: 5 additions & 4 deletions bigtable/internal/testproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (

const (
buffer = 1024 * 1024
tableName = "table"
tableName = "projects/my-project/instances/my-instance/tables/table"
tableID = "table"
columnFamily = "cf"
testProxyClient = "testProxyClient"
testProxyAddress = "localhost:9990"
Expand Down Expand Up @@ -70,15 +71,15 @@ func populateTable(bts *bttest.Server) error {
}
defer adminClient.Close()

if err := adminClient.CreateTable(ctx, tableName); err != nil {
if err := adminClient.CreateTable(ctx, tableID); err != nil {
return fmt.Errorf("testproxy setup: can't create table: %v", err)
}

// Create column families (3 is an arbitrarily sufficient number)
count := 3
for i := 0; i < count; i++ {
cfName := fmt.Sprintf("%s%d", columnFamily, i)
if err := adminClient.CreateColumnFamily(ctx, tableName, cfName); err != nil {
if err := adminClient.CreateColumnFamily(ctx, tableID, cfName); err != nil {
return fmt.Errorf("testproxy setup: can't create column family: %s", cfName)
}
}
Expand All @@ -90,7 +91,7 @@ func populateTable(bts *bttest.Server) error {
}
defer dataClient.Close()

t := dataClient.Open(tableName)
t := dataClient.Open(tableID)

for fc := 0; fc < count; fc++ {
for cc := count; cc > 0; cc-- {
Expand Down