Skip to content

Commit

Permalink
Merge pull request #2071 from guggero/integration-harness-fixes
Browse files Browse the repository at this point in the history
rpctest: integration test harness fixes
  • Loading branch information
guggero committed Dec 22, 2023
2 parents 4af0ca8 + 7644d14 commit 8766bfd
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 58 deletions.
34 changes: 26 additions & 8 deletions blockchain/sizehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,32 @@ const (
loadFactorNum = 13
loadFactorDen = 2

// maxAlloc is the maximum size of an allocation. On 64-bit,
// it's theoretically possible to allocate 1<<heapAddrBits bytes. On
// 32-bit, however, this is one less than 1<<32 because the
// number of bytes in the address space doesn't actually fit
// in a uintptr.
// _64bit = 1 on 64-bit systems, 0 on 32-bit systems
_64bit = 1 << (^uintptr(0) >> 63) / 2

// PtrSize is the size of a pointer in bytes - unsafe.Sizeof(uintptr(0))
// but as an ideal constant. It is also the size of the machine's native
// word size (that is, 4 on 32-bit systems, 8 on 64-bit).
PtrSize = 4 << (^uintptr(0) >> 63)

// heapAddrBits is the number of bits in a heap address that's actually
// available for memory allocation.
//
// NOTE (kcalvinalvin): I just took the constant for a 64 bit system.
maxAlloc = 281474976710656
// NOTE (guggero): For 64-bit systems, we just assume 40 bits of address
// space available, as that seems to be the lowest common denominator.
// See heapAddrBits in runtime/malloc.go of the standard library for
// more details
heapAddrBits = 32 + (_64bit * 8)

// maxAlloc is the maximum size of an allocation on the heap.
//
// NOTE(guggero): With the somewhat simplified heapAddrBits calculation
// above, this will currently limit the maximum allocation size of the
// UTXO cache to around 300GiB on 64-bit systems. This should be more
// than enough for the foreseeable future, but if we ever need to
// increase it, we should probably use the same calculation as the
// standard library.
maxAlloc = (1 << heapAddrBits) - (1-_64bit)*1
)

var class_to_size = [_NumSizeClasses]uint16{0, 8, 16, 24, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 288, 320, 352, 384, 416, 448, 480, 512, 576, 640, 704, 768, 896, 1024, 1152, 1280, 1408, 1536, 1792, 2048, 2304, 2688, 3072, 3200, 3456, 4096, 4864, 5376, 6144, 6528, 6784, 6912, 8192, 9472, 9728, 10240, 10880, 12288, 13568, 14336, 16384, 18432, 19072, 20480, 21760, 24576, 27264, 28672, 32768}
Expand Down Expand Up @@ -175,7 +193,7 @@ func calculateMinEntries(totalBytes int, bucketSize int) int {
// mulUintptr returns a * b and whether the multiplication overflowed.
// On supported platforms this is an intrinsic lowered by the compiler.
func mulUintptr(a, b uintptr) (uintptr, bool) {
if a|b < 1<<(4*8) || a == 0 {
if a|b < 1<<(4*PtrSize) || a == 0 {
return a * b, false
}
overflow := b > MaxUintptr/a
Expand Down
52 changes: 15 additions & 37 deletions integration/rpctest/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rpctest

import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
Expand All @@ -31,7 +30,7 @@ type nodeConfig struct {
profile string
debugLevel string
extra []string
prefix string
nodeDir string

exe string
endpoint string
Expand All @@ -41,7 +40,7 @@ type nodeConfig struct {
}

// newConfig returns a newConfig with all default values.
func newConfig(prefix, certFile, keyFile string, extra []string,
func newConfig(nodeDir, certFile, keyFile string, extra []string,
customExePath string) (*nodeConfig, error) {

var btcdPath string
Expand All @@ -61,7 +60,7 @@ func newConfig(prefix, certFile, keyFile string, extra []string,
rpcUser: "user",
rpcPass: "pass",
extra: extra,
prefix: prefix,
nodeDir: nodeDir,
exe: btcdPath,
endpoint: "ws",
certFile: certFile,
Expand All @@ -77,17 +76,9 @@ func newConfig(prefix, certFile, keyFile string, extra []string,
// temporary data, and log directories which must be cleaned up with a call to
// cleanup().
func (n *nodeConfig) setDefaults() error {
datadir, err := ioutil.TempDir("", n.prefix+"-data")
if err != nil {
return err
}
n.dataDir = datadir
logdir, err := ioutil.TempDir("", n.prefix+"-logs")
if err != nil {
return err
}
n.logDir = logdir
cert, err := ioutil.ReadFile(n.certFile)
n.dataDir = filepath.Join(n.nodeDir, "data")
n.logDir = filepath.Join(n.nodeDir, "logs")
cert, err := os.ReadFile(n.certFile)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,22 +154,7 @@ func (n *nodeConfig) rpcConnConfig() rpc.ConnConfig {

// String returns the string representation of this nodeConfig.
func (n *nodeConfig) String() string {
return n.prefix
}

// cleanup removes the tmp data and log directories.
func (n *nodeConfig) cleanup() error {
dirs := []string{
n.logDir,
n.dataDir,
}
var err error
for _, dir := range dirs {
if err = os.RemoveAll(dir); err != nil {
log.Printf("Cannot remove dir %s: %v", dir, err)
}
}
return err
return n.nodeDir
}

// node houses the necessary state required to configure, launch, and manage a
Expand Down Expand Up @@ -213,8 +189,7 @@ func (n *node) start() error {
return err
}

pid, err := os.Create(filepath.Join(n.dataDir,
fmt.Sprintf("%s.pid", n.config)))
pid, err := os.Create(filepath.Join(n.dataDir, "btcd.pid"))
if err != nil {
return err
}
Expand Down Expand Up @@ -258,7 +233,10 @@ func (n *node) cleanup() error {
}
}

return n.config.cleanup()
// Since the node's main data directory is passed in to the node config,
// it isn't our responsibility to clean it up. So we're done after
// removing the pid file.
return nil
}

// shutdown terminates the running btcd process, and cleans up all
Expand All @@ -283,11 +261,11 @@ func genCertPair(certFile, keyFile string) error {
}

// Write cert and key files.
if err = ioutil.WriteFile(certFile, cert, 0666); err != nil {
if err = os.WriteFile(certFile, cert, 0666); err != nil {
return err
}
if err = ioutil.WriteFile(keyFile, key, 0600); err != nil {
os.Remove(certFile)
if err = os.WriteFile(keyFile, key, 0600); err != nil {
_ = os.Remove(certFile)
return err
}

Expand Down
134 changes: 121 additions & 13 deletions integration/rpctest/rpc_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rpctest

import (
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -152,8 +151,7 @@ func New(activeNet *chaincfg.Params, handlers *rpcclient.NotificationHandlers,
return nil, err
}

harnessID := strconv.Itoa(numTestInstances)
nodeTestData, err := ioutil.TempDir(testDir, "harness-"+harnessID)
nodeTestData, err := os.MkdirTemp(testDir, "rpc-node")
if err != nil {
return nil, err
}
Expand All @@ -173,7 +171,7 @@ func New(activeNet *chaincfg.Params, handlers *rpcclient.NotificationHandlers,
extraArgs = append(extraArgs, miningAddr)

config, err := newConfig(
"rpctest", certFile, keyFile, extraArgs, customExePath,
nodeTestData, certFile, keyFile, extraArgs, customExePath,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -248,10 +246,10 @@ func (h *Harness) SetUp(createTestChain bool, numMatureOutputs uint32) error {
// Start the btcd node itself. This spawns a new process which will be
// managed
if err := h.node.start(); err != nil {
return err
return fmt.Errorf("error starting node: %w", err)
}
if err := h.connectRPCClient(); err != nil {
return err
return fmt.Errorf("error connecting RPC client: %w", err)
}

h.wallet.Start()
Expand All @@ -272,8 +270,8 @@ func (h *Harness) SetUp(createTestChain bool, numMatureOutputs uint32) error {
// Create a test chain with the desired number of mature coinbase
// outputs.
if createTestChain && numMatureOutputs != 0 {
numToGenerate := (uint32(h.ActiveNet.CoinbaseMaturity) +
numMatureOutputs)
coinbaseMaturity := uint32(h.ActiveNet.CoinbaseMaturity)
numToGenerate := coinbaseMaturity + numMatureOutputs
_, err := h.Client.Generate(numToGenerate)
if err != nil {
return err
Expand Down Expand Up @@ -351,15 +349,18 @@ func (h *Harness) connectRPCClient() error {
batchConf.HTTPPostMode = true
for i := 0; i < h.MaxConnRetries; i++ {
fail := false
timeout := time.Duration(i) * h.ConnectionRetryTimeout
if client == nil {
if client, err = rpcclient.New(&rpcConf, h.handlers); err != nil {
time.Sleep(time.Duration(i) * h.ConnectionRetryTimeout)
client, err = rpcclient.New(&rpcConf, h.handlers)
if err != nil {
time.Sleep(timeout)
fail = true
}
}
if batchClient == nil {
if batchClient, err = rpcclient.NewBatch(&batchConf); err != nil {
time.Sleep(time.Duration(i) * h.ConnectionRetryTimeout)
batchClient, err = rpcclient.NewBatch(&batchConf)
if err != nil {
time.Sleep(timeout)
fail = true
}
}
Expand All @@ -369,7 +370,9 @@ func (h *Harness) connectRPCClient() error {
}

if client == nil || batchClient == nil {
return fmt.Errorf("connection timeout")
return fmt.Errorf("connection timeout, tried %d times with "+
"timeout %v, last err: %w", h.MaxConnRetries,
h.ConnectionRetryTimeout, err)
}

h.Client = client
Expand Down Expand Up @@ -558,6 +561,111 @@ func NextAvailablePort() int {
panic("no ports available for listening")
}

// NextAvailablePortForProcess returns the first port that is available for
// listening by a new node, using a lock file to make sure concurrent access for
// parallel tasks within the same process don't re-use the same port. It panics
// if no port is found and the maximum available TCP port is reached.
func NextAvailablePortForProcess(pid int) int {
lockFile := filepath.Join(
os.TempDir(), fmt.Sprintf("rpctest-port-pid-%d.lock", pid),
)
timeout := time.After(time.Second)

var (
lockFileHandle *os.File
err error
)
for {
// Attempt to acquire the lock file. If it already exists, wait
// for a bit and retry.
lockFileHandle, err = os.OpenFile(
lockFile, os.O_CREATE|os.O_EXCL, 0600,
)
if err == nil {
// Lock acquired.
break
}

// Wait for a bit and retry.
select {
case <-timeout:
panic("timeout waiting for lock file")
case <-time.After(10 * time.Millisecond):
}
}

// Release the lock file when we're done.
defer func() {
// Always close file first, Windows won't allow us to remove it
// otherwise.
_ = lockFileHandle.Close()
err := os.Remove(lockFile)
if err != nil {
panic(fmt.Errorf("couldn't remove lock file: %w", err))
}
}()

portFile := filepath.Join(
os.TempDir(), fmt.Sprintf("rpctest-port-pid-%d", pid),
)
port, err := os.ReadFile(portFile)
if err != nil {
if !os.IsNotExist(err) {
panic(fmt.Errorf("error reading port file: %w", err))
}
port = []byte(strconv.Itoa(int(defaultNodePort)))
}

lastPort, err := strconv.Atoi(string(port))
if err != nil {
panic(fmt.Errorf("error parsing port: %w", err))
}

// We take the next one.
lastPort++
for lastPort < 65535 {
// If there are no errors while attempting to listen on this
// port, close the socket and return it as available. While it
// could be the case that some other process picks up this port
// between the time the socket is closed and it's reopened in
// the harness node, in practice in CI servers this seems much
// less likely than simply some other process already being
// bound at the start of the tests.
addr := fmt.Sprintf(ListenerFormat, lastPort)
l, err := net.Listen("tcp4", addr)
if err == nil {
err := l.Close()
if err == nil {
err := os.WriteFile(
portFile,
[]byte(strconv.Itoa(lastPort)), 0600,
)
if err != nil {
panic(fmt.Errorf("error updating "+
"port file: %w", err))
}

return lastPort
}
}
lastPort++
}

// No ports available? Must be a mistake.
panic("no ports available for listening")
}

// GenerateProcessUniqueListenerAddresses is a function that returns two
// listener addresses with unique ports per the given process id and should be
// used to overwrite rpctest's default generator which is prone to use colliding
// ports.
func GenerateProcessUniqueListenerAddresses(pid int) (string, string) {
port1 := NextAvailablePortForProcess(pid)
port2 := NextAvailablePortForProcess(pid)
return fmt.Sprintf(ListenerFormat, port1),
fmt.Sprintf(ListenerFormat, port2)
}

// baseDir is the directory path of the temp directory for all rpctest files.
func baseDir() (string, error) {
dirPath := filepath.Join(os.TempDir(), "btcd", "rpctest")
Expand Down

0 comments on commit 8766bfd

Please sign in to comment.