Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - atxs: regossip atxs in publish epoch #5097

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 44 additions & 4 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@

// Config defines configuration for Builder.
type Config struct {
CoinbaseAccount types.Address
GoldenATXID types.ATXID
LayersPerEpoch uint32
CoinbaseAccount types.Address
GoldenATXID types.ATXID
LayersPerEpoch uint32
RegossipInterval time.Duration
}

// Builder struct is the struct that orchestrates the creation of activation transactions
Expand All @@ -79,6 +80,7 @@
coinbaseAccount types.Address
goldenATXID types.ATXID
layersPerEpoch uint32
regossipInterval time.Duration
cdb *datastore.CachedDB
atxHandler atxHandler
publisher pubsub.Publisher
Expand Down Expand Up @@ -165,6 +167,7 @@
coinbaseAccount: conf.CoinbaseAccount,
goldenATXID: conf.GoldenATXID,
layersPerEpoch: conf.LayersPerEpoch,
regossipInterval: conf.RegossipInterval,
poszu marked this conversation as resolved.
Show resolved Hide resolved
cdb: cdb,
atxHandler: hdlr,
publisher: publisher,
Expand Down Expand Up @@ -235,7 +238,22 @@
b.run(ctx)
return nil
})

if b.regossipInterval != 0 {
b.eg.Go(func() error {
ticker := time.NewTicker(b.regossipInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := b.Regossip(ctx); err != nil {
b.log.With().Warning("failed to regossip", log.Context(ctx), log.Err(err))
}

Check warning on line 252 in activation/activation.go

View check run for this annotation

Codecov / codecov/patch

activation/activation.go#L249-L252

Added lines #L249 - L252 were not covered by tests
}
}
})
}
return nil
}

Expand Down Expand Up @@ -724,6 +742,28 @@
return id, nil
}

func (b *Builder) Regossip(ctx context.Context) error {
epoch := b.layerClock.CurrentLayer().GetEpoch()
atx, err := atxs.GetIDByEpochAndNodeID(b.cdb, epoch, b.signer.NodeID())
if errors.Is(err, sql.ErrNotFound) {
return nil
} else if err != nil {
return err
}

Check warning on line 752 in activation/activation.go

View check run for this annotation

Codecov / codecov/patch

activation/activation.go#L751-L752

Added lines #L751 - L752 were not covered by tests
blob, err := atxs.GetBlob(b.cdb, atx[:])
if err != nil {
return fmt.Errorf("get blob %s: %w", atx.ShortString(), err)
}

Check warning on line 756 in activation/activation.go

View check run for this annotation

Codecov / codecov/patch

activation/activation.go#L755-L756

Added lines #L755 - L756 were not covered by tests
if len(blob) == 0 {
return nil // checkpoint
}
if err := b.publisher.Publish(ctx, pubsub.AtxProtocol, blob); err != nil {
return fmt.Errorf("republish %s: %w", atx.ShortString(), err)
}

Check warning on line 762 in activation/activation.go

View check run for this annotation

Codecov / codecov/patch

activation/activation.go#L761-L762

Added lines #L761 - L762 were not covered by tests
b.log.With().Debug("regossipped atx", log.Context(ctx), log.ShortStringer("atx", atx))
return nil
}

// SignAndFinalizeAtx signs the atx with specified signer and calculates the ID of the ATX.
func SignAndFinalizeAtx(signer *signing.EdSigner, atx *types.ActivationTx) error {
atx.Signature = signer.Sign(signing.ATX, atx.SignedBytes())
Expand Down
28 changes: 28 additions & 0 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,34 @@
}
}

func TestRegossip(t *testing.T) {
layer := types.LayerID(10)
t.Run("not found", func(t *testing.T) {
h := newTestBuilder(t)
h.mclock.EXPECT().CurrentLayer().Return(layer)
require.NoError(t, h.Regossip(context.Background()))
})
t.Run("success", func(t *testing.T) {
h := newTestBuilder(t)
atx := newActivationTx(t,
h.signer, 0, types.EmptyATXID, types.EmptyATXID, nil,
layer.GetEpoch(), 0, 1, types.Address{}, 1, &types.NIPost{})
require.NoError(t, atxs.Add(h.cdb.Database, atx))
blob, err := atxs.GetBlob(h.cdb.Database, atx.ID().Bytes())
require.NoError(t, err)
h.mclock.EXPECT().CurrentLayer().Return(layer)
h.mpub.EXPECT().Publish(gomock.Any(), gomock.Any(), blob)
dshulyak marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, h.Regossip(context.Background()))
})
t.Run("checkpointed", func(t *testing.T) {
h := newTestBuilder(t)
require.NoError(t, atxs.AddCheckpointed(h.cdb.Database, &atxs.CheckpointAtx{
ID: types.ATXID{1}, Epoch: layer.GetEpoch(), SmesherID: h.sig.NodeID()}))

Check failure on line 1296 in activation/activation_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
h.mclock.EXPECT().CurrentLayer().Return(layer)
require.NoError(t, h.Regossip(context.Background()))
})
}

func TestWaitingToBuildNipostChallengeWithJitter(t *testing.T) {
t.Run("before grace period", func(t *testing.T) {
// ┌──grace period──┐
Expand Down
6 changes: 6 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type atxChan struct {

// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
local p2p.Peer
cdb *datastore.CachedDB
edVerifier *signing.EdVerifier
clock layerClock
Expand All @@ -57,6 +58,7 @@ type Handler struct {

// NewHandler returns a data handler for ATX.
func NewHandler(
local p2p.Peer,
cdb *datastore.CachedDB,
edVerifier *signing.EdVerifier,
c layerClock,
Expand All @@ -71,6 +73,7 @@ func NewHandler(
poetCfg PoetConfig,
) *Handler {
return &Handler{
local: local,
cdb: cdb,
edVerifier: edVerifier,
clock: c,
Expand Down Expand Up @@ -493,6 +496,9 @@ func (h *Handler) HandleGossipAtx(ctx context.Context, peer p2p.Peer, msg []byte
log.Err(err),
)
}
if errors.Is(err, errKnownAtx) && peer == h.local {
return nil
}
dshulyak marked this conversation as resolved.
Show resolved Hide resolved
return err
}

Expand Down
27 changes: 25 additions & 2 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (
"github.com/spacemeshos/go-spacemesh/system/mocks"
)

const layersPerEpochBig = 1000
const (
layersPerEpochBig = 1000
localID = "local"
)

func newMerkleProof(t testing.TB, challenge types.Hash32, otherLeafs []types.Hash32) (types.MerkleProof, types.Hash32) {
t.Helper()
Expand Down Expand Up @@ -101,7 +104,7 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID) *testHandler {
mbeacon := NewMockAtxReceiver(ctrl)
mtortoise := mocks.NewMockTortoise(ctrl)

atxHdlr := NewHandler(cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
atxHdlr := NewHandler(localID, cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
return &testHandler{
Handler: atxHdlr,

Expand Down Expand Up @@ -1002,6 +1005,26 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf))
})
t.Run("known atx from local id is allowed", func(t *testing.T) {
t.Parallel()

atxHdlr := newTestHandler(t, goldenATXID)

atx := newActivationTx(t, sig, 0, types.EmptyATXID, types.EmptyATXID, nil, 0, 0, 0, types.Address{2, 4, 5}, 2, nil)

atxHdlr.mbeacon.EXPECT().OnAtx(gomock.Any())
atxHdlr.mtortoise.EXPECT().OnAtx(gomock.Any())
require.NoError(t, atxHdlr.ProcessAtx(context.Background(), atx))

buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf))

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf))
})

t.Run("atx with invalid signature", func(t *testing.T) {
t.Parallel()
Expand Down
1 change: 1 addition & 0 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri
mtrtl := smocks.NewMockTortoise(ctrl)
cdb := datastore.NewCachedDB(db, lg)
atxHandler := activation.NewHandler(
"",
cdb,
edVerifier,
mclock,
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type BaseConfig struct {
// MinerGoodAtxsPercent is a threshold to decide if tortoise activeset should be
// picked from first block insted of synced data.
MinerGoodAtxsPercent int `mapstructure:"miner-good-atxs-percent"`

RegossipAtxInterval time.Duration `mapstructure:"regossip-atx-interval"`
}

type PublicMetrics struct {
Expand Down
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func MainnetConfig() Config {
"https://poet-110.spacemesh.network",
"https://poet-111.spacemesh.network",
},
RegossipAtxInterval: 2 * time.Hour,
},
Genesis: &GenesisConfig{
GenesisTime: "2023-07-14T08:00:00Z",
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func fastnet() config.Config {
conf.Sync.Interval = 5 * time.Second
conf.Sync.GossipDuration = 10 * time.Second
conf.LayersPerEpoch = 4
conf.RegossipAtxInterval = 30 * time.Second

conf.Tortoise.Hdist = 4
conf.Tortoise.Zdist = 2
Expand Down
5 changes: 3 additions & 2 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func testnet() config.Config {

OptFilterThreshold: 90,

TickSize: 666514,
PoETServers: []string{},
TickSize: 666514,
PoETServers: []string{},
RegossipAtxInterval: time.Hour,
},
Genesis: &config.GenesisConfig{
GenesisTime: "2023-09-13T18:00:00Z",
Expand Down
8 changes: 5 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ func (app *App) initServices(ctx context.Context) error {

fetcherWrapped := &layerFetcher{}
atxHandler := activation.NewHandler(
app.host.ID(),
app.cachedDB,
app.edVerifier,
app.clock,
Expand Down Expand Up @@ -858,9 +859,10 @@ func (app *App) initServices(ctx context.Context) error {
}

builderConfig := activation.Config{
CoinbaseAccount: coinbaseAddr,
GoldenATXID: goldenATXID,
LayersPerEpoch: layersPerEpoch,
CoinbaseAccount: coinbaseAddr,
GoldenATXID: goldenATXID,
LayersPerEpoch: layersPerEpoch,
RegossipInterval: app.Config.RegossipAtxInterval,
}
atxBuilder := activation.NewBuilder(
builderConfig,
Expand Down