Skip to content

Commit

Permalink
fix(bigquery): RowIterator.Schema not filled when using Storage Read …
Browse files Browse the repository at this point in the history
…API (#7671)
  • Loading branch information
alvarowolfx committed Jun 2, 2023
1 parent 0135b60 commit 31040e8
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
75 changes: 75 additions & 0 deletions bigquery/storage_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/iterator"
)

Expand Down Expand Up @@ -233,10 +234,22 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
t.Fatal(err)
}

var firstValue S
err = it.Next(&firstValue)
if err != nil {
t.Fatal(err)
}

if cmp.Equal(firstValue, S{}) {
t.Fatalf("user defined struct was not filled with data")
}

total, err := countIteratorRows(it)
if err != nil {
t.Fatal(err)
}
total++ // as we read the first value separately

bqSession := it.arrowIterator.session.bqSession
if len(bqSession.Streams) == 0 {
t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams))
Expand All @@ -263,6 +276,56 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
}
}

func TestIntegration_StorageReadQueryStruct(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
table := "`bigquery-public-data.samples.wikipedia`"
sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table)
q := storageOptimizedClient.Query(sql)
q.forceStorageAPI = true
q.DisableQueryCache = true
it, err := q.Read(ctx)
if err != nil {
t.Fatal(err)
}
if !it.IsAccelerated() {
t.Fatal("expected query to use Storage API")
}

type S struct {
ID int64
Title string
Timestamp int64
Comment NullString
}

total := uint64(0)
for {
var dst S
err := it.Next(&dst)
if err == iterator.Done {
break
}
if err != nil {
t.Fatalf("failed to fetch via storage API: %v", err)
}
if cmp.Equal(dst, S{}) {
t.Fatalf("user defined struct was not filled with data")
}
total++
}

bqSession := it.arrowIterator.session.bqSession
if len(bqSession.Streams) == 0 {
t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
}
if total != it.TotalRows {
t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
}
}

func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand All @@ -287,10 +350,22 @@ func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
Forks NullInt64
}

var firstValue S
err = it.Next(&firstValue)
if err != nil {
t.Fatal(err)
}

if cmp.Equal(firstValue, S{}) {
t.Fatalf("user defined struct was not filled with data")
}

total, err := countIteratorRows(it)
if err != nil {
t.Fatal(err)
}
total++ // as we read the first value separately

bqSession := it.arrowIterator.session.bqSession
if len(bqSession.Streams) == 0 {
t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams))
Expand Down
5 changes: 4 additions & 1 deletion bigquery/storage_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered b
return nil, err
}
it.arrowIterator.schema = md.Schema
it.Schema = md.Schema
return it, nil
}

Expand Down Expand Up @@ -163,7 +164,9 @@ func nextFuncForStorageIterator(it *RowIterator) func() error {
if err != nil {
return err
}

if it.Schema == nil {
it.Schema = it.arrowIterator.schema
}
rows, err := arrowIt.decoder.decodeArrowRecords(record)
if err != nil {
return err
Expand Down

0 comments on commit 31040e8

Please sign in to comment.