Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add support for reverse scans #8755

Merged
merged 34 commits into from Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
50aa069
feat: add more expressive range api
igorbernstein2 Oct 16, 2023
1950510
feat(bigtable): Add support for reverse scans
igorbernstein2 Oct 16, 2023
8b3b0de
resolve feature flag conflict after merge
igorbernstein2 Oct 16, 2023
1c0bbfb
add a new constructor for chunkReader to avoid line noise
igorbernstein2 Oct 16, 2023
c756b76
Merge branch 'main' of github.com:googleapis/google-cloud-go into rev…
igorbernstein2 Oct 16, 2023
4e0262a
remove orphaned code
igorbernstein2 Oct 16, 2023
db6ca11
fix typo
igorbernstein2 Oct 16, 2023
10ec6c6
adding a first test
brandtnewton Oct 19, 2023
2ce4698
extended RowRange to express bound types
brandtnewton Oct 20, 2023
8198ec1
adding unit tests
brandtnewton Oct 20, 2023
5676ef7
correcting backwards compatability behavior
brandtnewton Oct 20, 2023
d6ba568
adding test proxy support for reverse scan
brandtnewton Oct 23, 2023
61ce90d
Merge branch 'main' into rev-scans
brandtnewton Oct 23, 2023
850c122
added todo reminder
brandtnewton Oct 23, 2023
f186570
all unit tests pass
brandtnewton Oct 26, 2023
8dfd483
updated naming
brandtnewton Oct 26, 2023
ea66be7
fix retries on reverse scan
brandtnewton Oct 26, 2023
f5b1974
more tests
brandtnewton Oct 27, 2023
f1261d2
Merge branch 'main' into rev-scans
brandtnewton Oct 30, 2023
915aec2
exposing client messages to test proxy
brandtnewton Oct 30, 2023
c363a42
exposing client messages to test proxy
brandtnewton Oct 30, 2023
31b1d42
fixing vet errors
brandtnewton Oct 30, 2023
dc2726c
Merge branch 'rev-scans' of github.com:brandtnewton/google-cloud-go i…
brandtnewton Oct 30, 2023
4961652
minor style tweaks
brandtnewton Oct 30, 2023
b21aef2
changing error message to be consistent with java and cpp
brandtnewton Nov 1, 2023
cc1cfe6
cleaning up code and adding tests
brandtnewton Nov 2, 2023
b65cde0
simplify RowRange valid logic
brandtnewton Nov 2, 2023
6d9c1b1
rolling back test proxy changes
brandtnewton Nov 2, 2023
93e4be2
rolling back test proxy
brandtnewton Nov 2, 2023
efbc3c7
changed default bound type for better backwards compatability
brandtnewton Nov 2, 2023
9a5a438
Merge branch 'main' into rev-scans
brandtnewton Nov 3, 2023
b2d6ebb
Merge branch 'main' into rev-scans
brandtnewton Nov 7, 2023
10346c2
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
3b9c84f
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
154 changes: 144 additions & 10 deletions bigtable/bigtable.go
Expand Up @@ -207,7 +207,14 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
if err != nil {
return err
}
cr := newChunkReader()

var cr *chunkReader
if req.Reversed {
cr = newReverseChunkReader()
} else {
cr = newChunkReader()
}

for {
res, err := stream.Recv()
if err == io.EOF {
Expand Down Expand Up @@ -306,6 +313,10 @@ type RowSet interface {
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet

// retainRowsBefore returns a new RowSet that does not include the
// given row key or any row key lexicographically greater than it.
retainRowsBefore(lastRowKey string) RowSet

// Valid reports whether this set can cover at least one row.
valid() bool
}
Expand All @@ -331,17 +342,39 @@ func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
return retryKeys
}

func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
var retryKeys RowList
for _, key := range r {
if key < lastRowKey {
retryKeys = append(retryKeys, key)
}
}
return retryKeys
}

func (r RowList) valid() bool {
return len(r) > 0
}

type BoundType int64
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved

const (
undefined BoundType = iota
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
unbounded
closed
open
)

// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
// A RowRange can be unbounded, encompassing all keys at least as large as Start.
type RowRange struct {
start string
limit string
start string
startBound BoundType
// todo rename this field. it's module private so we won't break anybody
limit string
endBound BoundType
}

// NewRange returns the new RowRange [begin, end).
Expand All @@ -352,9 +385,44 @@ func NewRange(begin, end string) RowRange {
}
}

func NewClosedOpenRange(begin, limit string) RowRange {
return RowRange{
start: begin,
limit: limit,
startBound: closed,
endBound: open,
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
}
}
func NewOpenClosedRange(start, limit string) RowRange {
return RowRange{
start: start,
limit: limit,
startBound: open,
endBound: closed,
}
}

func NewOpenRange(start, limit string) RowRange {
return RowRange{
start: start,
limit: limit,
startBound: open,
endBound: open,
}
}

func NewClosedRange(begin, end string) RowRange {
return RowRange{
start: begin,
limit: end,
startBound: closed,
endBound: closed,
}
}

// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
return r.limit == ""
return r.limit == "" || r.endBound == unbounded
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
}

// Contains says whether the RowRange contains the key.
Expand All @@ -372,13 +440,27 @@ func (r RowRange) String() string {
}

func (r RowRange) proto() *btpb.RowSet {
rr := &btpb.RowRange{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
var rr = btpb.RowRange{}

// empty strings are NOT considered unbounded for backwards compatibility
if r.startBound == unbounded {
// leave unbounded
} else if r.startBound == closed || r.startBound == undefined {
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
} else {
rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
}
if !r.Unbounded() {

// empty strings are considered unbounded for backwards compatibility
if r.endBound == unbounded || r.limit == "" {
// leave unbounded
} else if r.endBound == closed {
rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.limit)}
} else {
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
}
return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}

return &btpb.RowSet{RowRanges: []*btpb.RowRange{&rr}}
}

func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
Expand All @@ -393,6 +475,14 @@ func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
return NewRange(start, r.limit)
}

func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" || (r.limit != "" && r.limit <= lastRowKey) {
return r
}

return NewRange(r.start, lastRowKey)
}

func (r RowRange) valid() bool {
return r.Unbounded() || r.start < r.limit
}
Expand Down Expand Up @@ -424,6 +514,21 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
return ranges
}

func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" {
return r
}
// Return a list of any range that has not yet been completely processed
var ranges RowRangeList
for _, rr := range r {
retained := rr.retainRowsBefore(lastRowKey)
if retained.valid() {
ranges = append(ranges, retained.(RowRange))
}
}
return ranges
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
Expand All @@ -450,8 +555,18 @@ func PrefixRange(prefix string) RowRange {
// large as start.
func InfiniteRange(start string) RowRange {
return RowRange{
start: start,
limit: "",
start: start,
endBound: unbounded,
}
}

// InfiniteReverseRange returns the RowRange consisting of all keys at least as
// large as start.
func InfiniteReverseRange(limit string) RowRange {
return RowRange{
limit: limit,
startBound: unbounded,
endBound: closed,
}
}
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -577,6 +692,25 @@ func (wrs withFullReadStats) set(settings *readSettings) {
settings.fullReadStatsFunc = wrs.f
}

// ReverseScan returns a RadOption that will reverse the results of a Scan.
// The rows will be streamed in reverse lexiographic order of the keys. The row key ranges of the RowSet are
// still expected to be oriented the same way as forwards. ie [a,c] where a <= c. The row content
// will remain unchanged from the ordering forward scans. This is particularly useful to get the
// last N records before a key:
//
// table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
// return true
// }, bigtable.ReverseScan(), bigtable.LimitRows(10))
func ReverseScan() ReadOption {
return reverseScan{}
}

type reverseScan struct{}

func (rs reverseScan) set(settings *readSettings) {
settings.req.Reversed = true
}

// mutationsAreRetryable returns true if all mutations are idempotent
// and therefore retryable. A mutation is idempotent iff all cell timestamps
// have an explicit timestamp set and do not rely on the timestamp being set on the server.
Expand Down