Skip to content

Commit

Permalink
rpc: add limit for batch request items and response size (ethereum#26681
Browse files Browse the repository at this point in the history
)

This PR adds server-side limits for JSON-RPC batch requests. Before this change, batches
were limited only by processing time. The server would pick calls from the batch and
answer them until the response timeout occurred, then stop processing the remaining batch
items.

Here, we are adding two additional limits which can be configured:

- the 'item limit': batches can have at most N items
- the 'response size limit': batches can contain at most X response bytes

These limits are optional in package rpc. In Geth, we set a default limit of 1000 items
and 25MB response size.

When a batch goes over the limit, an error response is returned to the client. However,
doing this correctly isn't always possible. In JSON-RPC, only method calls with a valid
`id` can be responded to. Since batches may also contain non-call messages or
notifications, the best effort thing we can do to report an error with the batch itself is
reporting the limit violation as an error for the first method call in the batch. If a batch is
too large, but contains only notifications and responses, the error will be reported with
a null `id`.

The RPC client was also changed so it can deal with errors resulting from too large
batches. An older client connected to the server code in this PR could get stuck
until the request timeout occurred when the batch is too large. **Upgrading to a version
of the RPC client containing this change is strongly recommended to avoid timeout issues.**

For some weird reason, when writing the original client implementation, @fjl worked off of
the assumption that responses could be distributed across batches arbitrarily. So for a
batch request containing requests `[A B C]`, the server could respond with `[A B C]` but
also with `[A B] [C]` or even `[A] [B] [C]` and it wouldn't make a difference to the
client.

So in the implementation of BatchCallContext, the client waited for all requests in the
batch individually. If the server didn't respond to some of the requests in the batch, the
client would eventually just time out (if a context was used).

With the addition of batch limits into the server, we anticipate that people will hit this
kind of error way more often. To handle this properly, the client now waits for a single
response batch and expects it to contain all responses to the requests.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
3 people committed Jun 14, 2023
1 parent 23bee16 commit 2cfbc43
Show file tree
Hide file tree
Showing 25 changed files with 869 additions and 247 deletions.
1 change: 1 addition & 0 deletions cmd/clef/main.go
Expand Up @@ -656,6 +656,7 @@ func signer(c *cli.Context) error {
cors := utils.SplitAndTrim(c.GlobalString(utils.HTTPCORSDomainFlag.Name))

srv := rpc.NewServer()
srv.SetBatchLimits(node.DefaultConfig.BatchRequestLimit, node.DefaultConfig.BatchResponseMaxSize)
err := node.RegisterApis(rpcAPI, []string{"account"}, srv, false)
if err != nil {
utils.Fatalf("Could not register API: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Expand Up @@ -183,6 +183,8 @@ var (
utils.RPCGlobalEVMTimeoutFlag,
utils.RPCGlobalTxFeeCapFlag,
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
utils.BatchResponseMaxSize,
}

metricsFlags = []cli.Flag{
Expand Down
18 changes: 18 additions & 0 deletions cmd/utils/flags.go
Expand Up @@ -668,6 +668,16 @@ var (
Name: "rpc.allow-unprotected-txs",
Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC",
}
BatchRequestLimit = &cli.IntFlag{
Name: "rpc.batch-request-limit",
Usage: "Maximum number of requests in a batch",
Value: node.DefaultConfig.BatchRequestLimit,
}
BatchResponseMaxSize = &cli.IntFlag{
Name: "rpc.batch-response-max-size",
Usage: "Maximum number of bytes returned from a batched call",
Value: node.DefaultConfig.BatchResponseMaxSize,
}

// Network Settings
MaxPeersFlag = cli.IntFlag{
Expand Down Expand Up @@ -1056,6 +1066,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(AllowUnprotectedTxs.Name) {
cfg.AllowUnprotectedTxs = ctx.GlobalBool(AllowUnprotectedTxs.Name)
}

if ctx.IsSet(BatchRequestLimit.Name) {
cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name)
}

if ctx.IsSet(BatchResponseMaxSize.Name) {
cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name)
}
}

// setGraphQL creates the GraphQL listener interface string from the set
Expand Down
8 changes: 8 additions & 0 deletions node/api.go
Expand Up @@ -185,6 +185,10 @@ func (api *privateAdminAPI) StartHTTP(host *string, port *int, cors *string, api
CorsAllowedOrigins: api.node.config.HTTPCors,
Vhosts: api.node.config.HTTPVirtualHosts,
Modules: api.node.config.HTTPModules,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
batchResponseSizeLimit: api.node.config.BatchResponseMaxSize,
},
}
if cors != nil {
config.CorsAllowedOrigins = nil
Expand Down Expand Up @@ -259,6 +263,10 @@ func (api *privateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
Modules: api.node.config.WSModules,
Origins: api.node.config.WSOrigins,
// ExposeAll: api.node.config.WSExposeAll,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
batchResponseSizeLimit: api.node.config.BatchResponseMaxSize,
},
}
if apis != nil {
config.Modules = nil
Expand Down
6 changes: 6 additions & 0 deletions node/config.go
Expand Up @@ -203,6 +203,12 @@ type Config struct {

// JWTSecret is the hex-encoded jwt secret.
JWTSecret string `toml:",omitempty"`

// BatchRequestLimit is the maximum number of requests in a batch.
BatchRequestLimit int `toml:",omitempty"`

// BatchResponseMaxSize is the maximum number of bytes returned from a batched rpc call.
BatchResponseMaxSize int `toml:",omitempty"`
}

// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
Expand Down
24 changes: 13 additions & 11 deletions node/defaults.go
Expand Up @@ -48,17 +48,19 @@ var (

// DefaultConfig contains reasonable default settings.
var DefaultConfig = Config{
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
AuthAddr: DefaultAuthHost,
AuthPort: DefaultAuthPort,
AuthVirtualHosts: DefaultAuthVhosts,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
GraphQLVirtualHosts: []string{"localhost"},
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
AuthAddr: DefaultAuthHost,
AuthPort: DefaultAuthPort,
AuthVirtualHosts: DefaultAuthVhosts,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
BatchRequestLimit: 1000,
BatchResponseMaxSize: 25 * 1000 * 1000,
GraphQLVirtualHosts: []string{"localhost"},
P2P: p2p.Config{
ListenAddr: ":30303",
MaxPeers: 50,
Expand Down
31 changes: 20 additions & 11 deletions node/node.go
Expand Up @@ -101,10 +101,11 @@ func New(conf *Config) (*Node, error) {
if strings.HasSuffix(conf.Name, ".ipc") {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}

server := rpc.NewServer()
server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize)
node := &Node{
config: conf,
inprocHandler: rpc.NewServer(),
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
Expand Down Expand Up @@ -395,7 +396,10 @@ func (n *Node) startRPC() error {
servers []*httpServer
open, all = n.GetAPIs()
)

rpcConfig := rpcEndpointConfig{
batchItemLimit: n.config.BatchRequestLimit,
batchResponseSizeLimit: n.config.BatchResponseMaxSize,
}
initHttp := func(server *httpServer, apis []rpc.API, port int) error {
if err := server.setListenAddr(n.config.HTTPHost, port); err != nil {
return err
Expand All @@ -405,6 +409,7 @@ func (n *Node) startRPC() error {
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
rpcEndpointConfig: rpcConfig,
}); err != nil {
return err
}
Expand All @@ -418,9 +423,10 @@ func (n *Node) startRPC() error {
return err
}
if err := server.enableWS(n.rpcAPIs, wsConfig{
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
rpcEndpointConfig: rpcConfig,
prefix: n.config.WSPathPrefix,
}); err != nil {
return err
}
Expand All @@ -434,26 +440,29 @@ func (n *Node) startRPC() error {
if err := server.setListenAddr(n.config.AuthAddr, port); err != nil {
return err
}
sharedConfig := rpcConfig
sharedConfig.jwtSecret = secret
if err := server.enableRPC(apis, httpConfig{
CorsAllowedOrigins: DefaultAuthCors,
Vhosts: n.config.AuthVirtualHosts,
Modules: DefaultAuthModules,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
rpcEndpointConfig: sharedConfig,
}); err != nil {
return err
}
servers = append(servers, server)

// Enable auth via WS
server = n.wsServerForPort(port, true)
if err := server.setListenAddr(n.config.AuthAddr, port); err != nil {
return err
}
if err := server.enableWS(apis, wsConfig{
Modules: DefaultAuthModules,
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
Modules: DefaultAuthModules,
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
rpcEndpointConfig: sharedConfig,
}); err != nil {
return err
}
Expand Down
17 changes: 13 additions & 4 deletions node/rpcstack.go
Expand Up @@ -40,14 +40,21 @@ type httpConfig struct {
Vhosts []string
prefix string // path prefix on which to mount http handler
jwtSecret []byte // optional JWT secret
rpcEndpointConfig
}

// wsConfig is the JSON-RPC/Websocket configuration
type wsConfig struct {
Origins []string
Modules []string
prefix string // path prefix on which to mount ws handler
jwtSecret []byte // optional JWT secret
Origins []string
Modules []string
prefix string // path prefix on which to mount ws handler
rpcEndpointConfig
}

type rpcEndpointConfig struct {
jwtSecret []byte // optional JWT secret
batchItemLimit int
batchResponseSizeLimit int
}

type rpcHandler struct {
Expand Down Expand Up @@ -281,6 +288,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {

// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
if err := RegisterApis(apis, config.Modules, srv, false); err != nil {
return err
}
Expand Down Expand Up @@ -312,6 +320,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
}
// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
if err := RegisterApis(apis, config.Modules, srv, false); err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions node/rpcstack_test.go
Expand Up @@ -314,8 +314,10 @@ func TestJWT(t *testing.T) {
ss, _ := jwt.NewWithClaims(method, testClaim(input)).SignedString(secret)
return ss
}
srv := createAndStartServer(t, &httpConfig{jwtSecret: []byte("secret")},
true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")})
cfg := rpcEndpointConfig{jwtSecret: []byte("secret")}
httpcfg := &httpConfig{rpcEndpointConfig: cfg}
wscfg := &wsConfig{Origins: []string{"*"}, rpcEndpointConfig: cfg}
srv := createAndStartServer(t, httpcfg, true, wscfg)
wsUrl := fmt.Sprintf("ws://%v", srv.listenAddr())
htUrl := fmt.Sprintf("http://%v", srv.listenAddr())

Expand Down

0 comments on commit 2cfbc43

Please sign in to comment.