Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sync: new protocol to fetch layer hash and certificate #4846

Closed
4 changes: 4 additions & 0 deletions cmd/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func EnsureCLIFlags(cmd *cobra.Command, appCFG *config.Config) error {
elem = reflect.ValueOf(&appCFG.Recovery).Elem()
assignFields(ff, elem, name)

ff = reflect.TypeOf(appCFG.Sync)
elem = reflect.ValueOf(&appCFG.Sync).Elem()
assignFields(ff, elem, name)

ff = reflect.TypeOf(appCFG.TestConfig)
elem = reflect.ValueOf(&appCFG.TestConfig).Elem()
assignFields(ff, elem, name)
Expand Down
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func AddCommands(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&cfg.TestConfig.SmesherKey, "testing-smesher-key",
"", "import private smesher key for testing",
)
cmd.PersistentFlags().Uint32Var(&cfg.Sync.UpdateLayer, "sync-update-layer",
cfg.Sync.UpdateLayer, "the layer at which sync protocol is updated",
)

// Bind Flags to config
err := viper.BindPFlags(cmd.PersistentFlags())
Expand Down
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func MainnetConfig() Config {
Interval: time.Minute,
EpochEndFraction: 0.8,
MaxStaleDuration: time.Hour,
UpdateLayer: 10000000,
countvonzero marked this conversation as resolved.
Show resolved Hide resolved
Standalone: false,
},
Recovery: checkpoint.DefaultConfig(),
Expand Down
5 changes: 5 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
meshHashProtocol = "mh/1"
malProtocol = "ml/1"

lyrOpnsProtocol2 = "lp/2"
certProtocol = "ct/1"

cacheSize = 1000
)

Expand Down Expand Up @@ -194,6 +197,8 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[lyrOpnsProtocol2] = server.New(host, lyrOpnsProtocol2, h.handleLayerOpinionsReq2, srvOpts...)
f.servers[certProtocol] = server.New(host, certProtocol, h.handleCertReq, srvOpts...)
}
return f
}
Expand Down
6 changes: 6 additions & 0 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type testFetch struct {
mOpnS *mocks.Mockrequester
mHashS *mocks.Mockrequester
mMHashS *mocks.Mockrequester
mOpn2S *mocks.Mockrequester
mCertS *mocks.Mockrequester

mMesh *mocks.MockmeshProvider
mMalH *mocks.MockSyncValidator
Expand All @@ -54,6 +56,8 @@ func createFetch(tb testing.TB) *testFetch {
mOpnS: mocks.NewMockrequester(ctrl),
mHashS: mocks.NewMockrequester(ctrl),
mMHashS: mocks.NewMockrequester(ctrl),
mOpn2S: mocks.NewMockrequester(ctrl),
mCertS: mocks.NewMockrequester(ctrl),
mMalH: mocks.NewMockSyncValidator(ctrl),
mAtxH: mocks.NewMockSyncValidator(ctrl),
mBallotH: mocks.NewMockSyncValidator(ctrl),
Expand Down Expand Up @@ -83,6 +87,8 @@ func createFetch(tb testing.TB) *testFetch {
lyrOpnsProtocol: tf.mOpnS,
hashProtocol: tf.mHashS,
meshHashProtocol: tf.mMHashS,
lyrOpnsProtocol2: tf.mOpn2S,
certProtocol: tf.mCertS,
}),
withHost(tf.mh))
tf.Fetch.SetValidators(tf.mAtxH, tf.mPoetH, tf.mBallotH, tf.mBlocksH, tf.mProposalH, tf.mTxBlocksH, tf.mTxProposalH, tf.mMalH)
Expand Down
104 changes: 83 additions & 21 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte, error) {
nodes, err := identities.GetMalicious(h.cdb)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get malicious IDs", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get malicious IDs", log.Err(err))

Check warning on line 42 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L42

Added line #L42 was not covered by tests
return nil, err
}
h.logger.WithContext(ctx).With().Debug("responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
h.logger.WithContext(ctx).With().Debug("serve: responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
malicious := &MaliciousIDs{
NodeIDs: nodes,
}
data, err := codec.Encode(malicious)
if err != nil {
h.logger.With().Fatal("failed to encode malicious IDs", log.Err(err))
h.logger.With().Fatal("serve: failed to encode malicious IDs", log.Err(err))

Check warning on line 51 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L51

Added line #L51 was not covered by tests
}
return data, nil
}
Expand All @@ -61,18 +61,18 @@
}
atxids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get epoch atx IDs", epoch, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get epoch atx IDs", epoch, log.Err(err))

Check warning on line 64 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L64

Added line #L64 was not covered by tests
return nil, err
}
ed := EpochData{
AtxIDs: atxids,
}
h.logger.WithContext(ctx).With().Debug("responded to epoch info request",
h.logger.WithContext(ctx).With().Debug("serve: responded to epoch info request",
epoch,
log.Int("atx_count", len(ed.AtxIDs)))
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize epoch atx", epoch, log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize epoch atx", epoch, log.Err(err))

Check warning on line 75 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L75

Added line #L75 was not covered by tests
}
return bts, nil
}
Expand All @@ -89,19 +89,20 @@
}
ld.Ballots, err = ballots.IDsInLayer(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("failed to get layer ballots", lid, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get layer ballots", lid, log.Err(err))

Check warning on line 92 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L92

Added line #L92 was not covered by tests
return nil, err
}

out, err := codec.Encode(&ld)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize layer data response", log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer data response", log.Err(err))

Check warning on line 98 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L98

Added line #L98 was not covered by tests
}
return out, nil
}

// handleLayerOpinionsReq returns the opinions on data in the specified layer, described in LayerOpinion.
func (h *handler) handleLayerOpinionsReq(ctx context.Context, req []byte) ([]byte, error) {
opnReqV1.Inc()
var (
lid types.LayerID
lo LayerOpinion
Expand All @@ -113,13 +114,13 @@
}
lo.PrevAggHash, err = layers.GetAggregatedHash(h.cdb, lid.Sub(1))
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("failed to get prev agg hash", lid, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get prev agg hash", lid, log.Err(err))

Check warning on line 117 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L117

Added line #L117 was not covered by tests
return nil, err
}

certs, err := certificates.Get(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("failed to get certificate", lid, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get certificate", lid, log.Err(err))

Check warning on line 123 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L123

Added line #L123 was not covered by tests
return nil, err
}
if err == nil {
Expand All @@ -139,15 +140,76 @@

out, err = codec.Encode(&lo)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize layer opinions response", log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode layer opinions response", log.Err(err))

Check warning on line 143 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L143

Added line #L143 was not covered by tests
}
return out, nil
}

func (h *handler) handleLayerOpinionsReq2(ctx context.Context, req []byte) ([]byte, error) {
opnReqV2.Inc()
var (
lid types.LayerID
lo LayerOpinion2
out []byte
err error
)
if err := codec.Decode(req, &lid); err != nil {
return nil, err
}

Check warning on line 158 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L157-L158

Added lines #L157 - L158 were not covered by tests
lo.PrevAggHash, err = layers.GetAggregatedHash(h.cdb, lid.Sub(1))
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("serve: failed to get prev agg hash", lid, log.Err(err))
return nil, err
}

Check warning on line 163 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L161-L163

Added lines #L161 - L163 were not covered by tests
bid, err := certificates.CertifiedBlock(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("serve: failed to get layer certified block", lid, log.Err(err))
return nil, err
}

Check warning on line 168 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L166-L168

Added lines #L166 - L168 were not covered by tests
if err == nil {
lo.Certified = true
lo.CertBlock = bid
}
out, err = codec.Encode(&lo)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer opinions response", log.Err(err))
}

Check warning on line 176 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L175-L176

Added lines #L175 - L176 were not covered by tests
return out, nil
}

func (h *handler) handleCertReq(ctx context.Context, req []byte) ([]byte, error) {
certReq.Inc()
var (
cr CertRequest
out []byte
err error
)
if err := codec.Decode(req, &cr); err != nil {
return nil, err
}

Check warning on line 189 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L188-L189

Added lines #L188 - L189 were not covered by tests
certs, err := certificates.Get(h.cdb, cr.Layer)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("serve: failed to get certificate", cr.Layer, log.Err(err))
return nil, err
}

Check warning on line 194 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L192-L194

Added lines #L192 - L194 were not covered by tests
if err == nil {
for _, cert := range certs {
if cert.Block == cr.Block {
out, err = codec.Encode(cert.Cert)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode cert", log.Err(err))
}

Check warning on line 201 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L200-L201

Added lines #L200 - L201 were not covered by tests
return out, nil
}
}
}
return out, err
}

func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {
var requestBatch RequestBatch
if err := codec.Decode(data, &requestBatch); err != nil {
h.logger.WithContext(ctx).With().Warning("failed to parse request", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to parse request", log.Err(err))

Check warning on line 212 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L212

Added line #L212 was not covered by tests
return nil, errBadRequest
}
resBatch := ResponseBatch{
Expand All @@ -160,20 +222,20 @@
totalHashReqs.WithLabelValues(string(r.Hint)).Add(1)
res, err := h.bs.Get(r.Hint, r.Hash.Bytes())
if err != nil {
h.logger.WithContext(ctx).With().Debug("remote peer requested nonexistent hash",
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested nonexistent hash",

Check warning on line 225 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L225

Added line #L225 was not covered by tests
log.String("hash", r.Hash.ShortString()),
log.String("hint", string(r.Hint)),
log.Err(err))
hashMissing.WithLabelValues(string(r.Hint)).Add(1)
continue
} else if res == nil {
h.logger.WithContext(ctx).With().Debug("remote peer requested golden",
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested golden",

Check warning on line 232 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L232

Added line #L232 was not covered by tests
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
hashEmptyData.WithLabelValues(string(r.Hint)).Add(1)
continue
} else {
h.logger.WithContext(ctx).With().Debug("responded to hash request",
h.logger.WithContext(ctx).With().Debug("serve: responded to hash request",

Check warning on line 238 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L238

Added line #L238 was not covered by tests
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
}
Expand All @@ -187,11 +249,11 @@

bts, err := codec.Encode(&resBatch)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize batch id",
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode batch id",

Check warning on line 252 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L252

Added line #L252 was not covered by tests
log.Err(err),
log.String("batch_hash", resBatch.ID.ShortString()))
}
h.logger.WithContext(ctx).With().Debug("returning response for batch",
h.logger.WithContext(ctx).With().Debug("serve: returning response for batch",

Check warning on line 256 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L256

Added line #L256 was not covered by tests
log.String("batch_hash", resBatch.ID.ShortString()),
log.Int("count_responses", len(resBatch.Responses)),
log.Int("data_size", len(bts)))
Expand All @@ -206,7 +268,7 @@
err error
)
if err = codec.Decode(reqData, &req); err != nil {
h.logger.WithContext(ctx).With().Warning("failed to parse mesh hash request", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to parse mesh hash request", log.Err(err))

Check warning on line 271 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L271

Added line #L271 was not covered by tests
return nil, errBadRequest
}
if err := req.Validate(); err != nil {
Expand All @@ -215,14 +277,14 @@
}
hashes, err = layers.GetAggHashes(h.cdb, req.From, req.To, req.Step)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get mesh hashes", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get mesh hashes", log.Err(err))
return nil, err
}
data, err = codec.EncodeSlice(hashes)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize hashes", log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode hashes", log.Err(err))

Check warning on line 285 in fetch/handler.go

View check run for this annotation

Codecov / codecov/patch

fetch/handler.go#L285

Added line #L285 was not covered by tests
}
h.logger.WithContext(ctx).With().Debug("returning response for mesh hashes",
h.logger.WithContext(ctx).With().Debug("serve: returning response for mesh hashes",
log.Stringer("layer_from", req.From),
log.Stringer("layer_to", req.To),
log.Uint32("by", req.Step),
Expand Down
81 changes: 81 additions & 0 deletions fetch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,87 @@ func TestHandleLayerOpinionsReq(t *testing.T) {
}
}

func TestHandleLayerOpinionsReq2(t *testing.T) {
tt := []struct {
name string
missingCert, multipleCerts bool
}{
{
name: "all good",
},
{
name: "cert missing",
missingCert: true,
},
{
name: "multiple certs",
multipleCerts: true,
},
}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

th := createTestHandler(t)
lid := types.LayerID(111)
certified, aggHash := createOpinions(t, th.cdb, lid, !tc.missingCert)
if tc.multipleCerts {
bid := types.RandomBlockID()
require.NoError(t, certificates.Add(th.cdb, lid, &types.Certificate{
BlockID: bid,
}))
require.NoError(t, certificates.SetInvalid(th.cdb, lid, bid))
}

lidBytes, err := codec.Encode(&lid)
require.NoError(t, err)

out, err := th.handleLayerOpinionsReq2(context.Background(), lidBytes)
require.NoError(t, err)

var got LayerOpinion2
err = codec.Decode(out, &got)
require.NoError(t, err)
require.Equal(t, aggHash, got.PrevAggHash)
if tc.missingCert {
require.False(t, got.Certified)
require.Equal(t, types.EmptyBlockID, got.CertBlock)
} else {
require.True(t, got.Certified)
require.Equal(t, certified, got.CertBlock)
}
})
}
}

func TestHandleCertReq(t *testing.T) {
th := createTestHandler(t)
lid := types.LayerID(111)
bid := types.RandomBlockID()
req := &CertRequest{
Layer: lid,
Block: bid,
}
reqData, err := codec.Encode(req)
require.NoError(t, err)

resp, err := th.handleCertReq(context.Background(), reqData)
require.ErrorIs(t, err, sql.ErrNotFound)
require.Nil(t, resp)

cert := &types.Certificate{BlockID: bid}
require.NoError(t, certificates.Add(th.cdb, lid, cert))

resp, err = th.handleCertReq(context.Background(), reqData)
require.NoError(t, err)
require.NotNil(t, resp)
var got types.Certificate
require.NoError(t, codec.Decode(resp, &got))
require.Equal(t, *cert, got)
}

func TestHandleMeshHashReq(t *testing.T) {
tt := []struct {
name string
Expand Down