Skip to content

Commit

Permalink
Fix getting validators map for local relay (ethereum#20)
Browse files Browse the repository at this point in the history
* Fix getting validators map for local relay

* pr comments

* add timer for updating known validators

* improvement to local validator map fetching

* lock for map updating

* properly lock updates

* get current slot if the mapping is empty

* remove onForkchoiceUpdate

* graceful shutdown

* Split initial proposer sync from the proposer fetch loop (ethereum#28)

Co-authored-by: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com>
  • Loading branch information
avalonche and Ruteri committed Feb 6, 2023
1 parent 6ba82c8 commit 6bd0d57
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 130 deletions.
127 changes: 92 additions & 35 deletions builder/beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -19,82 +20,138 @@ type testBeaconClient struct {
slot uint64
}

func (b *testBeaconClient) Stop() {
return
}

func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
return true
}
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
}
func (b *testBeaconClient) onForkchoiceUpdate() (uint64, error) {
return b.slot, nil
func (b *testBeaconClient) Start() error {
return nil
}

type BeaconClient struct {
endpoint string
endpoint string
slotsInEpoch uint64
secondsInSlot uint64

mu sync.Mutex
slotProposerMap map[uint64]PubkeyHex

mu sync.Mutex
currentEpoch uint64
currentSlot uint64
nextSlotProposer PubkeyHex
slotProposerMap map[uint64]PubkeyHex
closeCh chan struct{}
}

func NewBeaconClient(endpoint string) *BeaconClient {
func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
return &BeaconClient{
endpoint: endpoint,
slotsInEpoch: slotsInEpoch,
secondsInSlot: secondsInSlot,
slotProposerMap: make(map[uint64]PubkeyHex),
closeCh: make(chan struct{}),
}
}

func (b *BeaconClient) Stop() {
close(b.closeCh)
}

func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
return true
}

func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
/* Only returns proposer if requestedSlot is currentSlot + 1, would be a race otherwise */
b.mu.Lock()
defer b.mu.Unlock()

if b.currentSlot+1 != requestedSlot {
return PubkeyHex(""), errors.New("slot out of sync")
nextSlotProposer, found := b.slotProposerMap[requestedSlot]
if !found {
log.Error("inconsistent proposer mapping", "requestSlot", requestedSlot, "slotProposerMap", b.slotProposerMap)
return PubkeyHex(""), errors.New("inconsistent proposer mapping")
}
return b.nextSlotProposer, nil
return nextSlotProposer, nil
}

/* Returns next slot's proposer pubkey */
// TODO: what happens if no block for previous slot - should still get next slot
func (b *BeaconClient) onForkchoiceUpdate() (uint64, error) {
b.mu.Lock()
defer b.mu.Unlock()
func (b *BeaconClient) Start() error {
go b.UpdateValidatorMapForever()
return nil
}

func (b *BeaconClient) UpdateValidatorMapForever() {
durationPerSlot := time.Duration(b.secondsInSlot) * time.Second

prevFetchSlot := uint64(0)

// fetch current epoch if beacon is online
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
return 0, err
log.Error("could not get current slot", "err", err)
} else {
currentEpoch := currentSlot / b.slotsInEpoch
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err)
} else {
b.mu.Lock()
b.slotProposerMap = slotProposerMap
b.mu.Unlock()
}
}

nextSlot := currentSlot + 1
retryDelay := time.Second

b.currentSlot = currentSlot
nextSlotEpoch := nextSlot / 32
// Every half epoch request validators map, polling for the slot
// more frequently to avoid missing updates on errors
timer := time.NewTimer(retryDelay)
defer timer.Stop()
for true {
select {
case <-b.closeCh:
return
case <-timer.C:
}

if nextSlotEpoch != b.currentEpoch {
// TODO: this should be prepared in advance, possibly just fetch for next epoch in advance
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, nextSlotEpoch)
currentSlot, err := fetchCurrentSlot(b.endpoint)
if err != nil {
return 0, err
log.Error("could not get current slot", "err", err)
timer.Reset(retryDelay)
continue
}

b.currentEpoch = nextSlotEpoch
b.slotProposerMap = slotProposerMap
}
// TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0)
nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2
if currentSlot < nextFetchSlot {
timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
continue
}

nextSlotProposer, found := b.slotProposerMap[nextSlot]
if !found {
log.Error("inconsistent proposer mapping", "currentSlot", currentSlot, "slotProposerMap", b.slotProposerMap)
return 0, errors.New("inconsistent proposer mapping")
currentEpoch := currentSlot / b.slotsInEpoch
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1)
if err != nil {
log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err)
timer.Reset(retryDelay)
continue
}

prevFetchSlot = currentSlot
b.mu.Lock()
// remove previous epoch slots
for k := range b.slotProposerMap {
if k < currentEpoch*b.slotsInEpoch {
delete(b.slotProposerMap, k)
}
}
// update the slot proposer map for next epoch
for k, v := range slotProposerMap {
b.slotProposerMap[k] = v
}
b.mu.Unlock()

timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
}
b.nextSlotProposer = nextSlotProposer
return nextSlot, nil
}

func fetchCurrentSlot(endpoint string) (uint64, error) {
Expand Down
92 changes: 0 additions & 92 deletions builder/beacon_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,95 +174,3 @@ func TestFetchEpochProposersMap(t *testing.T) {
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a"), proposersMap[1])
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), proposersMap[2])
}

func TestOnForkchoiceUpdate(t *testing.T) {
mbn := newMockBeaconNode()
defer mbn.srv.Close()

mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`)

mbn.proposerDuties[1] = []byte(`{
"dependent_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
"execution_optimistic": false,
"data": [
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a",
"validator_index": "1",
"slot": "31"
},
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b",
"validator_index": "2",
"slot": "32"
},
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74c",
"validator_index": "3",
"slot": "33"
}
]
}`)

bc := NewBeaconClient(mbn.srv.URL)
slot, err := bc.onForkchoiceUpdate()
require.NoError(t, err)
require.Equal(t, slot, uint64(32))

pubkeyHex, err := bc.getProposerForNextSlot(32)
require.NoError(t, err)
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), pubkeyHex)

_, err = bc.getProposerForNextSlot(31)
require.EqualError(t, err, "slot out of sync")

_, err = bc.getProposerForNextSlot(33)
require.EqualError(t, err, "slot out of sync")

mbn.headersCode = 404
mbn.headersResp = []byte(`{ "code": 404, "message": "State not found" }`)

slot, err = NewBeaconClient(mbn.srv.URL).onForkchoiceUpdate()
require.EqualError(t, err, "State not found")
require.Equal(t, slot, uint64(0))

// Check that client does not fetch new proposers if epoch did not change
mbn.headersCode = 200
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`)
mbn.proposerDuties[1] = []byte(`{
"data": [
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d",
"validator_index": "4",
"slot": "32"
}
]
}`)

slot, err = bc.onForkchoiceUpdate()
require.NoError(t, err, "")
require.Equal(t, slot, uint64(32))

mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "63", "proposer_index": "1" } } } ] }`)
mbn.proposerDuties[2] = []byte(`{
"data": [
{
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d",
"validator_index": "4",
"slot": "64"
}
]
}`)

slot, err = bc.onForkchoiceUpdate()
require.NoError(t, err, "")
require.Equal(t, slot, uint64(64))

pubkeyHex, err = bc.getProposerForNextSlot(64)
require.NoError(t, err)
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d"), pubkeyHex)

// Check proposers map error is routed out
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "65", "proposer_index": "1" } } } ] }`)
_, err = bc.onForkchoiceUpdate()
require.EqualError(t, err, "inconsistent proposer mapping")
}
7 changes: 5 additions & 2 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ type ValidatorData struct {
type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
onForkchoiceUpdate() (uint64, error)
Start() error
Stop()
}

type IRelay interface {
SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error
GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
Start() error
Stop()
}

type IBuilder interface {
Expand Down Expand Up @@ -89,7 +92,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
}

func (b *Builder) Start() error {
return nil
return b.relay.Start()
}

func (b *Builder) Stop() error {
Expand Down
4 changes: 4 additions & 0 deletions builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ type Config struct {
Enabled bool `toml:",omitempty"`
EnableValidatorChecks bool `toml:",omitempty"`
EnableLocalRelay bool `toml:",omitempty"`
SlotsInEpoch uint64 `toml:",omitempty"`
SecondsInSlot uint64 `toml:",omitempty"`
DisableBundleFetcher bool `toml:",omitempty"`
DryRun bool `toml:",omitempty"`
BuilderSecretKey string `toml:",omitempty"`
Expand All @@ -23,6 +25,8 @@ var DefaultConfig = Config{
Enabled: false,
EnableValidatorChecks: false,
EnableLocalRelay: false,
SlotsInEpoch: 32,
SecondsInSlot: 12,
DisableBundleFetcher: false,
DryRun: false,
BuilderSecretKey: "0x2fc12ae741f29701f8e30f5de6350766c020cb80768a0ff01e6838ffd2431e11",
Expand Down
9 changes: 9 additions & 0 deletions builder/local_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning
}
}

func (r *LocalRelay) Start() error {
r.beaconClient.Start()
return nil
}

func (r *LocalRelay) Stop() {
r.beaconClient.Stop()
}

func (r *LocalRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error {
log.Info("submitting block to local relay", "block", msg.ExecutionPayload.BlockHash.String())
return r.submitBlock(msg)
Expand Down
6 changes: 6 additions & 0 deletions builder/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ func (r *RemoteRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error
return ValidatorData{}, ErrValidatorNotFound
}

func (r *RemoteRelay) Start() error {
return nil
}

func (r *RemoteRelay) Stop() {}

func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error {
log.Info("submitting block to remote relay", "endpoint", r.endpoint)
code, err := server.SendHTTPRequest(context.TODO(), *http.DefaultClient, http.MethodPost, r.endpoint+"/relay/v1/builder/blocks", msg, nil)
Expand Down
16 changes: 16 additions & 0 deletions builder/relay_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ func NewRemoteRelayAggregator(primary IRelay, secondary []IRelay) *RemoteRelayAg
}
}

func (r *RemoteRelayAggregator) Start() error {
for _, relay := range r.relays {
err := relay.Start()
if err != nil {
return err
}
}
return nil
}

func (r *RemoteRelayAggregator) Stop() {
for _, relay := range r.relays {
relay.Stop()
}
}

func (r *RemoteRelayAggregator) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, registration ValidatorData) error {
r.registrationsCacheLock.RLock()
defer r.registrationsCacheLock.RUnlock()
Expand Down
6 changes: 6 additions & 0 deletions builder/relay_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (r *testRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
return r.gvsVd, r.gvsErr
}

func (r *testRelay) Start() error {
return nil
}

func (r *testRelay) Stop() {}

func TestRemoteRelayAggregator(t *testing.T) {
t.Run("should return error if no relays return validator data", func(t *testing.T) {
backend := newTestRelayAggBackend(3)
Expand Down
2 changes: 1 addition & 1 deletion builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
copy(bellatrixForkVersion[:], bellatrixForkVersionBytes[:4])
proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot)

beaconClient := NewBeaconClient(cfg.BeaconEndpoint)
beaconClient := NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot)

var localRelay *LocalRelay
if cfg.EnableLocalRelay {
Expand Down

0 comments on commit 6bd0d57

Please sign in to comment.