Skip to content

Commit

Permalink
core: manage txpool subscription in mainpool
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Sep 24, 2023
1 parent 8bfd23f commit 1af3bf2
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 14 deletions.
10 changes: 3 additions & 7 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,8 @@ type BlobPool struct {
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full

eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination

lock sync.RWMutex // Mutex protecting the pool during reorg handling
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
lock sync.RWMutex // Mutex protecting the pool during reorg handling
}

// New creates a new blob transaction pool to gather, sort and filter inbound
Expand Down Expand Up @@ -430,8 +428,6 @@ func (p *BlobPool) Close() error {
if err := p.store.Close(); err != nil {
errs = append(errs, err)
}
p.eventScope.Close()

switch {
case errs == nil:
return nil
Expand Down Expand Up @@ -1465,7 +1461,7 @@ func (p *BlobPool) updateLimboMetrics() {
// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
return p.eventFeed.Subscribe(ch)
}

// Nonce returns the next nonce of an account, with all transactions executable
Expand Down
6 changes: 1 addition & 5 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ type LegacyPool struct {
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

Expand Down Expand Up @@ -391,9 +390,6 @@ func (pool *LegacyPool) loop() {

// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()

// Terminate the pool reorger and return
close(pool.reorgShutdownCh)
pool.wg.Wait()
Expand All @@ -415,7 +411,7 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
return pool.txFeed.Subscribe(ch)
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
Expand Down
4 changes: 3 additions & 1 deletion core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (p *TxPool) Close() error {
if err := <-errc; err != nil {
errs = append(errs, err)
}

// Terminate each subpool if they are initialized
if p.inited.Load() {
for _, subpool := range p.subpools {
Expand All @@ -214,6 +213,9 @@ func (p *TxPool) Close() error {
}
}
}
// Terminate all the subpool subscriptions
p.subs.Close()

if len(errs) > 0 {
return fmt.Errorf("txpool close errors: %v", errs)
}
Expand Down
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (db *Database) Deactivate() error {

// Write the initial sync flag to persist it across restarts.
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning)
log.Info("Disabled trie database due to ongoing sync")
log.Info("Disabled trie database due to state sync")
return nil
}

Expand Down

0 comments on commit 1af3bf2

Please sign in to comment.