Merge branch 'develop' into add-index-for-first-in-epoch
dshulyak committed Sep 19, 2023
2 parents 2f4dcf9 + 2974c02 commit dc3f2cd
Showing 2 changed files with 55 additions and 70 deletions.
37 changes: 15 additions & 22 deletions hare3/hare.go
@@ -381,24 +381,24 @@ func (h *Hare) run(layer types.LayerID, beacon types.Beacon, proto *protocol) error {
if err := h.onOutput(layer, current, proto.Next(vrf != nil), vrf); err != nil {
return err
}
walltime = walltime.Add(h.config.RoundDuration)
result := false
for {
walltime = walltime.Add(h.config.RoundDuration)
current := proto.IterRound
var vrf *types.HareEligibility
if current.IsMessageRound() {
start := time.Now()
vrf = h.oracle.active(h.signer.NodeID(), layer, current)
activeLatency.Observe(time.Since(start).Seconds())
}
h.tracer.OnActive(vrf)
select {
case <-h.wallclock.After(walltime.Sub(h.wallclock.Now())):
h.log.Debug("execute round",
zap.Uint32("lid", layer.Uint32()),
zap.Uint8("iter", proto.Iter), zap.Stringer("round", proto.Round),
zap.Bool("active", vrf != nil),
)
current := proto.IterRound
var vrf *types.HareEligibility
if current.IsMessageRound() {
start := time.Now()
vrf = h.oracle.active(h.signer.NodeID(), layer, current)
activeLatency.Observe(time.Since(start).Seconds())
}
h.tracer.OnActive(vrf)

out := proto.Next(vrf != nil)
if out.result != nil {
result = true
@@ -416,7 +416,6 @@ func (h *Hare) run(layer types.LayerID, beacon types.Beacon, proto *protocol) error {
return fmt.Errorf("hare failed to reach consensus in %d iterations",
h.config.IterationsLimit)
}
walltime = walltime.Add(h.config.RoundDuration)
case <-h.ctx.Done():
return nil
}
@@ -428,23 +427,17 @@ func (h *Hare) onOutput(layer types.LayerID, ir IterRound, out output, vrf *types.HareEligibility) error {
out.message.Layer = layer
out.message.Eligibility = *vrf
out.message.Sender = h.signer.NodeID()
out.message.Signature = h.signer.Sign(signing.HARE, out.message.ToMetadata().ToBytes())
if err := h.pubsub.Publish(h.ctx, h.config.ProtocolName, out.message.ToBytes()); err != nil {
h.log.Error("failed to publish", zap.Inline(out.message), zap.Error(err))
}

Codecov / codecov/patch check warning: added lines hare3/hare.go#L432-L433 were not covered by tests.
}
h.log.Debug("round output",
zap.Uint32("lid", layer.Uint32()),
zap.Uint8("iter", ir.Iter), zap.Stringer("round", ir.Round),
zap.Inline(&out),
zap.Bool("active", vrf != nil),
)
if out.message != nil {
h.eg.Go(func() error {
out.message.Signature = h.signer.Sign(signing.HARE, out.message.ToMetadata().ToBytes())
if err := h.pubsub.Publish(h.ctx, h.config.ProtocolName, out.message.ToBytes()); err != nil {
h.log.Error("failed to publish", zap.Inline(out.message), zap.Error(err))
}
h.tracer.OnMessageSent(out.message)
return nil
})
}
h.tracer.OnMessageSent(out.message)
if out.coin != nil {
select {
case <-h.ctx.Done():
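Reading the hare.go hunks above: the round loop now advances `walltime` and computes the round's VRF eligibility before blocking on the wall clock, so `tracer.OnActive` fires at the start of a round rather than after its timer expires, and `onOutput` now signs and publishes the message inline instead of inside an `h.eg.Go` goroutine, making `tracer.OnMessageSent` a synchronous ordering point. A minimal runnable sketch of the new loop shape, with hypothetical stand-in types (not the real hare3 API):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical stand-ins for illustration; not the real hare3 types.
type eligibility struct{ round int }

type tracer struct{}

func (tracer) OnActive(e *eligibility) { fmt.Println("active:", e != nil) }

// run mirrors the new loop shape: the round deadline and the round's
// eligibility are both computed before blocking on the clock, so an
// observer sees OnActive at round start, not after the timer fires.
func run(ctx context.Context, rounds int, d time.Duration) error {
	tr := tracer{}
	walltime := time.Now()
	for r := 0; r < rounds; r++ {
		walltime = walltime.Add(d) // moved to the top of the loop
		el := &eligibility{round: r}
		tr.OnActive(el) // observed before the wait
		select {
		case <-time.After(time.Until(walltime)):
			fmt.Println("execute round", r) // proto.Next would run here
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_ = run(ctx, 3, 100*time.Millisecond)
}
```

Publishing inline means a slow `Publish` call now runs on the protocol path, but it gives tests a deterministic event order; the hare_test.go rewrite below relies on it.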
88 changes: 40 additions & 48 deletions hare3/hare_test.go
@@ -265,8 +265,6 @@ type lockstepCluster struct {
}

timestamp time.Time
start chan struct{}
complete chan struct{}
}

func (cl *lockstepCluster) addNode(n *node) {
@@ -313,15 +311,9 @@ func (cl *lockstepCluster) addEquivocators(n int) *lockstepCluster {
}

func (cl *lockstepCluster) nogossip() {
cl.start = make(chan struct{}, len(cl.nodes))
cl.complete = make(chan struct{}, len(cl.nodes))
for _, n := range cl.nodes {
require.NoError(cl.t, beacons.Add(n.db, cl.t.genesis.GetEpoch()+1, cl.t.beacon))
n.mpublisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, _ string, msg []byte) error {
cl.timedReceive(cl.start)
cl.timedSend(cl.complete)
return nil
}).AnyTimes()
n.mpublisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
}
}

@@ -380,8 +372,6 @@ func (cl *lockstepCluster) genProposals(lid types.LayerID) {
}

func (cl *lockstepCluster) setup() {
cl.start = make(chan struct{}, len(cl.nodes))
cl.complete = make(chan struct{}, len(cl.nodes))
active := cl.activeSet()
for _, n := range cl.nodes {
require.NoError(cl.t, beacons.Add(n.db, cl.t.genesis.GetEpoch()+1, cl.t.beacon))
@@ -393,11 +383,9 @@ }
}
n.oracle.UpdateActiveSet(cl.t.genesis.GetEpoch()+1, active)
n.mpublisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, _ string, msg []byte) error {
cl.timedReceive(cl.start)
for _, other := range cl.nodes {
other.hare.Handler(ctx, "self", msg)
}
cl.timedSend(cl.complete)
return nil
}).AnyTimes()
}
@@ -411,67 +399,47 @@ func (cl *lockstepCluster) movePreround(layer types.LayerID) {
n.nclock.StartLayer(layer)
n.clock.Advance(cl.timestamp.Sub(n.clock.Now()))
}
send := 0
for _, n := range cl.nodes {
if n.tracer.waitEligibility() != nil {
send++
}
}
for i := 0; i < send; i++ {
cl.timedSend(cl.start)
n.tracer.waitEligibility()
}
for i := 0; i < send; i++ {
cl.timedReceive(cl.complete)
for _, n := range cl.nodes {
n.tracer.waitSent()
}
}

func (cl *lockstepCluster) moveRound() {
cl.timestamp = cl.timestamp.Add(cl.t.cfg.RoundDuration)
send := 0
for _, n := range cl.nodes {
n.clock.Advance(cl.timestamp.Sub(n.clock.Now()))
}
for _, n := range cl.nodes {
if n.tracer.waitEligibility() != nil {
send++
}
}
for i := 0; i < send; i++ {
cl.timedSend(cl.start)
}
for i := 0; i < send; i++ {
cl.timedReceive(cl.complete)
n.tracer.waitEligibility()
}
}

func (cl *lockstepCluster) timedSend(ch chan struct{}) {
select {
case ch <- struct{}{}:
case <-time.After(time.Second):
require.FailNow(cl.t, "send timed out")
for _, n := range cl.nodes {
n.tracer.waitSent()
}
}

func (cl *lockstepCluster) timedReceive(ch chan struct{}) {
select {
case <-ch:
case <-time.After(time.Second):
require.FailNow(cl.t, "receive timed out")
func (cl *lockstepCluster) waitStopped() {
for _, n := range cl.nodes {
n.tracer.waitStopped()
}
}

func newTestTracer(tb testing.TB) *testTracer {
return &testTracer{
TB: tb,
stopped: make(chan types.LayerID, 100),
eligibility: make(chan *types.HareEligibility, 100),
eligibility: make(chan *types.HareEligibility),
sent: make(chan *Message),
}
}

type testTracer struct {
testing.TB
stopped chan types.LayerID
eligibility chan *types.HareEligibility
sent chan *Message
}

func (t *testTracer) waitStopped() types.LayerID {
@@ -496,6 +464,17 @@ func (t *testTracer) waitEligibility() *types.HareEligibility {
return nil
}

func (t *testTracer) waitSent() *Message {
wait := time.Second
select {
case <-time.After(wait):
require.FailNow(t, "no message", "wait %v", wait)
case m := <-t.sent:
return m
}
return nil
}

func (*testTracer) OnStart(types.LayerID) {}

func (t *testTracer) OnStop(lid types.LayerID) {
@@ -506,13 +485,22 @@ func (t *testTracer) OnStop(lid types.LayerID) {
}

func (t *testTracer) OnActive(el *types.HareEligibility) {
wait := time.Second
select {
case <-time.After(wait):
require.FailNow(t, "eligibility can't be sent", "wait %v", wait)
case t.eligibility <- el:
default:
}
}

func (*testTracer) OnMessageSent(*Message) {}
func (t *testTracer) OnMessageSent(m *Message) {
wait := time.Second
select {
case <-time.After(wait):
require.FailNow(t, "message can't be sent", "wait %v", wait)
case t.sent <- m:
}
}

func (*testTracer) OnMessageReceived(*Message) {}

@@ -540,8 +528,8 @@ func testHare(t *testing.T, active, inactive, equivocators int, opts ...clusterO
cluster.moveRound()
}
var consistent []types.ProposalID
cluster.waitStopped()
for _, n := range cluster.nodes {
n.tracer.waitStopped()
select {
case coin := <-n.hare.Coins():
require.Equal(t, coin.Layer, layer)
Expand Down Expand Up @@ -596,6 +584,7 @@ func TestIterationLimit(t *testing.T) {
for i := 0; i < int(tst.cfg.IterationsLimit)*int(notify); i++ {
cluster.moveRound()
}
cluster.waitStopped()
require.Empty(t, cluster.nodes[0].hare.Running())
require.False(t, cluster.nodes[0].patrol.IsHareInCharge(layer))
}
@@ -630,6 +619,9 @@ func TestHandler(t *testing.T) {
Add(tst.layerDuration * time.Duration(layer)).
Add(tst.cfg.PreroundDelay)).Sub(n.clock.Now()))
elig := n.tracer.waitEligibility()
n.tracer.waitSent()
n.tracer.waitEligibility()

t.Run("malformed", func(t *testing.T) {
require.ErrorIs(t, n.hare.Handler(context.Background(), "", []byte("malformed")),
pubsub.ErrValidationReject)

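On the test side, the buffered `start`/`complete` channels and the `timedSend`/`timedReceive` helpers are replaced by the tracer itself: `eligibility` and `sent` become unbuffered channels, so `OnActive` and `OnMessageSent` block until the test consumes the event via `waitEligibility`/`waitSent`, with a one-second timeout on both ends. A self-contained sketch of that rendezvous pattern (names are stand-ins; the real harness fails via `require.FailNow` rather than panicking):

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for the test harness types.
type message struct{ round int }

type testTracer struct{ sent chan *message }

// OnMessageSent blocks until the test consumes the message, or fails fast
// after a timeout instead of deadlocking the whole test.
func (t *testTracer) OnMessageSent(m *message) {
	select {
	case t.sent <- m: // unbuffered: a true rendezvous with waitSent
	case <-time.After(time.Second):
		panic("message can't be sent")
	}
}

func (t *testTracer) waitSent() *message {
	select {
	case m := <-t.sent:
		return m
	case <-time.After(time.Second):
		panic("no message")
	}
}

func main() {
	tr := &testTracer{sent: make(chan *message)} // no buffer, by design
	go tr.OnMessageSent(&message{round: 1})      // node side
	fmt.Println("test got round", tr.waitSent().round) // test side
}
```

Because the handoff is one blocking send per event, the cluster no longer needs to count expected senders up front, which is what lets the removed `send := 0` bookkeeping in `movePreround` and `moveRound` go away.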