Skip to content

Commit

Permalink
atxs: regossip atxs in publish epoch (#5097)
Browse files Browse the repository at this point in the history
closes: #5068

there is an indication that not all atxs are propagated before target epoch starts. it results in efficiency in some data structures and in future will lead to lost rewards.

if node was offline when built an atx or experienced some other problems, the only chance it has to broadcast an atx is when peers asks for it (atx sync). that approach is in general not scalable and naive about concurrent requests.  

as atx are not large (~1KB) we can regossip them periodically. if peer already stores this atx it will not gossip it futher
  • Loading branch information
dshulyak committed Sep 27, 2023
1 parent 060a126 commit f588809
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 11 deletions.
48 changes: 44 additions & 4 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ const (

// Config defines configuration for Builder.
type Config struct {
CoinbaseAccount types.Address
GoldenATXID types.ATXID
LayersPerEpoch uint32
CoinbaseAccount types.Address
GoldenATXID types.ATXID
LayersPerEpoch uint32
RegossipInterval time.Duration
}

// Builder struct is the struct that orchestrates the creation of activation transactions
Expand All @@ -79,6 +80,7 @@ type Builder struct {
coinbaseAccount types.Address
goldenATXID types.ATXID
layersPerEpoch uint32
regossipInterval time.Duration
cdb *datastore.CachedDB
atxHandler atxHandler
publisher pubsub.Publisher
Expand Down Expand Up @@ -165,6 +167,7 @@ func NewBuilder(
coinbaseAccount: conf.CoinbaseAccount,
goldenATXID: conf.GoldenATXID,
layersPerEpoch: conf.LayersPerEpoch,
regossipInterval: conf.RegossipInterval,
cdb: cdb,
atxHandler: hdlr,
publisher: publisher,
Expand Down Expand Up @@ -235,7 +238,22 @@ func (b *Builder) StartSmeshing(coinbase types.Address, opts PostSetupOpts) erro
b.run(ctx)
return nil
})

if b.regossipInterval != 0 {
b.eg.Go(func() error {
ticker := time.NewTicker(b.regossipInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := b.Regossip(ctx); err != nil {
b.log.With().Warning("failed to regossip", log.Context(ctx), log.Err(err))
}
}
}
})
}
return nil
}

Expand Down Expand Up @@ -724,6 +742,28 @@ func (b *Builder) GetPositioningAtx() (types.ATXID, error) {
return id, nil
}

func (b *Builder) Regossip(ctx context.Context) error {
epoch := b.layerClock.CurrentLayer().GetEpoch()
atx, err := atxs.GetIDByEpochAndNodeID(b.cdb, epoch, b.signer.NodeID())
if errors.Is(err, sql.ErrNotFound) {
return nil
} else if err != nil {
return err
}
blob, err := atxs.GetBlob(b.cdb, atx[:])
if err != nil {
return fmt.Errorf("get blob %s: %w", atx.ShortString(), err)
}
if len(blob) == 0 {
return nil // checkpoint
}
if err := b.publisher.Publish(ctx, pubsub.AtxProtocol, blob); err != nil {
return fmt.Errorf("republish %s: %w", atx.ShortString(), err)
}
b.log.With().Debug("regossipped atx", log.Context(ctx), log.ShortStringer("atx", atx))
return nil
}

// SignAndFinalizeAtx signs the atx with specified signer and calculates the ID of the ATX.
func SignAndFinalizeAtx(signer *signing.EdSigner, atx *types.ActivationTx) error {
atx.Signature = signer.Sign(signing.ATX, atx.SignedBytes())
Expand Down
29 changes: 29 additions & 0 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,35 @@ func TestWaitPositioningAtx(t *testing.T) {
}
}

func TestRegossip(t *testing.T) {
layer := types.LayerID(10)
t.Run("not found", func(t *testing.T) {
h := newTestBuilder(t)
h.mclock.EXPECT().CurrentLayer().Return(layer)
require.NoError(t, h.Regossip(context.Background()))
})
t.Run("success", func(t *testing.T) {
h := newTestBuilder(t)
atx := newActivationTx(t,
h.signer, 0, types.EmptyATXID, types.EmptyATXID, nil,
layer.GetEpoch(), 0, 1, types.Address{}, 1, &types.NIPost{})
require.NoError(t, atxs.Add(h.cdb.Database, atx))
blob, err := atxs.GetBlob(h.cdb.Database, atx.ID().Bytes())
require.NoError(t, err)
h.mclock.EXPECT().CurrentLayer().Return(layer)
ctx := context.Background()
h.mpub.EXPECT().Publish(ctx, pubsub.AtxProtocol, blob)
require.NoError(t, h.Regossip(ctx))
})
t.Run("checkpointed", func(t *testing.T) {
h := newTestBuilder(t)
require.NoError(t, atxs.AddCheckpointed(h.cdb.Database, &atxs.CheckpointAtx{
ID: types.ATXID{1}, Epoch: layer.GetEpoch(), SmesherID: h.sig.NodeID()}))

Check failure on line 1297 in activation/activation_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
h.mclock.EXPECT().CurrentLayer().Return(layer)
require.NoError(t, h.Regossip(context.Background()))
})
}

func TestWaitingToBuildNipostChallengeWithJitter(t *testing.T) {
t.Run("before grace period", func(t *testing.T) {
// ┌──grace period──┐
Expand Down
6 changes: 6 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type atxChan struct {

// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
local p2p.Peer
cdb *datastore.CachedDB
edVerifier *signing.EdVerifier
clock layerClock
Expand All @@ -57,6 +58,7 @@ type Handler struct {

// NewHandler returns a data handler for ATX.
func NewHandler(
local p2p.Peer,
cdb *datastore.CachedDB,
edVerifier *signing.EdVerifier,
c layerClock,
Expand All @@ -71,6 +73,7 @@ func NewHandler(
poetCfg PoetConfig,
) *Handler {
return &Handler{
local: local,
cdb: cdb,
edVerifier: edVerifier,
clock: c,
Expand Down Expand Up @@ -493,6 +496,9 @@ func (h *Handler) HandleGossipAtx(ctx context.Context, peer p2p.Peer, msg []byte
log.Err(err),
)
}
if errors.Is(err, errKnownAtx) && peer == h.local {
return nil
}
return err
}

Expand Down
27 changes: 25 additions & 2 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (
"github.com/spacemeshos/go-spacemesh/system/mocks"
)

const layersPerEpochBig = 1000
const (
layersPerEpochBig = 1000
localID = "local"
)

func newMerkleProof(t testing.TB, challenge types.Hash32, otherLeafs []types.Hash32) (types.MerkleProof, types.Hash32) {
t.Helper()
Expand Down Expand Up @@ -101,7 +104,7 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID) *testHandler {
mbeacon := NewMockAtxReceiver(ctrl)
mtortoise := mocks.NewMockTortoise(ctrl)

atxHdlr := NewHandler(cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
atxHdlr := NewHandler(localID, cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
return &testHandler{
Handler: atxHdlr,

Expand Down Expand Up @@ -1002,6 +1005,26 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf))
})
t.Run("known atx from local id is allowed", func(t *testing.T) {
t.Parallel()

atxHdlr := newTestHandler(t, goldenATXID)

atx := newActivationTx(t, sig, 0, types.EmptyATXID, types.EmptyATXID, nil, 0, 0, 0, types.Address{2, 4, 5}, 2, nil)

atxHdlr.mbeacon.EXPECT().OnAtx(gomock.Any())
atxHdlr.mtortoise.EXPECT().OnAtx(gomock.Any())
require.NoError(t, atxHdlr.ProcessAtx(context.Background(), atx))

buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf))

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf))
})

t.Run("atx with invalid signature", func(t *testing.T) {
t.Parallel()
Expand Down
1 change: 1 addition & 0 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri
mtrtl := smocks.NewMockTortoise(ctrl)
cdb := datastore.NewCachedDB(db, lg)
atxHandler := activation.NewHandler(
"",
cdb,
edVerifier,
mclock,
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type BaseConfig struct {
// MinerGoodAtxsPercent is a threshold to decide if tortoise activeset should be
// picked from first block insted of synced data.
MinerGoodAtxsPercent int `mapstructure:"miner-good-atxs-percent"`

RegossipAtxInterval time.Duration `mapstructure:"regossip-atx-interval"`
}

type PublicMetrics struct {
Expand Down
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func MainnetConfig() Config {
"https://poet-110.spacemesh.network",
"https://poet-111.spacemesh.network",
},
RegossipAtxInterval: 2 * time.Hour,
},
Genesis: &GenesisConfig{
GenesisTime: "2023-07-14T08:00:00Z",
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func fastnet() config.Config {
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.LayersPerEpoch = 4
conf.RegossipAtxInterval = 30 * time.Second

conf.Tortoise.Hdist = 4
conf.Tortoise.Zdist = 2
Expand Down
5 changes: 3 additions & 2 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func testnet() config.Config {

OptFilterThreshold: 90,

TickSize: 666514,
PoETServers: []string{},
TickSize: 666514,
PoETServers: []string{},
RegossipAtxInterval: time.Hour,
},
Genesis: &config.GenesisConfig{
GenesisTime: "2023-09-13T18:00:00Z",
Expand Down
8 changes: 5 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ func (app *App) initServices(ctx context.Context) error {

fetcherWrapped := &layerFetcher{}
atxHandler := activation.NewHandler(
app.host.ID(),
app.cachedDB,
app.edVerifier,
app.clock,
Expand Down Expand Up @@ -858,9 +859,10 @@ func (app *App) initServices(ctx context.Context) error {
}

builderConfig := activation.Config{
CoinbaseAccount: coinbaseAddr,
GoldenATXID: goldenATXID,
LayersPerEpoch: layersPerEpoch,
CoinbaseAccount: coinbaseAddr,
GoldenATXID: goldenATXID,
LayersPerEpoch: layersPerEpoch,
RegossipInterval: app.Config.RegossipAtxInterval,
}
atxBuilder := activation.NewBuilder(
builderConfig,
Expand Down

0 comments on commit f588809

Please sign in to comment.