Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - don't track latency for messages that are not gossipped futher #5064

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 0 additions & 6 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/metrics"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/signing"
Expand Down Expand Up @@ -509,11 +508,6 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("%w: %w", errMalformedData, err)
}

epochStart := h.clock.LayerToTime(atx.PublishEpoch.FirstLayer())
poetRoundEnd := epochStart.Add(h.poetCfg.PhaseShift - h.poetCfg.CycleGap)
latency := receivedTime.Sub(poetRoundEnd)
metrics.ReportMessageLatency(pubsub.AtxProtocol, pubsub.AtxProtocol, latency)

atx.SetReceived(receivedTime.Local())
if err := atx.Initialize(); err != nil {
return fmt.Errorf("failed to derive ID from atx: %w", err)
Expand Down
14 changes: 0 additions & 14 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,15 +924,13 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
require.NoError(t, err)

atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer())
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef()).Return(errors.New("woof"))
err = atxHdlr.HandleGossipAtx(context.Background(), "", secondData)
require.Error(t, err)
require.ErrorContains(t, err, "missing poet proof")

atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer())
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).DoAndReturn(
Expand All @@ -941,7 +939,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
data, err := codec.Encode(first)
require.NoError(t, err)
atxHdlr.mclock.EXPECT().CurrentLayer().Return(first.PublishEpoch.FirstLayer())
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mValidator.EXPECT().Post(gomock.Any(), nodeID1, goldenATXID, first.InitialPost, gomock.Any(), first.NumUnits)
atxHdlr.mValidator.EXPECT().VRFNonce(nodeID1, goldenATXID, &vrfNonce, gomock.Any(), first.NumUnits)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
Expand Down Expand Up @@ -981,7 +978,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), fmt.Sprintf("nil nipst for atx %v", atx.ID()))
})

Expand All @@ -999,10 +995,8 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf))

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf))
})
t.Run("known atx from local id is allowed", func(t *testing.T) {
Expand All @@ -1018,11 +1012,7 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {

buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf))

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf))
})

Expand All @@ -1036,7 +1026,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), "failed to verify atx signature")
})
}
Expand Down Expand Up @@ -1263,7 +1252,6 @@ func TestHandler_AtxWeight(t *testing.T) {

peer := p2p.Peer("buddy")
proofRef := atx1.GetPoetProofRef()
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef})
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef)
Expand Down Expand Up @@ -1304,7 +1292,6 @@ func TestHandler_AtxWeight(t *testing.T) {
require.NoError(t, err)

proofRef = atx2.GetPoetProofRef()
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, gomock.Any()).Do(
func(_ p2p.Peer, got []types.Hash32) {
Expand Down Expand Up @@ -1361,7 +1348,6 @@ func TestHandler_WrongHash(t *testing.T) {

peer := p2p.Peer("buddy")
proofRef := atx.GetPoetProofRef()
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef})
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef)
Expand Down
15 changes: 3 additions & 12 deletions beacon/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg
return errMalformedMessage
}

latency := receivedTime.Sub(pd.msgTimes.proposalSendTime(m.EpochID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, latency)

if !pd.isProposalTimely(&m, receivedTime) {
logger.With().Debug("proposal too early", m.EpochID, log.Time("received_at", receivedTime))
return errUntimelyMessage
Expand Down Expand Up @@ -94,6 +91,7 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg
logger.With().Debug("malicious miner proposal potentially valid", log.Stringer("smesher", m.NodeID))
cat = potentiallyValid
}
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, time.Since(pd.msgTimes.proposalSendTime(m.EpochID)))
return pd.addProposal(m, cat)
}

Expand Down Expand Up @@ -223,17 +221,12 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m
logger := pd.logger.WithContext(ctx).WithFields(types.FirstRound, log.Stringer("sender", peer))
logger.Debug("new first votes")

receivedTime := time.Now()

var m FirstVotingMessage
if err := codec.Decode(msg, &m); err != nil {
logger.With().Warning("received invalid first votes", log.Err(err))
return errMalformedMessage
}

latency := receivedTime.Sub(pd.msgTimes.firstVoteSendTime(m.EpochID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, latency)

currentEpoch := pd.currentEpoch()
if m.EpochID != currentEpoch {
logger.With().Debug("first votes from different epoch",
Expand All @@ -257,6 +250,7 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m
}

logger.Debug("received first voting message, storing its votes")
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, time.Since(pd.msgTimes.firstVoteSendTime(m.EpochID)))
return pd.storeFirstVotes(m, minerPK)
}

Expand Down Expand Up @@ -342,9 +336,6 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee
return errEpochNotActive
}

latency := receivedTime.Sub(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, latency)

// don't accept votes from future rounds
if !pd.isVoteTimely(&m, receivedTime) {
logger.With().Debug("following votes too early", m.RoundID, log.Time("received_at", receivedTime))
Expand All @@ -356,12 +347,12 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee
return err
}

metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, time.Since(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID)))
logger.Debug("received following voting message, counting its votes")
if err = pd.storeFollowingVotes(m, nodeID); err != nil {
logger.With().Warning("failed to store following votes", log.Err(err))
return err
}

return nil
}

Expand Down
5 changes: 1 addition & 4 deletions beacon/weakcoin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

// HandleProposal defines method to handle Beacon Weak Coin Messages from gossip.
func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byte) error {
receivedTime := time.Now()
logger := wc.logger.WithContext(ctx)

var message Message
Expand All @@ -25,9 +24,6 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt
return pubsub.ErrValidationReject
}

latency := receivedTime.Sub(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, latency)

if err := wc.receiveMessage(ctx, message); err != nil {
if !errors.Is(err, errNotSmallest) {
logger.With().Debug("invalid proposal",
Expand All @@ -39,6 +35,7 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt
}
return err
}
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, time.Since(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round)))
return nil
}

Expand Down
1 change: 0 additions & 1 deletion checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri
for i, vatx := range deps {
encoded, err := codec.Encode(vatx)
require.NoError(tb, err)
mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
mclock.EXPECT().CurrentLayer().Return(vatx.PublishEpoch.FirstLayer())
mfetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
mfetch.EXPECT().GetPoetProof(gomock.Any(), vatx.GetPoetProofRef())
Expand Down
5 changes: 1 addition & 4 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (h *Handler) HandleProposal(ctx context.Context, peer p2p.Peer, data []byte

// HandleProposal is the gossip receiver for Proposal.
func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
receivedTime := time.Now()
logger := h.logger.WithContext(ctx)

t0 := time.Now()
Expand All @@ -268,9 +267,6 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
return fmt.Errorf("proposal from future: %d/%s", p.Layer, p.ID().String())
}

latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer))
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency)

if !h.edVerifier.Verify(signing.PROPOSAL, p.SmesherID, p.SignedBytes(), p.Signature) {
badSigProposal.Inc()
return fmt.Errorf("failed to verify proposal signature")
Expand Down Expand Up @@ -367,6 +363,7 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
}
return errMaliciousBallot
}
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, time.Since(h.clock.LayerToTime(p.Layer)))
return nil
}

Expand Down