Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ccq/p2p with single host #3356

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion devnet/query-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ spec:
- "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550"
# Hardcoded devnet bootstrap (generated from deterministic key in guardiand)
- --bootstrap
- /dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- /dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- --logLevel=info
ports:
- containerPort: 6069
Expand Down
1 change: 1 addition & 0 deletions node/cmd/ccq/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
for len(th_req.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}
logger.Info("Found peers")

// Fetch the initial current guardian set
guardianSet, err := FetchCurrentGuardianSet(ethRpcUrl, ethCoreAddr)
Expand Down
3 changes: 2 additions & 1 deletion node/hack/query/send_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {

p2pNetworkID := "/wormhole/dev"
var p2pPort uint = 8998 // don't collide with spy so we can run from the same container in tilt
p2pBootstrap := "/dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
p2pBootstrap := "/dns4/guardian-0.guardian/udp/8999/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
nodeKeyPath := "./querier.key"

ctx := context.Background()
Expand Down Expand Up @@ -172,6 +172,7 @@ func main() {
for len(th_req.ListPeers()) < 1 {
time.Sleep(time.Millisecond * 100)
}
logger.Info("Done listening for peers")

//
// END SETUP
Expand Down
66 changes: 5 additions & 61 deletions node/pkg/p2p/ccq_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,9 @@ import (

gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"

"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"go.uber.org/zap"
)

Expand All @@ -45,7 +38,6 @@ var (
type ccqP2p struct {
logger *zap.Logger

h host.Host
th_req *pubsub.Topic
th_resp *pubsub.Topic
sub *pubsub.Subscription
Expand Down Expand Up @@ -73,67 +65,23 @@ func newCcqRunP2p(

func (ccq *ccqP2p) run(
ctx context.Context,
priv crypto.PrivKey,
gk *ecdsa.PrivateKey,
h host.Host,
networkID string,
bootstrapPeers string,
port uint,
signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
queryResponseReadC <-chan *query.QueryResponsePublication,
errC chan error,
) error {
var err error

components := DefaultComponents()
if components == nil {
return fmt.Errorf("components is not initialized")
}
components.Port = port

ccq.h, err = libp2p.New(
// Use the keypair we generated
libp2p.Identity(priv),

// Multiple listen addresses
libp2p.ListenAddrStrings(
components.ListeningAddresses()...,
),

// Enable TLS security as the only security protocol.
libp2p.Security(libp2ptls.ID, libp2ptls.New),

// Enable QUIC transport as the only transport.
libp2p.Transport(libp2pquic.NewTransport),

// Let's prevent our peer from having too many
// connections by attaching a connection manager.
libp2p.ConnectionManager(components.ConnMgr), // TODO: Can we use the same connection manager?

// Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
ccq.logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))

bootstrappers, _ := bootstrapAddrs(ccq.logger, bootstrapPeers, h.ID())

// TODO(leo): Persistent data store (i.e. address book)
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
// This intentionally makes us incompatible with the global IPFS DHT
dht.ProtocolPrefix(protocol.ID("/"+networkID)),
dht.BootstrapPeers(bootstrappers...),
)
return idht, err
}),
)

if err != nil {
return fmt.Errorf("failed to create p2p: %w", err)
if h == nil {
return fmt.Errorf("h is not initialized")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a more descriptive error? Preferably one that doesn't require reading the source and finding the definition of h in the function arg spec?

}

topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")

ccq.logger.Info("Creating pubsub topics", zap.String("request_topic", topic_req), zap.String("response_topic", topic_resp))
ps, err := pubsub.NewGossipSub(ctx, ccq.h,
ps, err := pubsub.NewGossipSub(ctx, h,
// We only want to accept subscribes from peers in the allow list.
pubsub.WithPeerFilter(func(peerID peer.ID, topic string) bool {
if len(ccq.allowedPeers) == 0 {
Expand Down Expand Up @@ -190,7 +138,7 @@ func (ccq *ccqP2p) run(
return ccq.publisher(ctx, gk, queryResponseReadC)
})

ccq.logger.Info("Node has been started", zap.String("peer_id", ccq.h.ID().String()), zap.String("addrs", fmt.Sprintf("%v", ccq.h.Addrs())))
ccq.logger.Info("Node has been started", zap.String("peer_id", h.ID().String()), zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
return nil
}

Expand All @@ -205,10 +153,6 @@ func (ccq *ccqP2p) close() {
}

ccq.sub.Cancel()

if err := ccq.h.Close(); err != nil {
ccq.logger.Error("error closing the host", zap.Error(err))
}
}

func (ccq *ccqP2p) listener(ctx context.Context, signedQueryReqC chan<- *gossipv1.SignedQueryRequest) error {
Expand Down
36 changes: 18 additions & 18 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,27 +327,10 @@ func Run(
}
defer sub.Cancel()

// Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI
// as peer discovery can take a long time).

successes := connectToPeers(ctx, logger, h, bootstrappers)

if successes == 0 && !bootstrapNode { // If we're a bootstrap node it's okay to not have any peers.
// If we fail to connect to any bootstrap peer, kill the service
// returning from this function will lead to rootCtxCancel() being called in the defer() above. The service will then be restarted by Tilt/kubernetes.
return fmt.Errorf("failed to connect to any bootstrap peer")
}
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))

logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))

bootTime := time.Now()

if ccqEnabled {
ccqErrC := make(chan error)
ccq := newCcqRunP2p(logger, ccqAllowedPeers)
if err := ccq.run(ctx, priv, gk, networkID, ccqBootstrapPeers, ccqPort, signedQueryReqC, queryResponseReadC, ccqErrC); err != nil {
if err := ccq.run(ctx, gk, h, networkID, signedQueryReqC, queryResponseReadC, ccqErrC); err != nil {
return fmt.Errorf("failed to start p2p for CCQ: %w", err)
}
defer ccq.close()
Expand All @@ -365,6 +348,23 @@ func Run(
}()
}

// Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI
// as peer discovery can take a long time).

successes := connectToPeers(ctx, logger, h, bootstrappers)

if successes == 0 && !bootstrapNode { // If we're a bootstrap node it's okay to not have any peers.
// If we fail to connect to any bootstrap peer, kill the service
// returning from this function will lead to rootCtxCancel() being called in the defer() above. The service will then be restarted by Tilt/kubernetes.
return fmt.Errorf("failed to connect to any bootstrap peer")
}
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))

logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))

bootTime := time.Now()

// Periodically run guardian state set cleanup.
go func() {
ticker := time.NewTicker(15 * time.Second)
Expand Down