Skip to content

Commit

Permalink
Merge pull request #3 from SiaFoundation/nate/syncer-use-zap
Browse files Browse the repository at this point in the history
Use structured log for syncer
  • Loading branch information
n8maninger committed Jan 17, 2024
2 parents 11ea0f8 + 5380b37 commit 6d84936
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ go 1.21
require (
go.etcd.io/bbolt v1.3.8
go.sia.tech/core v0.2.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122
lukechampine.com/frand v1.4.2
)

require (
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
go.sia.tech/mux v1.2.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.5.0 // indirect
)
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.sia.tech/core v0.1.12 h1:nrq/BvYbTGVLtZu0MHBTExUAP5nfNbcGhaJbuK839gc=
go.sia.tech/core v0.1.12/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/core v0.2.0 h1:+J/QylNueFmg5kCJCIfwqnCtKKoC/JN5wasPLy85QZI=
go.sia.tech/core v0.2.0/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 h1:NvGWuYG8dkDHFSKksI1P9faiVJ9rayE6l0+ouWVIDs8=
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
43 changes: 21 additions & 22 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"reflect"
"sync"
Expand All @@ -14,6 +12,7 @@ import (
"go.sia.tech/core/consensus"
"go.sia.tech/core/gateway"
"go.sia.tech/core/types"
"go.uber.org/zap"
"lukechampine.com/frand"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ type config struct {
MaxSendBlocks uint64
PeerDiscoveryInterval time.Duration
SyncInterval time.Duration
Logger *log.Logger
Logger *zap.Logger
}

// An Option modifies a Syncer's configuration.
Expand Down Expand Up @@ -171,7 +170,7 @@ func WithSyncInterval(d time.Duration) Option {

// WithLogger sets the logger used by a Syncer. The default is a logger that
// outputs to io.Discard.
func WithLogger(l *log.Logger) Option {
func WithLogger(l *zap.Logger) Option {
return func(c *config) { c.Logger = l }
}

Expand All @@ -182,7 +181,7 @@ type Syncer struct {
pm PeerStore
header gateway.Header
config config
log *log.Logger // redundant, but convenient
log *zap.Logger // redundant, but convenient

mu sync.Mutex
peers map[string]*gateway.Peer
Expand All @@ -200,7 +199,7 @@ func (h *rpcHandler) resync(p *gateway.Peer, reason string) {
h.s.synced[p.Addr] = false
h.s.mu.Unlock()
if !alreadyResyncing {
h.s.log.Printf("triggering resync with %v: %v", p, reason)
h.s.log.Debug("resync triggered", zap.String("peer", p.Addr), zap.String("reason", reason))
}
}

Expand Down Expand Up @@ -278,7 +277,7 @@ func (h *rpcHandler) RelayHeader(bh gateway.BlockHeader, origin *gateway.Peer) {
// request + validate full block
if b, err := origin.SendBlock(bh.ID(), h.s.config.SendBlockTimeout); err != nil {
// log-worthy, but not ban-worthy
h.s.log.Printf("couldn't retrieve new block %v after header relay from %v: %v", bh.ID(), origin, err)
h.s.log.Warn("couldn't retrieve new block after header relay", zap.Stringer("header", bh.ID()), zap.Stringer("origin", origin), zap.Error(err))
return
} else if err := h.s.cm.AddBlocks([]types.Block{b}); err != nil {
h.s.ban(origin, err)
Expand All @@ -296,7 +295,7 @@ func (h *rpcHandler) RelayTransactionSet(txns []types.Transaction, origin *gatew
// too risky to ban here (txns are probably just outdated), but at least
// log it if we think we're synced
if b, ok := h.s.cm.Block(h.s.cm.Tip().ID); ok && time.Since(b.Timestamp) < 2*h.s.cm.TipState().BlockInterval() {
h.s.log.Printf("received an invalid transaction set from %v: %v", origin, err)
h.s.log.Debug("invalid transaction set received", zap.Stringer("origin", origin), zap.Error(err))
}
} else {
h.s.relayTransactionSet(txns, origin) // non-blocking
Expand Down Expand Up @@ -361,7 +360,7 @@ func (h *rpcHandler) RelayV2BlockOutline(bo gateway.V2BlockOutline, origin *gate
txns, v2txns, err := origin.SendTransactions(index, missing, h.s.config.SendTransactionsTimeout)
if err != nil {
// log-worthy, but not ban-worthy
h.s.log.Printf("couldn't retrieve missing transactions of %v after relay from %v: %v", bid, origin, err)
h.s.log.Debug("couldn't retrieve missing transactions from peer", zap.Stringer("blockID", bid), zap.Stringer("origin", origin), zap.Error(err))
return
}
b, missing = bo.Complete(cs, txns, v2txns)
Expand Down Expand Up @@ -393,15 +392,15 @@ func (h *rpcHandler) RelayV2TransactionSet(basis types.ChainIndex, txns []types.
h.s.ban(origin, errors.New("peer sent an empty transaction set"))
} else if known, err := h.s.cm.AddV2PoolTransactions(basis, txns); !known {
if err != nil {
h.s.log.Printf("received an invalid transaction set from %v: %v", origin, err)
h.s.log.Debug("received invalid transaction set", zap.Stringer("origin", origin), zap.Error(err))
} else {
h.s.relayV2TransactionSet(basis, txns, origin) // non-blocking
}
}
}

func (s *Syncer) ban(p *gateway.Peer, err error) {
s.log.Printf("banning %v: %v", p, err)
s.log.Debug("banning peer", zap.Stringer("peer", p), zap.Error(err))
p.SetErr(errors.New("banned"))
s.pm.Ban(p.ConnAddr, 24*time.Hour, err.Error())

Expand Down Expand Up @@ -462,7 +461,7 @@ func (s *Syncer) runPeer(p *gateway.Peer) {
// slow, fine; we don't need to worry about resource exhaustion
// unless we have tons of peers.
if err := p.HandleRPC(id, stream, h); err != nil {
s.log.Printf("incoming RPC %v from peer %v failed: %v", id, p, err)
s.log.Debug("rpc failing", zap.Stringer("peer", p), zap.Stringer("rpc", id), zap.Error(err))
}
<-inflight
}()
Expand Down Expand Up @@ -570,11 +569,11 @@ func (s *Syncer) acceptLoop() error {
go func() {
defer conn.Close()
if err := s.allowConnect(conn.RemoteAddr().String(), true); err != nil {
s.log.Printf("rejected inbound connection from %v: %v", conn.RemoteAddr(), err)
s.log.Debug("rejected inbound connection", zap.Stringer("remoteAddress", conn.RemoteAddr()), zap.Error(err))
} else if p, err := gateway.Accept(conn, s.header); err != nil {
s.log.Printf("failed to accept inbound connection from %v: %v", conn.RemoteAddr(), err)
s.log.Debug("failed to accept inbound connection", zap.Stringer("remoteAddress", conn.RemoteAddr()), zap.Error(err))
} else if s.alreadyConnected(p) {
s.log.Printf("rejected inbound connection from %v: already connected", conn.RemoteAddr())
s.log.Debug("already connected to peer", zap.Stringer("remoteAddress", conn.RemoteAddr()))
} else {
s.runPeer(p)
}
Expand Down Expand Up @@ -660,7 +659,7 @@ func (s *Syncer) peerLoop(closeChan <-chan struct{}) error {
// NOTE: we don't bother logging failure here, since it's common and
// not particularly interesting or actionable
if _, err := s.Connect(p); err == nil {
s.log.Printf("formed outbound connection to %v", p)
s.log.Debug("connected to peer", zap.String("peer", p))
}
lastTried[p] = time.Now()
}
Expand Down Expand Up @@ -702,7 +701,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error {
s.mu.Lock()
s.synced[p.Addr] = true
s.mu.Unlock()
s.log.Printf("starting sync with %v", p)
s.log.Debug("syncing with peer", zap.Stringer("peer", p))
oldTip := s.cm.Tip()
oldTime := time.Now()
lastPrint := time.Now()
Expand All @@ -720,7 +719,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error {
})
startTime, startHeight = endTime, endHeight
if time.Since(lastPrint) > 30*time.Second {
s.log.Printf("syncing with %v, tip now %v (avg %.2f blocks/s)", p, s.cm.Tip(), float64(s.cm.Tip().Height-oldTip.Height)/endTime.Sub(oldTime).Seconds())
s.log.Debug("syncing with peer", zap.Stringer("peer", p), zap.Uint64("blocks", sentBlocks), zap.Duration("elapsed", endTime.Sub(oldTime)))
lastPrint = time.Now()
}
return nil
Expand All @@ -745,11 +744,11 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error {
}
totalBlocks := s.cm.Tip().Height - oldTip.Height
if err != nil {
s.log.Printf("syncing with %v failed after %v blocks: %v", p, totalBlocks, err)
s.log.Debug("syncing with peer failed", zap.Stringer("peer", p), zap.Error(err), zap.Uint64("blocks", totalBlocks))
} else if newTip := s.cm.Tip(); newTip != oldTip {
s.log.Printf("finished syncing %v blocks with %v, tip now %v", totalBlocks, p, newTip)
s.log.Debug("finished syncing with peer", zap.Stringer("peer", p), zap.Stringer("newTip", newTip), zap.Uint64("blocks", totalBlocks))
} else {
s.log.Printf("finished syncing %v blocks with %v, tip unchanged", sentBlocks, p)
s.log.Debug("finished syncing with peer, tip unchanged", zap.Stringer("peer", p), zap.Uint64("blocks", sentBlocks))
}
}
}
Expand Down Expand Up @@ -899,7 +898,7 @@ func New(l net.Listener, cm ChainManager, pm PeerStore, header gateway.Header, o
MaxSendBlocks: 10,
PeerDiscoveryInterval: 5 * time.Second,
SyncInterval: 5 * time.Second,
Logger: log.New(io.Discard, "", 0),
Logger: zap.NewNop(),
}
for _, opt := range opts {
opt(&config)
Expand Down

0 comments on commit 6d84936

Please sign in to comment.