From 92f8a5e7fedff26321ce417f4e49182ed8b86f43 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 9 Aug 2023 16:00:31 +0200 Subject: [PATCH] p2p: move ping handling into pingLoop goroutine (#27887) Moving the response sending there allows tracking all peer goroutines in the peer WaitGroup. --- p2p/peer.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 482e3d5064f19..a1de9c25ddc34 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -108,6 +108,7 @@ type Peer struct { wg sync.WaitGroup protoErr chan error closed chan struct{} + pingRecv chan struct{} disc chan DiscReason // events receives message send / receive events if set @@ -178,6 +179,7 @@ func newPeer(conn *conn, protocols []Protocol) *Peer { protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), log: log.New("id", conn.id, "conn", conn.flags), + pingRecv: make(chan struct{}, 16), } return p } @@ -237,9 +239,11 @@ loop: } func (p *Peer) pingLoop() { - ping := time.NewTimer(pingInterval) defer p.wg.Done() + + ping := time.NewTimer(pingInterval) defer ping.Stop() + for { select { case <-ping.C: @@ -248,6 +252,10 @@ func (p *Peer) pingLoop() { return } ping.Reset(pingInterval) + + case <-p.pingRecv: + SendItems(p.rw, pongMsg) + case <-p.closed: return } @@ -274,7 +282,10 @@ func (p *Peer) handle(msg Msg) error { switch { case msg.Code == pingMsg: msg.Discard() - go SendItems(p.rw, pongMsg) + select { + case p.pingRecv <- struct{}{}: + case <-p.closed: + } case msg.Code == discMsg: var reason [1]DiscReason // This is the last message. We don't need to discard or