Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add support for reverse scans #8755

Merged
merged 34 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
50aa069
feat: add more expressive range api
igorbernstein2 Oct 16, 2023
1950510
feat(bigtable): Add support for reverse scans
igorbernstein2 Oct 16, 2023
8b3b0de
resolve feature flag conflict after merge
igorbernstein2 Oct 16, 2023
1c0bbfb
add a new constructor for chunkReader to avoid line noise
igorbernstein2 Oct 16, 2023
c756b76
Merge branch 'main' of github.com:googleapis/google-cloud-go into rev…
igorbernstein2 Oct 16, 2023
4e0262a
remove orphaned code
igorbernstein2 Oct 16, 2023
db6ca11
fix typo
igorbernstein2 Oct 16, 2023
10ec6c6
adding a first test
brandtnewton Oct 19, 2023
2ce4698
extended RowRange to express bound types
brandtnewton Oct 20, 2023
8198ec1
adding unit tests
brandtnewton Oct 20, 2023
5676ef7
correcting backwards compatability behavior
brandtnewton Oct 20, 2023
d6ba568
adding test proxy support for reverse scan
brandtnewton Oct 23, 2023
61ce90d
Merge branch 'main' into rev-scans
brandtnewton Oct 23, 2023
850c122
added todo reminder
brandtnewton Oct 23, 2023
f186570
all unit tests pass
brandtnewton Oct 26, 2023
8dfd483
updated naming
brandtnewton Oct 26, 2023
ea66be7
fix retries on reverse scan
brandtnewton Oct 26, 2023
f5b1974
more tests
brandtnewton Oct 27, 2023
f1261d2
Merge branch 'main' into rev-scans
brandtnewton Oct 30, 2023
915aec2
exposing client messages to test proxy
brandtnewton Oct 30, 2023
c363a42
exposing client messages to test proxy
brandtnewton Oct 30, 2023
31b1d42
fixing vet errors
brandtnewton Oct 30, 2023
dc2726c
Merge branch 'rev-scans' of github.com:brandtnewton/google-cloud-go i…
brandtnewton Oct 30, 2023
4961652
minor style tweaks
brandtnewton Oct 30, 2023
b21aef2
changing error message to be consistent with java and cpp
brandtnewton Nov 1, 2023
cc1cfe6
cleaning up code and adding tests
brandtnewton Nov 2, 2023
b65cde0
simplify RowRange valid logic
brandtnewton Nov 2, 2023
6d9c1b1
rolling back test proxy changes
brandtnewton Nov 2, 2023
93e4be2
rolling back test proxy
brandtnewton Nov 2, 2023
efbc3c7
changed default bound type for better backwards compatability
brandtnewton Nov 2, 2023
9a5a438
Merge branch 'main' into rev-scans
brandtnewton Nov 3, 2023
b2d6ebb
Merge branch 'main' into rev-scans
brandtnewton Nov 7, 2023
10346c2
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
3b9c84f
Merge branch 'main' into rev-scans
igorbernstein2 Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
299 changes: 258 additions & 41 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,26 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
if err != nil {
return err
}
cr := newChunkReader()

var cr *chunkReader
if req.Reversed {
cr = newReverseChunkReader()
} else {
cr = newChunkReader()
}

for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
// Reset arg for next Invoke call.
arg = arg.retainRowsAfter(prevRowKey)
if req.Reversed {
arg = arg.retainRowsBefore(prevRowKey)
} else {
arg = arg.retainRowsAfter(prevRowKey)
}
attrMap["rowKey"] = prevRowKey
attrMap["error"] = err.Error()
attrMap["time_secs"] = time.Since(startTime).Seconds()
Expand Down Expand Up @@ -306,6 +317,10 @@ type RowSet interface {
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet

// retainRowsBefore returns a new RowSet that does not include the
// given row key or any row key lexicographically greater than it.
retainRowsBefore(lastRowKey string) RowSet

// Valid reports whether this set can cover at least one row.
valid() bool
}
Expand All @@ -331,70 +346,248 @@ func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
return retryKeys
}

// retainRowsBefore returns the keys of r that sort strictly before
// lastRowKey, preserving their original order. It is the reverse-scan
// counterpart of retainRowsAfter and is used to narrow the row set before a
// reversed read is retried.
//
// An empty lastRowKey is taken to mean that no row has been read yet, so the
// whole list is retained; this matches the RowRange and RowRangeList
// implementations. Without the guard every key would be dropped, because no
// key compares less than "", and a retry would silently read nothing.
func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
	if lastRowKey == "" {
		return r
	}
	var retryKeys RowList
	for _, key := range r {
		if key < lastRowKey {
			retryKeys = append(retryKeys, key)
		}
	}
	return retryKeys
}

// valid reports whether the list names at least one row key.
func (r RowList) valid() bool {
	return len(r) != 0
}

// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
// A RowRange can be unbounded, encompassing all keys at least as large as Start.
// rangeBoundType describes how one end of a RowRange is bounded.
type rangeBoundType int64

const (
	// rangeUnbounded means the range has no limit on that side. It is
	// deliberately the zero value, so that a zero RowRange{} stays unbounded
	// on both sides and keeps covering every row, as it did before bound
	// types were introduced.
	rangeUnbounded rangeBoundType = iota
	// rangeOpen excludes the bound key itself from the range.
	rangeOpen
	// rangeClosed includes the bound key itself in the range.
	rangeClosed
)

// A RowRange describes a range of rows between the start and end key. Start and
// end keys may be rangeOpen, rangeClosed or rangeUnbounded.
type RowRange struct {
start string
limit string
startBound rangeBoundType
start string
endBound rangeBoundType
end string
}

// NewRange returns the RowRange [begin, end): every key greater than or equal
// to begin and less than end. createRowRange treats an empty begin or end as
// unbounded on that side.
func NewRange(begin, end string) RowRange {
	lower, upper := rangeClosed, rangeOpen
	return createRowRange(lower, begin, upper, end)
}

// NewClosedOpenRange returns the RowRange [start, end): every key greater
// than or equal to start and less than end. createRowRange treats an empty
// start or end as unbounded on that side.
func NewClosedOpenRange(start, end string) RowRange {
	lower, upper := rangeClosed, rangeOpen
	return createRowRange(lower, start, upper, end)
}

// NewOpenClosedRange returns the RowRange (start, end]: every key greater
// than start and less than or equal to end. createRowRange treats an empty
// start or end as unbounded on that side.
func NewOpenClosedRange(start, end string) RowRange {
	lower, upper := rangeOpen, rangeClosed
	return createRowRange(lower, start, upper, end)
}

// NewOpenRange returns the RowRange (start, end): every key greater than
// start and less than end. createRowRange treats an empty start or end as
// unbounded on that side.
func NewOpenRange(start, end string) RowRange {
	lower, upper := rangeOpen, rangeOpen
	return createRowRange(lower, start, upper, end)
}

// NewClosedRange returns the RowRange [start, end]: every key greater than or
// equal to start and less than or equal to end. createRowRange treats an
// empty start or end as unbounded on that side.
func NewClosedRange(start, end string) RowRange {
	lower, upper := rangeClosed, rangeClosed
	return createRowRange(lower, start, upper, end)
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
	// The range starts at the prefix itself (inclusive) and stops just
	// before prefixSuccessor(prefix). parseBoundType turns an empty
	// successor into an unbounded end instead of an open one.
	successor := prefixSuccessor(prefix)

	var rng RowRange
	rng.startBound = rangeClosed
	rng.start = prefix
	rng.endBound = parseBoundType(successor, rangeOpen)
	rng.end = successor
	return rng
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start: [start, ∞). parseBoundType makes the start unbounded as
// well when start is empty.
func InfiniteRange(start string) RowRange {
	var rng RowRange
	rng.startBound = parseBoundType(start, rangeClosed)
	rng.start = start
	// No upper limit: the end key stays empty and is marked unbounded.
	rng.endBound = rangeUnbounded
	rng.end = ""
	return rng
}

// InfiniteReverseRange returns the RowRange consisting of all keys less than
// or equal to the end: (-∞, end]. parseBoundType makes the end unbounded as
// well when end is empty.
func InfiniteReverseRange(end string) RowRange {
	var rng RowRange
	// No lower limit: the start key stays empty and is marked unbounded.
	rng.startBound = rangeUnbounded
	rng.start = ""
	rng.endBound = parseBoundType(end, rangeClosed)
	rng.end = end
	return rng
}

func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange {
if start == "" {
startBound = rangeUnbounded
}
if end == "" {
endBound = rangeUnbounded
}
return RowRange{
start: begin,
limit: end,
startBound: startBound,
start: start,
endBound: endBound,
end: end,
}
}

// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
return r.limit == ""
return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

// Contains says whether the RowRange contains the key.
func (r RowRange) Contains(row string) bool {
return r.start <= row && (r.limit == "" || r.limit > row)
contains := true

switch r.startBound {
case rangeOpen:
contains = contains && r.start < row
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
break
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
case rangeClosed:
contains = contains && r.start <= row
break
case rangeUnbounded:
break
}

switch r.endBound {
case rangeOpen:
contains = contains && r.end > row
break
case rangeClosed:
contains = contains && r.end >= row
break
case rangeUnbounded:
break
}

return contains
}

// String provides a printable description of a RowRange.
func (r RowRange) String() string {
a := strconv.Quote(r.start)
if r.Unbounded() {
return fmt.Sprintf("[%s,∞)", a)
var startStr string
switch r.startBound {
case rangeOpen:
startStr = "(" + strconv.Quote(r.start)
break
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
case rangeClosed:
startStr = "[" + strconv.Quote(r.start)
break
case rangeUnbounded:
startStr = "(∞"
break
}
return fmt.Sprintf("[%s,%q)", a, r.limit)

var endStr string
switch r.endBound {
case rangeOpen:
endStr = r.end + ")"
break
case rangeClosed:
endStr = r.end + "]"
break
case rangeUnbounded:
endStr = "∞)"
break
}

return fmt.Sprintf("%s,%s", startStr, endStr)
}

func (r RowRange) proto() *btpb.RowSet {
rr := &btpb.RowRange{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
rr := &btpb.RowRange{}

switch r.startBound {
case rangeOpen:
rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
break
case rangeClosed:
rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
break
case rangeUnbounded:
// leave unbounded
break
}
if !r.Unbounded() {
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}

switch r.endBound {
case rangeOpen:
rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)}
break
case rangeClosed:
rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)}
break
case rangeUnbounded:
// leave unbounded
break
}

return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
}

func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
if lastRowKey == "" || lastRowKey < r.start {
return r
}
// Set the beginning of the range to the row after the last scanned.
start := lastRowKey + "\x00"
if r.Unbounded() {
return InfiniteRange(start)

return RowRange{
// Set the beginning of the range to the row after the last scanned.
startBound: rangeOpen,
start: lastRowKey,
endBound: r.endBound,
end: r.end,
}
}

func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end <= lastRowKey) {
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
return r
}

return RowRange{
startBound: r.startBound,
start: r.start,
endBound: rangeOpen,
end: lastRowKey,
}
return NewRange(start, r.limit)
}

func (r RowRange) valid() bool {
return r.Unbounded() || r.start < r.limit
if r.Unbounded() {
return true
}

if r.startBound == rangeOpen || r.endBound == rangeOpen {
return r.start < r.end
} else if r.startBound == rangeClosed {
return r.start <= r.end
} else {
return true
}
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
}

// RowRangeList is a sequence of RowRanges representing the union of the ranges.
Expand Down Expand Up @@ -424,6 +617,21 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
return ranges
}

// retainRowsBefore trims every range in the list so that it no longer
// includes lastRowKey or any key after it, and returns the ranges that can
// still match a row. An empty lastRowKey returns the list unchanged.
func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
	if lastRowKey == "" {
		return r
	}

	var remaining RowRangeList
	for i := range r {
		trimmed := r[i].retainRowsBefore(lastRowKey)
		if !trimmed.valid() {
			// Nothing left to read from this range; drop it.
			continue
		}
		remaining = append(remaining, trimmed.(RowRange))
	}
	return remaining
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
Expand All @@ -438,21 +646,11 @@ func SingleRow(row string) RowSet {
return RowList{row}
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
func PrefixRange(prefix string) RowRange {
return RowRange{
start: prefix,
limit: prefixSuccessor(prefix),
}
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start.
func InfiniteRange(start string) RowRange {
return RowRange{
start: start,
limit: "",
func parseBoundType(bound string, defaultBoundType rangeBoundType) rangeBoundType {
brandtnewton marked this conversation as resolved.
Show resolved Hide resolved
if bound == "" {
return rangeUnbounded
}
return defaultBoundType
}

// prefixSuccessor returns the lexically smallest string greater than the
Expand Down Expand Up @@ -557,7 +755,7 @@ type rowFilter struct{ f Filter }

// set stores the proto form of the wrapped filter on the read request held
// by settings.
func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() }

// LimitRows returns a ReadOption that will limit the number of rows to be read.
// LimitRows returns a ReadOption that will limit the number of rows to be read.
func LimitRows(limit int64) ReadOption {
	return limitRows{limit: limit}
}

// limitRows is the ReadOption value built by LimitRows; limit holds the
// row count passed to it.
type limitRows struct{ limit int64 }
Expand All @@ -577,6 +775,25 @@ func (wrs withFullReadStats) set(settings *readSettings) {
settings.fullReadStatsFunc = wrs.f
}

// ReverseScan returns a ReadOption that will reverse the results of a Scan.
// The rows will be streamed in reverse lexicographic order of the keys. The row key ranges of the RowSet are
// still expected to be oriented the same way as forwards, i.e. [a,c] where a <= c. The row content
// will remain unchanged from the ordering of forward scans. This is particularly useful to get the
// last N records before a key:
//
//	table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
//		return true
//	}, bigtable.ReverseScan(), bigtable.LimitRows(10))
func ReverseScan() ReadOption {
return reverseScan{}
}

// reverseScan is the ReadOption value returned by ReverseScan. It carries no
// state of its own.
type reverseScan struct{}

// set turns on the Reversed flag of the read request held by settings.
func (rs reverseScan) set(settings *readSettings) {
settings.req.Reversed = true
}

// mutationsAreRetryable returns true if all mutations are idempotent
// and therefore retryable. A mutation is idempotent iff all cell timestamps
// have an explicit timestamp set and do not rely on the timestamp being set on the server.
Expand Down