Skip to content

Commit

Permalink
eth/filters: retrieve logs in async (ethereum#27135)
Browse files Browse the repository at this point in the history
This change implements async log retrievals via feeding logs in channels, instead of returning slices. This is a first step to implement ethereum#15063.  

---------

Signed-off-by: jsvisa <delweng@gmail.com>
Co-authored-by: Sina Mahmoodi <itz.s1na@gmail.com>
Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Sina Mahmoodi <1591639+s1na@users.noreply.github.com>
  • Loading branch information
4 people authored and devopsbo3 committed Nov 10, 2023
1 parent f090ea8 commit f875ab9
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 153 deletions.
168 changes: 103 additions & 65 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,32 +106,32 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}
// Figure out the limits of the filter range
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}

var (
err error
head = header.Number.Int64()
pending = f.end == rpc.PendingBlockNumber.Int64()
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
endPending = f.end == rpc.PendingBlockNumber.Int64()
)

// special case for pending logs
if beginPending && !endPending {
return nil, errors.New("invalid block range")
}

// Short-cut if all we care about is pending logs
if beginPending && endPending {
return f.pendingLogs(), nil
}

resolveSpecial := func(number int64) (int64, error) {
var hdr *types.Header
switch number {
case rpc.LatestBlockNumber.Int64():
return head, nil
case rpc.PendingBlockNumber.Int64():
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
// we should return head here since we've already captured
// that we need to get the pending logs in the pending boolean above
return head, nil
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if hdr == nil {
return 0, errors.New("latest header not found")
}
case rpc.FinalizedBlockNumber.Int64():
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
if hdr == nil {
Expand All @@ -147,57 +147,92 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return hdr.Number.Int64(), nil
}

var err error
// range query need to resolve the special begin/end block number
if f.begin, err = resolveSpecial(f.begin); err != nil {
return nil, err
}
if f.end, err = resolveSpecial(f.end); err != nil {
return nil, err
}
// Gather all indexed logs, and finish with non indexed ones

logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
if err != nil {
// if an error occurs during extraction, we do return the extracted data
return logs, err
}
// Append the pending ones
if endPending {
pendingLogs := f.pendingLogs()
logs = append(logs, pendingLogs...)
}
return logs, nil
}
}
}

// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
// it creates and returns two channels: one for delivering log data, and one for reporting errors.
func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) {
var (
logs []*types.Log
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
logChan = make(chan *types.Log)
errChan = make(chan error)
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
}
if err != nil {
return logs, err

go func() {
defer func() {
close(errChan)
close(logChan)
}()

// Gather all indexed logs, and finish with non indexed ones
var (
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
err error
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
indexed = end + 1
}
if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
errChan <- err
return
}
}
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err

if err := f.unindexedLogs(ctx, end, logChan); err != nil {
errChan <- err
return
}
logs = append(logs, pendingLogs...)
}
return logs, err

errChan <- nil
}()

return logChan, errChan
}

// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
// Create a matcher session and request servicing from the backend
matches := make(chan uint64, 64)

session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return nil, err
return err
}
defer session.Close()

f.sys.backend.ServiceFilter(ctx, session)

// Iterate over the matches until exhausted or context closed
var logs []*types.Log

for {
select {
case number, ok := <-matches:
Expand All @@ -207,47 +242,50 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if err == nil {
f.begin = int64(end) + 1
}
return logs, err
return err
}
f.begin = int64(number) + 1

// Retrieve the suggested block and pull any truly matching logs
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return logs, err
return err
}
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
return err
}
for _, log := range found {
logChan <- log
}
logs = append(logs, found...)

case <-ctx.Done():
return logs, ctx.Err()
return ctx.Err()
}
}
}

// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log

func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
for ; f.begin <= int64(end); f.begin++ {
if f.begin%10 == 0 && ctx.Err() != nil {
return logs, ctx.Err()
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return logs, err
return err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
return err
}
for _, log := range found {
select {
case logChan <- log:
case <-ctx.Done():
return ctx.Err()
}
}
logs = append(logs, found...)
}
return logs, nil
return nil
}

// blockLogs returns the logs matching the filter criteria within a single block.
Expand Down Expand Up @@ -294,19 +332,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() ([]*types.Log, error) {
func (f *Filter) pendingLogs() []*types.Log {
block, receipts := f.sys.backend.PendingBlockAndReceipts()
if block == nil {
return nil, errors.New("pending state not available")
return nil
}
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
return nil, nil
return nil
}

func includes(addresses []common.Address, a common.Address) bool {
Expand Down
4 changes: 3 additions & 1 deletion eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
pendingBlock *types.Block
pendingReceipts types.Receipts
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
Expand Down Expand Up @@ -124,7 +126,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
}

func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
return b.pendingBlock, b.pendingReceipts
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
Expand Down

0 comments on commit f875ab9

Please sign in to comment.