
rpc: add limit for batch request and response size #26681

Merged
merged 32 commits on Jun 13, 2023

Changes from 10 commits

32 commits
7f3d7e6
limit the number of batch requests to 100
mmsqe Feb 14, 2023
667a408
limit the size of the response packet to 10MB
mmsqe Feb 14, 2023
21aec8f
add batch limit related config
mmsqe Feb 14, 2023
6b8b39d
update doc
mmsqe Feb 14, 2023
b6993c4
Merge branch 'master' into add-rpc-limit
mmsqe Feb 14, 2023
c4ac65c
apply limit for server & client
mmsqe Feb 14, 2023
c9015fa
make batch related limit configurable
mmsqe Feb 15, 2023
6d2ce24
Merge branch 'master' into add-rpc-limit
mmsqe Feb 15, 2023
2c04aa0
add SetBatchLimits for client with default limit
mmsqe Feb 16, 2023
22bc552
Merge branch 'master' into add-rpc-limit
mmsqe Feb 16, 2023
a43fda5
rename namespace
mmsqe Feb 16, 2023
754137c
Merge branch 'master' into add-rpc-limit
mmsqe Feb 16, 2023
d7c8673
allow set limit with dial after client get init
mmsqe Feb 16, 2023
7fd2b77
set limit when init client
mmsqe Feb 16, 2023
733910c
rpc: configure client batch limits through options
fjl Feb 17, 2023
bae5a2f
node: refactor passing around rpc config
fjl Feb 17, 2023
6c6b8b1
rpc: increase default batch limits
fjl Feb 17, 2023
cebe226
rpc: simplify sending error response
fjl Feb 17, 2023
333dffb
rpc: rename variable
fjl Feb 17, 2023
b91f08a
rpc: add test for batch size limit
fjl Feb 17, 2023
fdf1b20
handle msg id for batch too large
mmsqe Feb 18, 2023
127079b
test batch request limit for non-call
mmsqe Mar 3, 2023
e82658a
rm non-call test
mmsqe Mar 6, 2023
bd5dfa6
Merge branch 'master' into add-rpc-limit
holiman May 31, 2023
47557d1
cmd/utils: fix docs on flags
holiman May 31, 2023
8e6018f
rpc: minor refactor of tests
holiman May 31, 2023
acf5730
rpc: improve client batch response handling
fjl Jun 8, 2023
82b5208
rpc: attach "batch too large" error to the first call
fjl Jun 8, 2023
f0688d6
rpc: remove default limits
fjl Jun 8, 2023
cd73291
rpc: remove added blank lines in invalid-batch.js
fjl Jun 8, 2023
7048bfc
rpc: remove special error handling for HTTP batch response length
fjl Jun 8, 2023
6841858
rpc: rename error
fjl Jun 9, 2023
1 change: 1 addition & 0 deletions cmd/clef/main.go
@@ -732,6 +732,7 @@ func signer(c *cli.Context) error {
cors := utils.SplitAndTrim(c.String(utils.HTTPCORSDomainFlag.Name))

srv := rpc.NewServer()
srv.SetBatchLimits(utils.BatchRequestLimit.Value, utils.BatchResponseMaxSize.Value)
err := node.RegisterApis(rpcAPI, []string{"account"}, srv)
if err != nil {
utils.Fatalf("Could not register API: %w", err)
2 changes: 2 additions & 0 deletions cmd/geth/main.go
@@ -180,6 +180,8 @@ var (
utils.RPCGlobalEVMTimeoutFlag,
utils.RPCGlobalTxFeeCapFlag,
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
utils.BatchResponseMaxSize,
}

metricsFlags = []cli.Flag{
20 changes: 20 additions & 0 deletions cmd/utils/flags.go
@@ -786,6 +786,18 @@ var (
Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC",
Category: flags.APICategory,
}
BatchRequestLimit = &cli.IntFlag{
Name: "batch.request-limit",

Review comment (Contributor): IMO these flags should be somewhere in the rpc. namespace

Usage: "Maximum number of requests in a batch",
Value: rpc.BatchRequestLimit,
Category: flags.APICategory,
}
BatchResponseMaxSize = &cli.IntFlag{
Name: "batch.response-max-size",
Usage: "Maximum number of bytes returned from calls (10MB)",
Value: rpc.BatchResponseMaxSize,
Category: flags.APICategory,
}
EnablePersonal = &cli.BoolFlag{
Name: "rpc.enabledeprecatedpersonal",
Usage: "Enables the (deprecated) personal namespace",
@@ -1209,6 +1221,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.IsSet(AllowUnprotectedTxs.Name) {
cfg.AllowUnprotectedTxs = ctx.Bool(AllowUnprotectedTxs.Name)
}

if ctx.IsSet(BatchRequestLimit.Name) {
cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name)
}

if ctx.IsSet(BatchResponseMaxSize.Name) {
cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name)
}
}

// setGraphQL creates the GraphQL listener interface string from the set
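With these flags, an operator can tune both limits at startup, e.g. geth --batch.request-limit 500 --batch.response-max-size 5000000 (illustrative values). The flag defaults are taken from the rpc package constants introduced further down in this diff (100 requests and 10 MB in this revision).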
4 changes: 2 additions & 2 deletions node/api.go
@@ -199,7 +199,7 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri
if err := api.node.http.setListenAddr(*host, *port); err != nil {
return false, err
}
if err := api.node.http.enableRPC(api.node.rpcAPIs, config); err != nil {
if err := api.node.http.enableRPC(api.node.rpcAPIs, config, api.node.config.BatchRequestLimit, api.node.config.BatchResponseMaxSize); err != nil {
return false, err
}
if err := api.node.http.start(); err != nil {
@@ -270,7 +270,7 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap
return false, err
}
openApis, _ := api.node.getAPIs()
if err := server.enableWS(openApis, config); err != nil {
if err := server.enableWS(openApis, config, api.node.config.BatchRequestLimit, api.node.config.BatchResponseMaxSize); err != nil {
return false, err
}
if err := server.start(); err != nil {
6 changes: 6 additions & 0 deletions node/config.go
@@ -197,6 +197,12 @@ type Config struct {
// AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC.
AllowUnprotectedTxs bool `toml:",omitempty"`

// BatchRequestLimit is the maximum number of requests in a batch.
BatchRequestLimit int `toml:",omitempty"`

// BatchResponseMaxSize is the maximum number of bytes returned from calls (10MB).
BatchResponseMaxSize int `toml:",omitempty"`

// JWTSecret is the path to the hex-encoded jwt secret.
JWTSecret string `toml:",omitempty"`

13 changes: 7 additions & 6 deletions node/node.go
@@ -101,10 +101,11 @@ func New(conf *Config) (*Node, error) {
if strings.HasSuffix(conf.Name, ".ipc") {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}

server := rpc.NewServer()
server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize)
node := &Node{
config: conf,
inprocHandler: rpc.NewServer(),
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
@@ -412,7 +413,7 @@ func (n *Node) startRPC() error {
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
}); err != nil {
}, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil {
return err
}
servers = append(servers, server)
@@ -428,7 +429,7 @@ func (n *Node) startRPC() error {
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
}); err != nil {
}, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil {
return err
}
servers = append(servers, server)
@@ -447,7 +448,7 @@ func (n *Node) startRPC() error {
Modules: DefaultAuthModules,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
}); err != nil {
}, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil {
return err
}
servers = append(servers, server)
@@ -461,7 +462,7 @@ func (n *Node) startRPC() error {
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
}); err != nil {
}, n.config.BatchRequestLimit, n.config.BatchResponseMaxSize); err != nil {
return err
}
servers = append(servers, server)
6 changes: 4 additions & 2 deletions node/rpcstack.go
@@ -287,7 +287,7 @@ func (h *httpServer) doStop() {
}

// enableRPC turns on JSON-RPC over HTTP on the server.
func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {
func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, batchRequestLimit, batchResponseMaxSize int) error {
h.mu.Lock()
defer h.mu.Unlock()

@@ -297,6 +297,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {

// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(batchRequestLimit, batchResponseMaxSize)
if err := RegisterApis(apis, config.Modules, srv); err != nil {
return err
}
@@ -319,7 +320,7 @@ func (h *httpServer) disableRPC() bool {
}

// enableWS turns on JSON-RPC over WebSocket on the server.
func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, batchRequestLimit, batchResponseMaxSize int) error {
h.mu.Lock()
defer h.mu.Unlock()

@@ -328,6 +329,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
}
// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(batchRequestLimit, batchResponseMaxSize)
if err := RegisterApis(apis, config.Modules, srv); err != nil {
return err
}
4 changes: 2 additions & 2 deletions node/rpcstack_test.go
@@ -242,9 +242,9 @@ func createAndStartServer(t *testing.T, conf *httpConfig, ws bool, wsConf *wsCon
timeouts = &rpc.DefaultHTTPTimeouts
}
srv := newHTTPServer(testlog.Logger(t, log.LvlDebug), *timeouts)
assert.NoError(t, srv.enableRPC(apis(), *conf))
assert.NoError(t, srv.enableRPC(apis(), *conf, 0, 0))
if ws {
assert.NoError(t, srv.enableWS(nil, *wsConf))
assert.NoError(t, srv.enableWS(nil, *wsConf, 0, 0))
}
assert.NoError(t, srv.setListenAddr("localhost", 0))
assert.NoError(t, srv.start())
21 changes: 20 additions & 1 deletion rpc/client.go
@@ -44,6 +44,10 @@ const (
// Timeouts
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls

// Batch limits
BatchRequestLimit = 100 // Maximum number of requests in a batch
BatchResponseMaxSize = 10 * 1000 * 1000 // Maximum number of bytes returned from calls (10MB)
)

const (
@@ -99,6 +103,9 @@ type Client struct {
reqInit chan *requestOp // register response IDs, takes write lock
reqSent chan error // signals write completion, releases write lock
reqTimeout chan *requestOp // removes response IDs when call timeout expires

batchRequestLimit int
batchResponseMaxSize int
}

type reconnectFunc func(context.Context) (ServerCodec, error)
@@ -114,10 +121,22 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.Background()
ctx = context.WithValue(ctx, clientContextKey{}, c)
ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo())
handler := newHandler(ctx, conn, c.idgen, c.services)
if c.batchRequestLimit == 0 {
c.batchRequestLimit = BatchRequestLimit
}
if c.batchResponseMaxSize == 0 {
c.batchResponseMaxSize = BatchResponseMaxSize
}
handler := newHandler(ctx, conn, c.idgen, c.services, c.batchRequestLimit, c.batchResponseMaxSize)
return &clientConn{conn, handler}
}

// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls
func (c *Client) SetBatchLimits(limit int, size int) {
c.batchRequestLimit = limit
c.batchResponseMaxSize = size
}

func (cc *clientConn) close(err error, inflightReq *requestOp) {
cc.handler.close(err, inflightReq)
cc.codec.close()
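For context, here is a minimal client-side sketch assuming the SetBatchLimits method as it appears in this revision (commit 733910c later in this PR reworks this into client options, so the merged API differs); the endpoint URL, limit values, and the eth_blockNumber batch are illustrative only:

```go
// Minimal sketch, assuming the SetBatchLimits client API shown in this
// revision of the PR; endpoint URL, limit values, and the eth_blockNumber
// batch are illustrative only.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	client, err := rpc.DialContext(context.Background(), "http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Configure the client's batch limits (arbitrary illustrative values:
	// 50 calls per batch, 5 MB of batch response data).
	client.SetBatchLimits(50, 5*1000*1000)

	// Send a small batch of eth_blockNumber calls.
	batch := make([]rpc.BatchElem, 3)
	results := make([]string, len(batch))
	for i := range batch {
		batch[i] = rpc.BatchElem{
			Method: "eth_blockNumber",
			Result: &results[i],
		}
	}
	if err := client.BatchCall(batch); err != nil {
		log.Fatal(err)
	}
	fmt.Println(results)
}
```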
4 changes: 3 additions & 1 deletion rpc/errors.go
@@ -61,12 +61,14 @@ const (
errcodeDefault = -32000
errcodeNotificationsUnsupported = -32001
errcodeTimeout = -32002
errcodeResponseTooLarge = -32003
errcodePanic = -32603
errcodeMarshalError = -32603
)

const (
errMsgTimeout = "request timed out"
errMsgTimeout = "request timed out"
errMsgResponseTooLarge = "response too large"
)

type methodNotFoundError struct{ method string }
78 changes: 56 additions & 22 deletions rpc/handler.go
@@ -49,17 +49,19 @@ import (
// h.removeRequestOp(op) // timeout, etc.
// }
type handler struct {
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
batchRequestLimit int
batchResponseMaxSize int

subLock sync.Mutex
serverSubs map[ID]*Subscription
@@ -70,19 +72,21 @@ type callProc struct {
notifiers []*Notifier
}

func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit, batchResponseMaxSize int) *handler {
rootCtx, cancelRoot := context.WithCancel(connCtx)
h := &handler{
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
batchRequestLimit: batchRequestLimit,
batchResponseMaxSize: batchResponseMaxSize,
}
if conn.remoteAddr() != "" {
h.log = h.log.New("conn", conn.remoteAddr())
@@ -149,6 +153,21 @@ func (b *batchCallBuffer) timeout(ctx context.Context, conn jsonWriter) {
b.doWrite(ctx, conn, true)
}

// responseTooLarge sends the responses added so far. For the remaining unanswered call
// messages, it sends a response too large error response.
func (b *batchCallBuffer) responseTooLarge(ctx context.Context, conn jsonWriter) {
b.mutex.Lock()
defer b.mutex.Unlock()

for _, msg := range b.calls {
if !msg.isNotification() {
resp := msg.errorResponse(&internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge})
b.resp = append(b.resp, resp)
}
}
b.doWrite(ctx, conn, true)
}

// doWrite actually writes the response.
// This assumes b.mutex is held.
func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorResponse bool) {
@@ -172,6 +191,14 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
return
}

if len(msgs) > h.batchRequestLimit && h.batchRequestLimit != 0 {
h.startCallProc(func(cp *callProc) {
resp := errorMessage(&invalidRequestError{"batch too large"})
h.conn.writeJSON(cp.ctx, resp, true)
})
return
}

// Handle non-call messages first:
calls := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range msgs {
@@ -203,6 +230,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
})
}

resBytes := 0
for {
// No need to handle rest of calls if timed out.
if cp.ctx.Err() != nil {
@@ -214,6 +242,12 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
resp := h.handleCallMsg(cp, msg)
callBuffer.pushResponse(resp)
if resp != nil && h.batchResponseMaxSize != 0 {
if resBytes += len(resp.Result); resBytes > h.batchResponseMaxSize {
callBuffer.responseTooLarge(cp.ctx, h.conn)
break
}
}
}
if timer != nil {
timer.Stop()
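To see the handler logic above in action, here is a minimal standalone server sketch built on the SetBatchLimits server API from this revision; the "test" namespace, EchoService, listen address, and limit values are illustrative, not part of the PR:

```go
// Minimal sketch of a standalone server using srv.SetBatchLimits as added in
// this revision; the "test" namespace, EchoService, listen address, and limit
// values are illustrative, not part of the PR.
package main

import (
	"log"
	"net/http"

	"github.com/ethereum/go-ethereum/rpc"
)

// EchoService is a toy API exposed under the "test" namespace.
type EchoService struct{}

// Echo is served as test_echo.
func (s *EchoService) Echo(msg string) string { return msg }

func main() {
	srv := rpc.NewServer()
	// Reject batches of more than 10 calls, and stop answering a batch once
	// roughly 1 MB of response data has accumulated.
	srv.SetBatchLimits(10, 1_000_000)

	if err := srv.RegisterName("test", new(EchoService)); err != nil {
		log.Fatal(err)
	}
	// rpc.Server implements http.Handler, so it can be served directly.
	log.Fatal(http.ListenAndServe("localhost:8545", srv))
}
```

With settings like these, a batch longer than the request limit is rejected up front with the "batch too large" invalid-request error, and once the byte budget is exceeded the remaining calls in the batch receive "response too large" errors via the responseTooLarge path shown above.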