Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add support for reverse scans #8755

Merged
merged 34 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
50aa069
feat: add more expressive range api
igorbernstein2 Oct 16, 2023
1950510
feat(bigtable): Add support for reverse scans
igorbernstein2 Oct 16, 2023
8b3b0de
resolve feature flag conflict after merge
igorbernstein2 Oct 16, 2023
1c0bbfb
add a new constructor for chunkReader to avoid line noise
igorbernstein2 Oct 16, 2023
c756b76
Merge branch 'main' of github.com:googleapis/google-cloud-go into rev…
igorbernstein2 Oct 16, 2023
4e0262a
remove orphaned code
igorbernstein2 Oct 16, 2023
db6ca11
fix typo
igorbernstein2 Oct 16, 2023
10ec6c6
adding a first test
brandtnewton Oct 19, 2023
2ce4698
extended RowRange to express bound types
brandtnewton Oct 20, 2023
8198ec1
adding unit tests
brandtnewton Oct 20, 2023
5676ef7
correcting backwards compatability behavior
brandtnewton Oct 20, 2023
d6ba568
adding test proxy support for reverse scan
brandtnewton Oct 23, 2023
61ce90d
Merge branch 'main' into rev-scans
brandtnewton Oct 23, 2023
850c122
added todo reminder
brandtnewton Oct 23, 2023
f186570
all unit tests pass
brandtnewton Oct 26, 2023
8dfd483
updated naming
brandtnewton Oct 26, 2023
ea66be7
fix retries on reverse scan
brandtnewton Oct 26, 2023
f5b1974
more tests
brandtnewton Oct 27, 2023
f1261d2
Merge branch 'main' into rev-scans
brandtnewton Oct 30, 2023
915aec2
exposing client messages to test proxy
brandtnewton Oct 30, 2023
c363a42
exposing client messages to test proxy
brandtnewton Oct 30, 2023
31b1d42
fixing vet errors
brandtnewton Oct 30, 2023
dc2726c
Merge branch 'rev-scans' of github.com:brandtnewton/google-cloud-go i…
brandtnewton Oct 30, 2023
4961652
minor style tweaks
brandtnewton Oct 30, 2023
b21aef2
changing error message to be consistent with java and cpp
brandtnewton Nov 1, 2023
cc1cfe6
cleaning up code and adding tests
brandtnewton Nov 2, 2023
b65cde0
simplify RowRange valid logic
brandtnewton Nov 2, 2023
6d9c1b1
rolling back test proxy changes
brandtnewton Nov 2, 2023
93e4be2
rolling back test proxy
brandtnewton Nov 2, 2023
efbc3c7
changed default bound type for better backwards compatability
brandtnewton Nov 2, 2023
9a5a438
Merge branch 'main' into rev-scans
brandtnewton Nov 3, 2023
b2d6ebb
Merge branch 'main' into rev-scans
brandtnewton Nov 7, 2023
10346c2
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
3b9c84f
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
280 changes: 236 additions & 44 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,26 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
if err != nil {
return err
}
cr := newChunkReader()

var cr *chunkReader
if req.Reversed {
cr = newReverseChunkReader()
} else {
cr = newChunkReader()
}

for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
// Reset arg for next Invoke call.
arg = arg.retainRowsAfter(prevRowKey)
if req.Reversed {
arg = arg.retainRowsBefore(prevRowKey)
} else {
arg = arg.retainRowsAfter(prevRowKey)
}
attrMap["rowKey"] = prevRowKey
attrMap["error"] = err.Error()
attrMap["time_secs"] = time.Since(startTime).Seconds()
Expand Down Expand Up @@ -306,6 +317,10 @@ type RowSet interface {
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet

// retainRowsBefore returns a new RowSet that does not include the
// given row key or any row key lexicographically greater than it.
retainRowsBefore(lastRowKey string) RowSet

// Valid reports whether this set can cover at least one row.
valid() bool
}
Expand All @@ -331,70 +346,230 @@ func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
return retryKeys
}

func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
var retryKeys RowList
for _, key := range r {
if key < lastRowKey {
retryKeys = append(retryKeys, key)
}
}
return retryKeys
}

func (r RowList) valid() bool {
return len(r) > 0
}

// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
// A RowRange can be unbounded, encompassing all keys at least as large as Start.
type rangeBoundType int64

const (
rangeUnbounded rangeBoundType = iota
rangeOpen
rangeClosed
)

// A RowRange describes a range of rows between the start and end key. Start and
// end keys may be rangeOpen, rangeClosed or rangeUnbounded.
type RowRange struct {
start string
limit string
startBound rangeBoundType
start string
endBound rangeBoundType
end string
}

// NewRange returns the new RowRange [begin, end).
func NewRange(begin, end string) RowRange {
return createRowRange(rangeClosed, begin, rangeOpen, end)
}

// NewClosedOpenRange returns the RowRange consisting of all greater than or
// equal to the start and less than the end: [start, end).
func NewClosedOpenRange(start, end string) RowRange {
return createRowRange(rangeClosed, start, rangeOpen, end)
}

// NewOpenClosedRange returns the RowRange consisting of all keys greater than
// the start and less than or equal to the end: (start, end].
func NewOpenClosedRange(start, end string) RowRange {
return createRowRange(rangeOpen, start, rangeClosed, end)
}

// NewOpenRange returns the RowRange consisting of all keys greater than the
// start and less than the end: (start, end).
func NewOpenRange(start, end string) RowRange {
return createRowRange(rangeOpen, start, rangeOpen, end)
}

// NewClosedRange returns the RowRange consisting of all keys greater than or
// equal to the start and less than or equal to the end: [start, end].
func NewClosedRange(start, end string) RowRange {
return createRowRange(rangeClosed, start, rangeClosed, end)
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
end := prefixSuccessor(prefix)
return createRowRange(rangeClosed, prefix, rangeOpen, end)
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start: [start, ∞).
func InfiniteRange(start string) RowRange {
return createRowRange(rangeClosed, start, rangeUnbounded, "")
}

// InfiniteReverseRange returns the RowRange consisting of all keys less than or
// equal to the end: (∞, end].
func InfiniteReverseRange(end string) RowRange {
return createRowRange(rangeUnbounded, "", rangeClosed, end)
}

// createRowRange creates a new RowRange, normalizing start and end
// rangeBoundType to rangeUnbounded if they're empty strings because empty
// strings also represent unbounded keys
func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange {
// normalize start bound type
if start == "" {
startBound = rangeUnbounded
}
// normalize end bound type
if end == "" {
endBound = rangeUnbounded
}
return RowRange{
start: begin,
limit: end,
startBound: startBound,
start: start,
endBound: endBound,
end: end,
}
}

// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
return r.limit == ""
return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

// Contains says whether the RowRange contains the key.
func (r RowRange) Contains(row string) bool {
return r.start <= row && (r.limit == "" || r.limit > row)
switch r.startBound {
case rangeOpen:
if r.start >= row {
return false
}
case rangeClosed:
if r.start > row {
return false
}
case rangeUnbounded:
}

switch r.endBound {
case rangeOpen:
if r.end <= row {
return false
}
case rangeClosed:
if r.end < row {
return false
}
case rangeUnbounded:
}

return true
}

// String provides a printable description of a RowRange.
func (r RowRange) String() string {
a := strconv.Quote(r.start)
if r.Unbounded() {
return fmt.Sprintf("[%s,∞)", a)
var startStr string
switch r.startBound {
case rangeOpen:
startStr = "(" + strconv.Quote(r.start)
case rangeClosed:
startStr = "[" + strconv.Quote(r.start)
case rangeUnbounded:
startStr = "(∞"
}
return fmt.Sprintf("[%s,%q)", a, r.limit)

var endStr string
switch r.endBound {
case rangeOpen:
endStr = r.end + ")"
case rangeClosed:
endStr = r.end + "]"
case rangeUnbounded:
endStr = "∞)"
}

return fmt.Sprintf("%s,%s", startStr, endStr)
}

func (r RowRange) proto() *btpb.RowSet {
rr := &btpb.RowRange{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
rr := &btpb.RowRange{}

switch r.startBound {
case rangeOpen:
rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
case rangeClosed:
rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
case rangeUnbounded:
// leave unbounded
}
if !r.Unbounded() {
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}

switch r.endBound {
case rangeOpen:
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)}
case rangeClosed:
rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)}
case rangeUnbounded:
// leave unbounded
}

return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
}

func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
if lastRowKey == "" || lastRowKey < r.start {
return r
}
// Set the beginning of the range to the row after the last scanned.
start := lastRowKey + "\x00"
if r.Unbounded() {
return InfiniteRange(start)

return RowRange{
// Set the beginning of the range to the row after the last scanned.
startBound: rangeOpen,
start: lastRowKey,
endBound: r.endBound,
end: r.end,
}
}

func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end < lastRowKey) {
return r
}

return RowRange{
startBound: r.startBound,
start: r.start,
endBound: rangeOpen,
end: lastRowKey,
}
return NewRange(start, r.limit)
}

func (r RowRange) valid() bool {
return r.Unbounded() || r.start < r.limit
// If either end is unbounded, then the range is always valid.
if r.Unbounded() {
return true
}

// If either end is an open interval, then the start must be strictly less
// than the end and since neither end is unbounded, we don't have to check
// for empty strings.
if r.startBound == rangeOpen || r.endBound == rangeOpen {
return r.start < r.end
}

// At this point both endpoints must be closed, which makes [a,a] a valid
// interval
return r.start <= r.end
}

// RowRangeList is a sequence of RowRanges representing the union of the ranges.
Expand Down Expand Up @@ -424,6 +599,21 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
return ranges
}

func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" {
return r
}
// Return a list of any range that has not yet been completely processed
var ranges RowRangeList
for _, rr := range r {
retained := rr.retainRowsBefore(lastRowKey)
if retained.valid() {
ranges = append(ranges, retained.(RowRange))
}
}
return ranges
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
Expand All @@ -438,23 +628,6 @@ func SingleRow(row string) RowSet {
return RowList{row}
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
return RowRange{
start: prefix,
limit: prefixSuccessor(prefix),
}
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start.
func InfiniteRange(start string) RowRange {
return RowRange{
start: start,
limit: "",
}
}

// prefixSuccessor returns the lexically smallest string greater than the
// prefix, if it exists, or "" otherwise. In either case, it is the string
// needed for the Limit of a RowRange.
Expand Down Expand Up @@ -557,7 +730,7 @@ type rowFilter struct{ f Filter }

func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() }

// LimitRows returns a ReadOption that will limit the number of rows to be read.
// LimitRows returns a ReadOption that will end the number of rows to be read.
func LimitRows(limit int64) ReadOption { return limitRows{limit} }

type limitRows struct{ limit int64 }
Expand All @@ -577,6 +750,25 @@ func (wrs withFullReadStats) set(settings *readSettings) {
settings.fullReadStatsFunc = wrs.f
}

// ReverseScan returns a RadOption that will reverse the results of a Scan.
// The rows will be streamed in reverse lexiographic order of the keys. The row key ranges of the RowSet are
// still expected to be oriented the same way as forwards. ie [a,c] where a <= c. The row content
// will remain unchanged from the ordering forward scans. This is particularly useful to get the
// last N records before a key:
//
// table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
// return true
// }, bigtable.ReverseScan(), bigtable.LimitRows(10))
func ReverseScan() ReadOption {
return reverseScan{}
}

type reverseScan struct{}

func (rs reverseScan) set(settings *readSettings) {
settings.req.Reversed = true
}

// mutationsAreRetryable returns true if all mutations are idempotent
// and therefore retryable. A mutation is idempotent iff all cell timestamps
// have an explicit timestamp set and do not rely on the timestamp being set on the server.
Expand Down