Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): PG JSONB support #6874

Merged
merged 13 commits into from
Nov 3, 2022
115 changes: 115 additions & 0 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ var (
PRIMARY KEY(AccountId)
)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname)`,
`CREATE TABLE Types (
RowID BIGINT PRIMARY KEY,
String VARCHAR,
Bytes BYTEA,
Int64a BIGINT,
Bool BOOL,
Float64 DOUBLE PRECISION,
Numeric NUMERIC,
JSONB jsonb
)`,
}

singerDBStatements = []string{
Expand Down Expand Up @@ -3453,6 +3463,111 @@ func TestIntegration_PGNumeric(t *testing.T) {
}
}

func TestIntegration_PGJSONB(t *testing.T) {
onlyRunForPGTest(t)
skipEmulatorTest(t)
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTestForPG(ctx, t, DefaultSessionPoolConfig, singerDBPGStatements)
defer cleanup()

type Message struct {
Name string
Body string
Time int64
}
msg := Message{"Alice", "Hello", 1294706395881547000}
jsonStr := `{"Name":"Alice","Body":"Hello","Time":1294706395881547000}`
var unmarshalledJSONstruct interface{}
json.Unmarshal([]byte(jsonStr), &unmarshalledJSONstruct)

tests := []struct {
col string
val interface{}
want interface{}
}{
{col: "JSONB", val: PGJsonB{Value: msg, Valid: true}, want: PGJsonB{Value: unmarshalledJSONstruct, Valid: true}},
{col: "JSONB", val: PGJsonB{Value: msg, Valid: false}, want: PGJsonB{}},
}

// Write rows into table first using DML.
statements := make([]Statement, 0)
for i, test := range tests {
stmt := NewStatement(fmt.Sprintf("INSERT INTO Types (RowId, %s) VALUES ($1, $2)", test.col))
// Note: We are not setting the parameter type here to ensure that it
// can be automatically recognized when it is actually needed.
stmt.Params["p1"] = i
stmt.Params["p2"] = test.val
statements = append(statements, stmt)
}
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
rowCounts, err := tx.BatchUpdate(ctx, statements)
if err != nil {
return err
}
if len(rowCounts) != len(tests) {
return fmt.Errorf("rowCounts length mismatch\nGot: %v\nWant: %v", len(rowCounts), len(tests))
}
for i, c := range rowCounts {
if c != 1 {
return fmt.Errorf("row count mismatch for row %v:\nGot: %v\nWant: %v", i, c, 1)
}
}
return nil
})
if err != nil {
t.Fatalf("failed to insert values using DML: %v", err)
}
// Delete all the rows so we can insert them using mutations as well.
_, err = client.Apply(ctx, []*Mutation{Delete("Types", AllKeys())})
if err != nil {
t.Fatalf("failed to delete all rows: %v", err)
}

// Verify that we can insert the rows using mutations.
var muts []*Mutation
for i, test := range tests {
muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
}
if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}

for i, test := range tests {
row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
if err != nil {
t.Fatalf("Unable to fetch row %v: %v", i, err)
}
verifyDirectPathRemoteAddress(t)
// Create new instance of type of test.want.
want := test.want
if want == nil {
want = test.val
}
gotp := reflect.New(reflect.TypeOf(want))
if err := row.Column(0, gotp.Interface()); err != nil {
t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
continue
}
got := reflect.Indirect(gotp).Interface()

// One of the test cases is checking NaN handling. Given
// NaN!=NaN, we can't use reflect to test for it.
if isNaN(got) && isNaN(want) {
continue
}

// Check non-NaN cases.
if !testEqual(got, want) {
t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
continue
}
}

}

func readPGSingerTable(iter *RowIterator) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
Expand Down
4 changes: 4 additions & 0 deletions spanner/protoutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func jsonType() *sppb.Type {
return &sppb.Type{Code: sppb.TypeCode_JSON}
}

func pgJsonbType() *sppb.Type {
return &sppb.Type{Code: sppb.TypeCode_JSON, TypeAnnotation: sppb.TypeAnnotationCode_PG_JSONB}
}

func bytesProto(b []byte) *proto3.Value {
return &proto3.Value{Kind: &proto3.Value_StringValue{StringValue: base64.StdEncoding.EncodeToString(b)}}
}
Expand Down