Skip to content

Commit

Permalink
Try #4846:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Aug 18, 2023
2 parents bfa734e + 6adca45 commit 1f19dff
Show file tree
Hide file tree
Showing 30 changed files with 1,415 additions and 126 deletions.
4 changes: 4 additions & 0 deletions cmd/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func EnsureCLIFlags(cmd *cobra.Command, appCFG *config.Config) error {
elem = reflect.ValueOf(&appCFG.Recovery).Elem()
assignFields(ff, elem, name)

ff = reflect.TypeOf(appCFG.Sync)
elem = reflect.ValueOf(&appCFG.Sync).Elem()
assignFields(ff, elem, name)

ff = reflect.TypeOf(appCFG.TestConfig)
elem = reflect.ValueOf(&appCFG.TestConfig).Elem()
assignFields(ff, elem, name)
Expand Down
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func AddCommands(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&cfg.TestConfig.SmesherKey, "testing-smesher-key",
"", "import private smesher key for testing",
)
cmd.PersistentFlags().BoolVar(&cfg.Sync.UseNewProtocol, "use-new-protocol",
cfg.Sync.UseNewProtocol, "use new opinions sync protocol",
)

// Bind Flags to config
err := viper.BindPFlags(cmd.PersistentFlags())
Expand Down
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func MainnetConfig() Config {
Interval: time.Minute,
EpochEndFraction: 0.8,
MaxStaleDuration: time.Hour,
UseNewProtocol: true,
Standalone: false,
},
Recovery: checkpoint.DefaultConfig(),
Expand Down
20 changes: 19 additions & 1 deletion fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
Expand All @@ -28,6 +29,8 @@ const (
meshHashProtocol = "mh/1"
malProtocol = "ml/1"

OpnProtocol = "lp/2"

cacheSize = 1000
)

Expand Down Expand Up @@ -125,6 +128,12 @@ func WithLogger(log log.Log) Option {
}
}

func WithServeNewOpn(serve bool) Option {
return func(f *Fetch) {
f.serveNewOpn = serve
}
}

func withServers(s map[string]requester) Option {
return func(f *Fetch) {
f.servers = s
Expand All @@ -147,6 +156,8 @@ type Fetch struct {
servers map[string]requester
validators *dataValidators

serveNewOpn bool

// unprocessed contains requests that are not processed
unprocessed map[types.Hash32]*request
// ongoing contains requests that have been processed and are waiting for responses
Expand Down Expand Up @@ -187,13 +198,16 @@ func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter,
server.WithLog(f.logger),
}
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
h := newHandler(cdb, bs, msh, b, f.serveNewOpn, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(host, lyrDataProtocol, h.handleLayerDataReq, srvOpts...)
f.servers[lyrOpnsProtocol] = server.New(host, lyrOpnsProtocol, h.handleLayerOpinionsReq, srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
if f.serveNewOpn {
f.servers[OpnProtocol] = server.New(host, OpnProtocol, h.handleLayerOpinionsReq2, srvOpts...)
}
}
return f
}
Expand Down Expand Up @@ -644,3 +658,7 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) {
func (f *Fetch) GetPeers() []p2p.Peer {
return f.host.GetPeers()
}

func (f *Fetch) PeerProtocols(p p2p.Peer) ([]protocol.ID, error) {
return f.host.PeerProtocols(p)
}
3 changes: 3 additions & 0 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type testFetch struct {
mOpnS *mocks.Mockrequester
mHashS *mocks.Mockrequester
mMHashS *mocks.Mockrequester
mOpn2S *mocks.Mockrequester

mMesh *mocks.MockmeshProvider
mMalH *mocks.MockSyncValidator
Expand All @@ -54,6 +55,7 @@ func createFetch(tb testing.TB) *testFetch {
mOpnS: mocks.NewMockrequester(ctrl),
mHashS: mocks.NewMockrequester(ctrl),
mMHashS: mocks.NewMockrequester(ctrl),
mOpn2S: mocks.NewMockrequester(ctrl),
mMalH: mocks.NewMockSyncValidator(ctrl),
mAtxH: mocks.NewMockSyncValidator(ctrl),
mBallotH: mocks.NewMockSyncValidator(ctrl),
Expand Down Expand Up @@ -83,6 +85,7 @@ func createFetch(tb testing.TB) *testFetch {
lyrOpnsProtocol: tf.mOpnS,
hashProtocol: tf.mHashS,
meshHashProtocol: tf.mMHashS,
OpnProtocol: tf.mOpn2S,
}),
withHost(tf.mh))
tf.Fetch.SetValidators(tf.mAtxH, tf.mPoetH, tf.mBallotH, tf.mBlocksH, tf.mProposalH, tf.mTxBlocksH, tf.mTxProposalH, tf.mMalH)
Expand Down
119 changes: 92 additions & 27 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,35 @@ type handler struct {
bs *datastore.BlobStore
msh meshProvider
beacon system.BeaconGetter

serveNewOpn bool
}

func newHandler(cdb *datastore.CachedDB, bs *datastore.BlobStore, m meshProvider, b system.BeaconGetter, lg log.Log) *handler {
func newHandler(cdb *datastore.CachedDB, bs *datastore.BlobStore, m meshProvider, b system.BeaconGetter, newOpn bool, lg log.Log) *handler {
return &handler{
logger: lg,
cdb: cdb,
bs: bs,
msh: m,
beacon: b,
logger: lg,
cdb: cdb,
bs: bs,
msh: m,
beacon: b,
serveNewOpn: newOpn,
}
}

// handleMaliciousIDsReq returns the IDs of all known malicious nodes.
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte, error) {
nodes, err := identities.GetMalicious(h.cdb)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get malicious IDs", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get malicious IDs", log.Err(err))
return nil, err
}
h.logger.WithContext(ctx).With().Debug("responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
h.logger.WithContext(ctx).With().Debug("serve: responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
malicious := &MaliciousIDs{
NodeIDs: nodes,
}
data, err := codec.Encode(malicious)
if err != nil {
h.logger.With().Fatal("failed to encode malicious IDs", log.Err(err))
h.logger.With().Fatal("serve: failed to encode malicious IDs", log.Err(err))
}
return data, nil
}
Expand All @@ -61,18 +64,18 @@ func (h *handler) handleEpochInfoReq(ctx context.Context, msg []byte) ([]byte, e
}
atxids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get epoch atx IDs", epoch, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get epoch atx IDs", epoch, log.Err(err))
return nil, err
}
ed := EpochData{
AtxIDs: atxids,
}
h.logger.WithContext(ctx).With().Debug("responded to epoch info request",
h.logger.WithContext(ctx).With().Debug("serve: responded to epoch info request",
epoch,
log.Int("atx_count", len(ed.AtxIDs)))
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize epoch atx", epoch, log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize epoch atx", epoch, log.Err(err))
}
return bts, nil
}
Expand All @@ -89,19 +92,20 @@ func (h *handler) handleLayerDataReq(ctx context.Context, req []byte) ([]byte, e
}
ld.Ballots, err = ballots.IDsInLayer(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("failed to get layer ballots", lid, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get layer ballots", lid, log.Err(err))
return nil, err
}

out, err := codec.Encode(&ld)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize layer data response", log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer data response", log.Err(err))
}
return out, nil
}

// handleLayerOpinionsReq returns the opinions on data in the specified layer, described in LayerOpinion.
func (h *handler) handleLayerOpinionsReq(ctx context.Context, req []byte) ([]byte, error) {
opnReqV1.Inc()
var (
lid types.LayerID
lo LayerOpinion
Expand All @@ -113,13 +117,13 @@ func (h *handler) handleLayerOpinionsReq(ctx context.Context, req []byte) ([]byt
}
lo.PrevAggHash, err = layers.GetAggregatedHash(h.cdb, lid.Sub(1))
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("failed to get prev agg hash", lid, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get prev agg hash", lid, log.Err(err))
return nil, err
}

certs, err := certificates.Get(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("failed to get certificate", lid, log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get certificate", lid, log.Err(err))
return nil, err
}
if err == nil {
Expand All @@ -139,15 +143,76 @@ func (h *handler) handleLayerOpinionsReq(ctx context.Context, req []byte) ([]byt

out, err = codec.Encode(&lo)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize layer opinions response", log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode layer opinions response", log.Err(err))
}
return out, nil
}

func (h *handler) handleLayerOpinionsReq2(ctx context.Context, data []byte) ([]byte, error) {
if !h.serveNewOpn {
return nil, errors.New("new opn protocol not supported")
}
var req OpinionRequest
if err := codec.Decode(data, &req); err != nil {
return nil, err
}
if req.Block != nil {
return h.handleCertReq(ctx, req.Layer, *req.Block)
}

var (
lid = req.Layer
lo LayerOpinion2
out []byte
err error
)

opnReqV2.Inc()
lo.PrevAggHash, err = layers.GetAggregatedHash(h.cdb, lid.Sub(1))
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get prev agg hash", lid, log.Err(err))
return nil, err
}
bid, err := certificates.CertifiedBlock(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get layer certified block", lid, log.Err(err))
return nil, err
}
if err == nil {
lo.Certified = &bid
}
out, err = codec.Encode(&lo)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer opinions response", log.Err(err))
}
return out, nil
}

func (h *handler) handleCertReq(ctx context.Context, lid types.LayerID, bid types.BlockID) ([]byte, error) {
certReq.Inc()
certs, err := certificates.Get(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get certificate", lid, log.Err(err))
return nil, err
}
if err == nil {
for _, cert := range certs {
if cert.Block == bid {
out, err := codec.Encode(cert.Cert)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode cert", log.Err(err))
}
return out, nil
}
}
}
return nil, err
}

func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {
var requestBatch RequestBatch
if err := codec.Decode(data, &requestBatch); err != nil {
h.logger.WithContext(ctx).With().Warning("failed to parse request", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to parse request", log.Err(err))
return nil, errBadRequest
}
resBatch := ResponseBatch{
Expand All @@ -160,20 +225,20 @@ func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error
totalHashReqs.WithLabelValues(string(r.Hint)).Add(1)
res, err := h.bs.Get(r.Hint, r.Hash.Bytes())
if err != nil {
h.logger.WithContext(ctx).With().Debug("remote peer requested nonexistent hash",
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested nonexistent hash",
log.String("hash", r.Hash.ShortString()),
log.String("hint", string(r.Hint)),
log.Err(err))
hashMissing.WithLabelValues(string(r.Hint)).Add(1)
continue
} else if res == nil {
h.logger.WithContext(ctx).With().Debug("remote peer requested golden",
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested golden",
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
hashEmptyData.WithLabelValues(string(r.Hint)).Add(1)
continue
} else {
h.logger.WithContext(ctx).With().Debug("responded to hash request",
h.logger.WithContext(ctx).With().Debug("serve: responded to hash request",
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
}
Expand All @@ -187,11 +252,11 @@ func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error

bts, err := codec.Encode(&resBatch)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize batch id",
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode batch id",
log.Err(err),
log.String("batch_hash", resBatch.ID.ShortString()))
}
h.logger.WithContext(ctx).With().Debug("returning response for batch",
h.logger.WithContext(ctx).With().Debug("serve: returning response for batch",
log.String("batch_hash", resBatch.ID.ShortString()),
log.Int("count_responses", len(resBatch.Responses)),
log.Int("data_size", len(bts)))
Expand All @@ -206,7 +271,7 @@ func (h *handler) handleMeshHashReq(ctx context.Context, reqData []byte) ([]byte
err error
)
if err = codec.Decode(reqData, &req); err != nil {
h.logger.WithContext(ctx).With().Warning("failed to parse mesh hash request", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to parse mesh hash request", log.Err(err))
return nil, errBadRequest
}
if err := req.Validate(); err != nil {
Expand All @@ -215,14 +280,14 @@ func (h *handler) handleMeshHashReq(ctx context.Context, reqData []byte) ([]byte
}
hashes, err = layers.GetAggHashes(h.cdb, req.From, req.To, req.Step)
if err != nil {
h.logger.WithContext(ctx).With().Warning("failed to get mesh hashes", log.Err(err))
h.logger.WithContext(ctx).With().Warning("serve: failed to get mesh hashes", log.Err(err))
return nil, err
}
data, err = codec.EncodeSlice(hashes)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("failed to serialize hashes", log.Err(err))
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode hashes", log.Err(err))
}
h.logger.WithContext(ctx).With().Debug("returning response for mesh hashes",
h.logger.WithContext(ctx).With().Debug("serve: returning response for mesh hashes",
log.Stringer("layer_from", req.From),
log.Stringer("layer_to", req.To),
log.Uint32("by", req.Step),
Expand Down

0 comments on commit 1f19dff

Please sign in to comment.