Skip to content

Commit

Permalink
don't track latency for messages that are not gossipped futher (#5064)
Browse files Browse the repository at this point in the history
this is mainly to prevent duplicates from skewing results. 
hare1 is not fixed in this change
latency metering is dropped for atx protocol, as we don't have good reference for it

closes: #4623
  • Loading branch information
dshulyak committed Sep 28, 2023
1 parent fe23c05 commit 56b4794
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 26 deletions.
6 changes: 0 additions & 6 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/metrics"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/signing"
Expand Down Expand Up @@ -509,11 +508,6 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("%w: %w", errMalformedData, err)
}

epochStart := h.clock.LayerToTime(atx.PublishEpoch.FirstLayer())
poetRoundEnd := epochStart.Add(h.poetCfg.PhaseShift - h.poetCfg.CycleGap)
latency := receivedTime.Sub(poetRoundEnd)
metrics.ReportMessageLatency(pubsub.AtxProtocol, pubsub.AtxProtocol, latency)

atx.SetReceived(receivedTime.Local())
if err := atx.Initialize(); err != nil {
return fmt.Errorf("failed to derive ID from atx: %w", err)
Expand Down
15 changes: 3 additions & 12 deletions beacon/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg
return errMalformedMessage
}

latency := receivedTime.Sub(pd.msgTimes.proposalSendTime(m.EpochID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, latency)

if !pd.isProposalTimely(&m, receivedTime) {
logger.With().Debug("proposal too early", m.EpochID, log.Time("received_at", receivedTime))
return errUntimelyMessage
Expand Down Expand Up @@ -94,6 +91,7 @@ func (pd *ProtocolDriver) HandleProposal(ctx context.Context, peer p2p.Peer, msg
logger.With().Debug("malicious miner proposal potentially valid", log.Stringer("smesher", m.NodeID))
cat = potentiallyValid
}
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconProposalProtocol, time.Since(pd.msgTimes.proposalSendTime(m.EpochID)))
return pd.addProposal(m, cat)
}

Expand Down Expand Up @@ -223,17 +221,12 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m
logger := pd.logger.WithContext(ctx).WithFields(types.FirstRound, log.Stringer("sender", peer))
logger.Debug("new first votes")

receivedTime := time.Now()

var m FirstVotingMessage
if err := codec.Decode(msg, &m); err != nil {
logger.With().Warning("received invalid first votes", log.Err(err))
return errMalformedMessage
}

latency := receivedTime.Sub(pd.msgTimes.firstVoteSendTime(m.EpochID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, latency)

currentEpoch := pd.currentEpoch()
if m.EpochID != currentEpoch {
logger.With().Debug("first votes from different epoch",
Expand All @@ -257,6 +250,7 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m
}

logger.Debug("received first voting message, storing its votes")
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFirstVotesProtocol, time.Since(pd.msgTimes.firstVoteSendTime(m.EpochID)))
return pd.storeFirstVotes(m, minerPK)
}

Expand Down Expand Up @@ -342,9 +336,6 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee
return errEpochNotActive
}

latency := receivedTime.Sub(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, latency)

// don't accept votes from future rounds
if !pd.isVoteTimely(&m, receivedTime) {
logger.With().Debug("following votes too early", m.RoundID, log.Time("received_at", receivedTime))
Expand All @@ -356,12 +347,12 @@ func (pd *ProtocolDriver) HandleFollowingVotes(ctx context.Context, peer p2p.Pee
return err
}

metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconFollowingVotesProtocol, time.Since(pd.msgTimes.followupVoteSendTime(m.EpochID, m.RoundID)))
logger.Debug("received following voting message, counting its votes")
if err = pd.storeFollowingVotes(m, nodeID); err != nil {
logger.With().Warning("failed to store following votes", log.Err(err))
return err
}

return nil
}

Expand Down
5 changes: 1 addition & 4 deletions beacon/weakcoin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

// HandleProposal defines method to handle Beacon Weak Coin Messages from gossip.
func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byte) error {
receivedTime := time.Now()
logger := wc.logger.WithContext(ctx)

var message Message
Expand All @@ -25,9 +24,6 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt
return pubsub.ErrValidationReject
}

latency := receivedTime.Sub(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round))
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, latency)

if err := wc.receiveMessage(ctx, message); err != nil {
if !errors.Is(err, errNotSmallest) {
logger.With().Debug("invalid proposal",
Expand All @@ -39,6 +35,7 @@ func (wc *WeakCoin) HandleProposal(ctx context.Context, peer p2p.Peer, msg []byt
}
return err
}
metrics.ReportMessageLatency(pubsub.BeaconProtocol, pubsub.BeaconWeakCoinProtocol, time.Since(wc.msgTime.WeakCoinProposalSendTime(message.Epoch, message.Round)))
return nil
}

Expand Down
5 changes: 1 addition & 4 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (h *Handler) HandleProposal(ctx context.Context, peer p2p.Peer, data []byte

// HandleProposal is the gossip receiver for Proposal.
func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
receivedTime := time.Now()
logger := h.logger.WithContext(ctx)

t0 := time.Now()
Expand All @@ -268,9 +267,6 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
return fmt.Errorf("proposal from future: %d/%s", p.Layer, p.ID().String())
}

latency := receivedTime.Sub(h.clock.LayerToTime(p.Layer))
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, latency)

if !h.edVerifier.Verify(signing.PROPOSAL, p.SmesherID, p.SignedBytes(), p.Signature) {
badSigProposal.Inc()
return fmt.Errorf("failed to verify proposal signature")
Expand Down Expand Up @@ -367,6 +363,7 @@ func (h *Handler) handleProposal(ctx context.Context, expHash types.Hash32, peer
}
return errMaliciousBallot
}
metrics.ReportMessageLatency(pubsub.ProposalProtocol, pubsub.ProposalProtocol, time.Since(h.clock.LayerToTime(p.Layer)))
return nil
}

Expand Down

0 comments on commit 56b4794

Please sign in to comment.