Skip to content

Commit

Permalink
don't track latency for messages that are not gossiped further (#5064)
Browse files Browse the repository at this point in the history
This is mainly to prevent duplicates from skewing results.
hare1 is not fixed in this change.
Latency metering is dropped for the atx protocol, as we don't have a good reference point for it.

closes: #4623
  • Loading branch information
dshulyak committed Sep 28, 2023
1 parent fe23c05 commit ed0fe76
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 41 deletions.
6 changes: 0 additions & 6 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/metrics"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/signing"
Expand Down Expand Up @@ -509,11 +508,6 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("%w: %w", errMalformedData, err)
}

epochStart := h.clock.LayerToTime(atx.PublishEpoch.FirstLayer())
poetRoundEnd := epochStart.Add(h.poetCfg.PhaseShift - h.poetCfg.CycleGap)
latency := receivedTime.Sub(poetRoundEnd)
metrics.ReportMessageLatency(pubsub.AtxProtocol, pubsub.AtxProtocol, latency)

atx.SetReceived(receivedTime.Local())
if err := atx.Initialize(); err != nil {
return fmt.Errorf("failed to derive ID from atx: %w", err)
Expand Down
14 changes: 0 additions & 14 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,15 +924,13 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
require.NoError(t, err)

atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer())
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef()).Return(errors.New("woof"))
err = atxHdlr.HandleGossipAtx(context.Background(), "", secondData)
require.Error(t, err)
require.ErrorContains(t, err, "missing poet proof")

atxHdlr.mclock.EXPECT().CurrentLayer().Return(second.PublishEpoch.FirstLayer())
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), second.GetPoetProofRef())
atxHdlr.mockFetch.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).DoAndReturn(
Expand All @@ -941,7 +939,6 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
data, err := codec.Encode(first)
require.NoError(t, err)
atxHdlr.mclock.EXPECT().CurrentLayer().Return(first.PublishEpoch.FirstLayer())
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mValidator.EXPECT().Post(gomock.Any(), nodeID1, goldenATXID, first.InitialPost, gomock.Any(), first.NumUnits)
atxHdlr.mValidator.EXPECT().VRFNonce(nodeID1, goldenATXID, &vrfNonce, gomock.Any(), first.NumUnits)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
Expand Down Expand Up @@ -981,7 +978,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), fmt.Sprintf("nil nipst for atx %v", atx.ID()))
})

Expand All @@ -999,10 +995,8 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf))

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.Error(t, atxHdlr.HandleGossipAtx(context.Background(), "", buf))
})
t.Run("known atx from local id is allowed", func(t *testing.T) {
Expand All @@ -1018,11 +1012,7 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {

buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf))

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), localID, buf))
})

Expand All @@ -1036,7 +1026,6 @@ func TestHandler_HandleSyncedAtx(t *testing.T) {
buf, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
require.ErrorContains(t, atxHdlr.HandleSyncedAtx(context.Background(), atx.ID().Hash32(), p2p.NoPeer, buf), "failed to verify atx signature")
})
}
Expand Down Expand Up @@ -1263,7 +1252,6 @@ func TestHandler_AtxWeight(t *testing.T) {

peer := p2p.Peer("buddy")
proofRef := atx1.GetPoetProofRef()
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef})
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef)
Expand Down Expand Up @@ -1304,7 +1292,6 @@ func TestHandler_AtxWeight(t *testing.T) {
require.NoError(t, err)

proofRef = atx2.GetPoetProofRef()
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, gomock.Any()).Do(
func(_ p2p.Peer, got []types.Hash32) {
Expand Down Expand Up @@ -1361,7 +1348,6 @@ func TestHandler_WrongHash(t *testing.T) {

peer := p2p.Peer("buddy")
proofRef := atx.GetPoetProofRef()
atxHdlr.mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
atxHdlr.mclock.EXPECT().CurrentLayer().Return(currentLayer)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(peer, []types.Hash32{proofRef})
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), proofRef)
Expand Down
15 changes: 3 additions & 12 deletions beacon/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg
return errMalformedMessage
}

latency := receivedTime.Sub(pd.msgTimes.proposalSendTime(m.EpochID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, latency)

if !pd.isProposalTimely(&m, receivedTime) {
logger.With().Debug("proposal too early", m.EpochID, log.Time("received_at", receivedTime))
return errUntimelyMessage
Expand Down Expand Up @@ -94,6 +91,7 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg
logger.With().Debug("malicious miner proposal potentially valid", log.Stringer("smesher", m.NodeID))
cat = potentiallyValid
}
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, time.Since(pd.msgTimes.proposalSendTime(m.EpochID)))
return pd.addProposal(m, cat)
}

Expand Down Expand Up @@ -223,17 +221,12 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m
logger := pd.logger.WithContext(ctx).WithFields(types.FirstRound, log.Stringer("sender", peer))
logger.Debug("new first votes")

receivedTime := time.Now()

var m FirstVotingMessage
if err := codec.Decode(msg, &m); err != nil {
logger.With().Warning("received invalid first votes", log.Err(err))
return errMalformedMessage
}

latency := receivedTime.Sub(pd.msgTimes.firstVoteSendTime(m.EpochID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, latency)

currentEpoch := pd.currentEpoch()
if m.EpochID != currentEpoch {
logger.With().Debug("first votes from different epoch",
Expand All @@ -257,6 +250,7 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m
}

logger.Debug("received first voting message, storing its votes")
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, time.Since(pd.msgTimes.firstVoteSendTime(m.EpochID)))
return pd.storeFirstVotes(m, minerPK)
}

Expand Down Expand Up @@ -342,9 +336,6 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee
return errEpochNotActive
}

latency := receivedTime.Sub(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, latency)

// don't accept votes from future rounds
if !pd.isVoteTimely(&m, receivedTime) {
logger.With().Debug("following votes too early", m.RoundID, log.Time("received_at", receivedTime))
Expand All @@ -356,12 +347,12 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee
return err
}

metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, time.Since(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID)))
logger.Debug("received following voting message, counting its votes")
if err = pd.storeFollowingVotes(m, nodeID); err != nil {
logger.With().Warning("failed to store following votes", log.Err(err))
return err
}

return nil
}

Expand Down
5 changes: 1 addition & 4 deletions beacon/weakcoin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

// HandleProposal defines method to handle Beacon Weak Coin Messages from gossip.
func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byte) error {
receivedTime := time.Now()
logger := wc.logger.WithContext(ctx)

var message Message
Expand All @@ -25,9 +24,6 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt
return pubsub.ErrValidationReject
}

latency := receivedTime.Sub(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, latency)

if err := wc.receiveMessage(ctx, message); err != nil {
if !errors.Is(err, errNotSmallest) {
logger.With().Debug("invalid proposal",
Expand All @@ -39,6 +35,7 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt
}
return err
}
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, time.Since(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round)))
return nil
}

Expand Down
1 change: 0 additions & 1 deletion checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ func validateAndPreserveData(tb testing.TB, db *sql.Database, deps []*types.Veri
for i, vatx := range deps {
encoded, err := codec.Encode(vatx)
require.NoError(tb, err)
mclock.EXPECT().LayerToTime(gomock.Any()).Return(time.Now())
mclock.EXPECT().CurrentLayer().Return(vatx.PublishEpoch.FirstLayer())
mfetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
mfetch.EXPECT().GetPoetProof(gomock.Any(), vatx.GetPoetProofRef())
Expand Down
5 changes: 1 addition & 4 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (h *Handler) HandleProposal(ctx context.Context, peer p2p.Peer, data []byte

// HandleProposal is the gossip receiver for Proposal.
func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
receivedTime := time.Now()
logger := h.logger.WithContext(ctx)

t0 := time.Now()
Expand All @@ -268,9 +267,6 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
return fmt.Errorf("proposal from future: %d/%s", p.Layer, p.ID().String())
}

latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer))
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency)

if !h.edVerifier.Verify(signing.PROPOSAL, p.SmesherID, p.SignedBytes(), p.Signature) {
badSigProposal.Inc()
return fmt.Errorf("failed to verify proposal signature")
Expand Down Expand Up @@ -367,6 +363,7 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
}
return errMaliciousBallot
}
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, time.Since(h.clock.LayerToTime(p.Layer)))
return nil
}

Expand Down

0 comments on commit ed0fe76

Please sign in to comment.