Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add support for reverse scans #8755

Merged
merged 34 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
50aa069
feat: add more expressive range api
igorbernstein2 Oct 16, 2023
1950510
feat(bigtable): Add support for reverse scans
igorbernstein2 Oct 16, 2023
8b3b0de
resolve feature flag conflict after merge
igorbernstein2 Oct 16, 2023
1c0bbfb
add a new constructor for chunkReader to avoid line noise
igorbernstein2 Oct 16, 2023
c756b76
Merge branch 'main' of github.com:googleapis/google-cloud-go into rev…
igorbernstein2 Oct 16, 2023
4e0262a
remove orphaned code
igorbernstein2 Oct 16, 2023
db6ca11
fix typo
igorbernstein2 Oct 16, 2023
10ec6c6
adding a first test
brandtnewton Oct 19, 2023
2ce4698
extended RowRange to express bound types
brandtnewton Oct 20, 2023
8198ec1
adding unit tests
brandtnewton Oct 20, 2023
5676ef7
correcting backwards compatability behavior
brandtnewton Oct 20, 2023
d6ba568
adding test proxy support for reverse scan
brandtnewton Oct 23, 2023
61ce90d
Merge branch 'main' into rev-scans
brandtnewton Oct 23, 2023
850c122
added todo reminder
brandtnewton Oct 23, 2023
f186570
all unit tests pass
brandtnewton Oct 26, 2023
8dfd483
updated naming
brandtnewton Oct 26, 2023
ea66be7
fix retries on reverse scan
brandtnewton Oct 26, 2023
f5b1974
more tests
brandtnewton Oct 27, 2023
f1261d2
Merge branch 'main' into rev-scans
brandtnewton Oct 30, 2023
915aec2
exposing client messages to test proxy
brandtnewton Oct 30, 2023
c363a42
exposing client messages to test proxy
brandtnewton Oct 30, 2023
31b1d42
fixing vet errors
brandtnewton Oct 30, 2023
dc2726c
Merge branch 'rev-scans' of github.com:brandtnewton/google-cloud-go i…
brandtnewton Oct 30, 2023
4961652
minor style tweaks
brandtnewton Oct 30, 2023
b21aef2
changing error message to be consistent with java and cpp
brandtnewton Nov 1, 2023
cc1cfe6
cleaning up code and adding tests
brandtnewton Nov 2, 2023
b65cde0
simplify RowRange valid logic
brandtnewton Nov 2, 2023
6d9c1b1
rolling back test proxy changes
brandtnewton Nov 2, 2023
93e4be2
rolling back test proxy
brandtnewton Nov 2, 2023
efbc3c7
changed default bound type for better backwards compatability
brandtnewton Nov 2, 2023
9a5a438
Merge branch 'main' into rev-scans
brandtnewton Nov 3, 2023
b2d6ebb
Merge branch 'main' into rev-scans
brandtnewton Nov 7, 2023
10346c2
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
3b9c84f
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
273 changes: 241 additions & 32 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,26 @@
if err != nil {
return err
}
cr := newChunkReader()

var cr *chunkReader
if req.Reversed {
cr = newReverseChunkReader()
} else {
cr = newChunkReader()
}

for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
// Reset arg for next Invoke call.
arg = arg.retainRowsAfter(prevRowKey)
if req.Reversed {
arg = arg.retainRowsBefore(prevRowKey)
} else {
arg = arg.retainRowsAfter(prevRowKey)
}
attrMap["rowKey"] = prevRowKey
attrMap["error"] = err.Error()
attrMap["time_secs"] = time.Since(startTime).Seconds()
Expand Down Expand Up @@ -306,6 +317,10 @@
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet

// retainRowsBefore returns a new RowSet that does not include the
// given row key or any row key lexicographically greater than it.
retainRowsBefore(lastRowKey string) RowSet

// Valid reports whether this set can cover at least one row.
valid() bool
}
Expand All @@ -331,70 +346,206 @@
return retryKeys
}

func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
var retryKeys RowList
for _, key := range r {
if key < lastRowKey {
retryKeys = append(retryKeys, key)
}
}
return retryKeys
}

func (r RowList) valid() bool {
return len(r) > 0
}

// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
// A RowRange can be unbounded, encompassing all keys at least as large as Start.
type rangeBoundType int64

const (
rangeOpen rangeBoundType = iota
rangeClosed
rangeUnbounded
)

// A RowRange describes a range of rows between the start and end key. Start and
// end keys may be rangeOpen, rangeClosed or rangeUnbounded.
type RowRange struct {
start string
limit string
startBound rangeBoundType
start string
endBound rangeBoundType
end string
}

// NewRange returns the new RowRange [begin, end).
func NewRange(begin, end string) RowRange {
return createRowRange(rangeClosed, begin, rangeOpen, end)
}

func NewClosedOpenRange(begin, end string) RowRange {

Check failure on line 385 in bigtable/bigtable.go

View workflow job for this annotation

GitHub Actions / vet

exported function NewClosedOpenRange should have comment or be unexported
return createRowRange(rangeClosed, begin, rangeOpen, end)
}
func NewOpenClosedRange(start, end string) RowRange {

Check failure on line 388 in bigtable/bigtable.go

View workflow job for this annotation

GitHub Actions / vet

exported function NewOpenClosedRange should have comment or be unexported
return createRowRange(rangeOpen, start, rangeClosed, end)
}

func NewOpenRange(start, end string) RowRange {

Check failure on line 392 in bigtable/bigtable.go

View workflow job for this annotation

GitHub Actions / vet

exported function NewOpenRange should have comment or be unexported
return createRowRange(rangeOpen, start, rangeOpen, end)
}

func NewClosedRange(start, end string) RowRange {

Check failure on line 396 in bigtable/bigtable.go

View workflow job for this annotation

GitHub Actions / vet

exported function NewClosedRange should have comment or be unexported
return createRowRange(rangeClosed, start, rangeClosed, end)
}

func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange {
if start == "" {
startBound = rangeUnbounded
}
if end == "" {
endBound = rangeUnbounded
}
return RowRange{
start: begin,
limit: end,
startBound: startBound,
start: start,
endBound: endBound,
end: end,
}
}

// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
return r.limit == ""
return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

// Contains says whether the RowRange contains the key.
func (r RowRange) Contains(row string) bool {
return r.start <= row && (r.limit == "" || r.limit > row)
contains := true

switch r.startBound {
case rangeOpen:
contains = contains && r.start < row
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
break
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
case rangeClosed:
contains = contains && r.start <= row
break
case rangeUnbounded:
break
}

switch r.endBound {
case rangeOpen:
contains = contains && r.end > row
break
case rangeClosed:
contains = contains && r.end >= row
break
case rangeUnbounded:
break
}

return contains
}

// String provides a printable description of a RowRange.
func (r RowRange) String() string {
a := strconv.Quote(r.start)
if r.Unbounded() {
return fmt.Sprintf("[%s,∞)", a)
var startStr string
switch r.startBound {
case rangeOpen:
startStr = "(" + strconv.Quote(r.start)
break
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
case rangeClosed:
startStr = "[" + strconv.Quote(r.start)
break
case rangeUnbounded:
startStr = "(∞"
break
}
return fmt.Sprintf("[%s,%q)", a, r.limit)

var endStr string
switch r.endBound {
case rangeOpen:
endStr = r.end + ")"
break
case rangeClosed:
endStr = r.end + "]"
break
case rangeUnbounded:
endStr = "∞)"
break
}

return fmt.Sprintf("%s,%s", startStr, endStr)
}

func (r RowRange) proto() *btpb.RowSet {
rr := &btpb.RowRange{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
var rr = btpb.RowRange{}

switch r.startBound {
case rangeOpen:
rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
break
case rangeClosed:
rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
break
case rangeUnbounded:
// leave unbounded
break
}
if !r.Unbounded() {
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}

switch r.endBound {
case rangeOpen:
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)}
break
case rangeClosed:
rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)}
break
case rangeUnbounded:
// leave unbounded
break
}
return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}

return &btpb.RowSet{RowRanges: []*btpb.RowRange{&rr}}
}

func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
if lastRowKey == "" || lastRowKey < r.start {
return r
}
// Set the beginning of the range to the row after the last scanned.
start := lastRowKey + "\x00"
if r.Unbounded() {
return InfiniteRange(start)

return RowRange{
// Set the beginning of the range to the row after the last scanned.
startBound: rangeOpen,
start: lastRowKey,
endBound: r.endBound,
end: r.end,
}
}

func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end <= lastRowKey) {
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
return r
}

return RowRange{
startBound: r.startBound,
start: r.start,
endBound: rangeOpen,
end: lastRowKey,
}
return NewRange(start, r.limit)
}

func (r RowRange) valid() bool {
return r.Unbounded() || r.start < r.limit
if r.Unbounded() {
return true
}

if r.startBound == rangeOpen || r.endBound == rangeOpen {
return r.start < r.end
} else if r.startBound == rangeClosed {
return r.start <= r.end
} else {
return true
}
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
}

// RowRangeList is a sequence of RowRanges representing the union of the ranges.
Expand Down Expand Up @@ -424,6 +575,21 @@
return ranges
}

func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" {
return r
}
// Return a list of any range that has not yet been completely processed
var ranges RowRangeList
for _, rr := range r {
retained := rr.retainRowsBefore(lastRowKey)
if retained.valid() {
ranges = append(ranges, retained.(RowRange))
}
}
return ranges
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
Expand All @@ -440,18 +606,42 @@

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
end := prefixSuccessor(prefix)
return RowRange{
start: prefix,
limit: prefixSuccessor(prefix),
startBound: rangeClosed,
start: prefix,
endBound: validateBound(end, rangeOpen),
end: end,
}
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start.
func InfiniteRange(start string) RowRange {
return RowRange{
start: start,
limit: "",
startBound: validateBound(start, rangeClosed),
start: start,
endBound: rangeUnbounded,
end: "",
}
}

// InfiniteReverseRange returns the RowRange consisting of all keys less than or
// equal to the end.
func InfiniteReverseRange(end string) RowRange {
return RowRange{
startBound: rangeUnbounded,
start: "",
endBound: validateBound(end, rangeClosed),
end: end,
}
}

func validateBound(bound string, defaultBoundType rangeBoundType) rangeBoundType {
if bound == "" {
return rangeUnbounded
} else {

Check failure on line 643 in bigtable/bigtable.go

View workflow job for this annotation

GitHub Actions / vet

if block ends with a return statement, so drop this else and outdent its block
return defaultBoundType
}
}

Expand Down Expand Up @@ -557,7 +747,7 @@

func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() }

// LimitRows returns a ReadOption that will limit the number of rows to be read.
// LimitRows returns a ReadOption that will end the number of rows to be read.
func LimitRows(limit int64) ReadOption { return limitRows{limit} }

type limitRows struct{ limit int64 }
Expand All @@ -577,6 +767,25 @@
settings.fullReadStatsFunc = wrs.f
}

// ReverseScan returns a RadOption that will reverse the results of a Scan.
// The rows will be streamed in reverse lexiographic order of the keys. The row key ranges of the RowSet are
// still expected to be oriented the same way as forwards. ie [a,c] where a <= c. The row content
// will remain unchanged from the ordering forward scans. This is particularly useful to get the
// last N records before a key:
//
// table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
// return true
// }, bigtable.ReverseScan(), bigtable.LimitRows(10))
func ReverseScan() ReadOption {
return reverseScan{}
}

type reverseScan struct{}

func (rs reverseScan) set(settings *readSettings) {
settings.req.Reversed = true
}

// mutationsAreRetryable returns true if all mutations are idempotent
// and therefore retryable. A mutation is idempotent iff all cell timestamps
// have an explicit timestamp set and do not rely on the timestamp being set on the server.
Expand Down