Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: add limit for batch request and response size #26681

Merged
merged 32 commits into from Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7f3d7e6
limit the number of batch requests to 100
mmsqe Feb 14, 2023
667a408
limit the size of the response packet to 10MB
mmsqe Feb 14, 2023
21aec8f
add batch limit related config
mmsqe Feb 14, 2023
6b8b39d
update doc
mmsqe Feb 14, 2023
b6993c4
Merge branch 'master' into add-rpc-limit
mmsqe Feb 14, 2023
c4ac65c
apply limit for server & client
mmsqe Feb 14, 2023
c9015fa
make batch related limit configurable
mmsqe Feb 15, 2023
6d2ce24
Merge branch 'master' into add-rpc-limit
mmsqe Feb 15, 2023
2c04aa0
add SetBatchLimits for client with default limit
mmsqe Feb 16, 2023
22bc552
Merge branch 'master' into add-rpc-limit
mmsqe Feb 16, 2023
a43fda5
rename namespace
mmsqe Feb 16, 2023
754137c
Merge branch 'master' into add-rpc-limit
mmsqe Feb 16, 2023
d7c8673
allow set limit with dial after client get init
mmsqe Feb 16, 2023
7fd2b77
set limit when init client
mmsqe Feb 16, 2023
733910c
rpc: configure client batch limits through options
fjl Feb 17, 2023
bae5a2f
node: refactor passing around rpc config
fjl Feb 17, 2023
6c6b8b1
rpc: increase default batch limits
fjl Feb 17, 2023
cebe226
rpc: simplify sending error response
fjl Feb 17, 2023
333dffb
rpc: rename variable
fjl Feb 17, 2023
b91f08a
rpc: add test for batch size limit
fjl Feb 17, 2023
fdf1b20
handle msg id for batch too large
mmsqe Feb 18, 2023
127079b
test batch request limit for non-call
mmsqe Mar 3, 2023
e82658a
rm non-call test
mmsqe Mar 6, 2023
bd5dfa6
Merge branch 'master' into add-rpc-limit
holiman May 31, 2023
47557d1
cmd/utils: fix docs on flags
holiman May 31, 2023
8e6018f
rpc: minor refactor of tests
holiman May 31, 2023
acf5730
rpc: improve client batch response handling
fjl Jun 8, 2023
82b5208
rpc: attach "batch too large" error to the first call
fjl Jun 8, 2023
f0688d6
rpc: remove default limits
fjl Jun 8, 2023
cd73291
rpc: remove added blank lines in invalid-batch.js
fjl Jun 8, 2023
7048bfc
rpc: remove special error handling for HTTP batch response length
fjl Jun 8, 2023
6841858
rpc: rename error
fjl Jun 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/clef/main.go
Expand Up @@ -732,6 +732,7 @@ func signer(c *cli.Context) error {
cors := utils.SplitAndTrim(c.String(utils.HTTPCORSDomainFlag.Name))

srv := rpc.NewServer()
srv.SetBatchLimits(utils.BatchRequestLimit.Value, utils.BatchResponseMaxSize.Value)
err := node.RegisterApis(rpcAPI, []string{"account"}, srv)
if err != nil {
utils.Fatalf("Could not register API: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Expand Up @@ -180,6 +180,8 @@ var (
utils.RPCGlobalEVMTimeoutFlag,
utils.RPCGlobalTxFeeCapFlag,
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
utils.BatchResponseMaxSize,
}

metricsFlags = []cli.Flag{
Expand Down
20 changes: 20 additions & 0 deletions cmd/utils/flags.go
Expand Up @@ -786,6 +786,18 @@ var (
Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC",
Category: flags.APICategory,
}
BatchRequestLimit = &cli.IntFlag{
Name: "rpc.batch-request-limit",
Usage: "Maximum number of requests in a batch",
Value: rpc.DefaultBatchRequestLimit,
Category: flags.APICategory,
}
BatchResponseMaxSize = &cli.IntFlag{
Name: "rpc.batch-response-max-size",
Usage: "Maximum number of bytes returned from calls (10MB)",
Value: rpc.DefaultBatchResponseMaxSize,
Category: flags.APICategory,
}
EnablePersonal = &cli.BoolFlag{
Name: "rpc.enabledeprecatedpersonal",
Usage: "Enables the (deprecated) personal namespace",
Expand Down Expand Up @@ -1209,6 +1221,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.IsSet(AllowUnprotectedTxs.Name) {
cfg.AllowUnprotectedTxs = ctx.Bool(AllowUnprotectedTxs.Name)
}

if ctx.IsSet(BatchRequestLimit.Name) {
cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name)
}

if ctx.IsSet(BatchResponseMaxSize.Name) {
cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name)
}
}

// setGraphQL creates the GraphQL listener interface string from the set
Expand Down
8 changes: 8 additions & 0 deletions node/api.go
Expand Up @@ -176,6 +176,10 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri
CorsAllowedOrigins: api.node.config.HTTPCors,
Vhosts: api.node.config.HTTPVirtualHosts,
Modules: api.node.config.HTTPModules,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
batchResponseSizeLimit: api.node.config.BatchResponseMaxSize,
},
}
if cors != nil {
config.CorsAllowedOrigins = nil
Expand Down Expand Up @@ -250,6 +254,10 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap
Modules: api.node.config.WSModules,
Origins: api.node.config.WSOrigins,
// ExposeAll: api.node.config.WSExposeAll,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
batchResponseSizeLimit: api.node.config.BatchResponseMaxSize,
},
}
if apis != nil {
config.Modules = nil
Expand Down
6 changes: 6 additions & 0 deletions node/config.go
Expand Up @@ -197,6 +197,12 @@ type Config struct {
// AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC.
AllowUnprotectedTxs bool `toml:",omitempty"`

// BatchRequestLimit is the maximum number of requests in a batch.
BatchRequestLimit int `toml:",omitempty"`

// BatchResponseMaxSize is the maximum number of bytes returned from calls (10MB).
BatchResponseMaxSize int `toml:",omitempty"`

// JWTSecret is the path to the hex-encoded jwt secret.
JWTSecret string `toml:",omitempty"`

Expand Down
31 changes: 21 additions & 10 deletions node/node.go
Expand Up @@ -101,10 +101,11 @@ func New(conf *Config) (*Node, error) {
if strings.HasSuffix(conf.Name, ".ipc") {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}

server := rpc.NewServer()
server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize)
node := &Node{
config: conf,
inprocHandler: rpc.NewServer(),
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
Expand Down Expand Up @@ -403,6 +404,11 @@ func (n *Node) startRPC() error {
openAPIs, allAPIs = n.getAPIs()
)

rpcConfig := rpcEndpointConfig{
batchItemLimit: n.config.BatchRequestLimit,
batchResponseSizeLimit: n.config.BatchResponseMaxSize,
}

initHttp := func(server *httpServer, port int) error {
if err := server.setListenAddr(n.config.HTTPHost, port); err != nil {
return err
Expand All @@ -412,6 +418,7 @@ func (n *Node) startRPC() error {
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
rpcEndpointConfig: rpcConfig,
}); err != nil {
return err
}
Expand All @@ -425,9 +432,10 @@ func (n *Node) startRPC() error {
return err
}
if err := server.enableWS(openAPIs, wsConfig{
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
Modules: n.config.WSModules,
Origins: n.config.WSOrigins,
prefix: n.config.WSPathPrefix,
rpcEndpointConfig: rpcConfig,
}); err != nil {
return err
}
Expand All @@ -441,26 +449,29 @@ func (n *Node) startRPC() error {
if err := server.setListenAddr(n.config.AuthAddr, port); err != nil {
return err
}
sharedConfig := rpcConfig
sharedConfig.jwtSecret = secret
if err := server.enableRPC(allAPIs, httpConfig{
CorsAllowedOrigins: DefaultAuthCors,
Vhosts: n.config.AuthVirtualHosts,
Modules: DefaultAuthModules,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
rpcEndpointConfig: sharedConfig,
}); err != nil {
return err
}
servers = append(servers, server)

// Enable auth via WS
server = n.wsServerForPort(port, true)
if err := server.setListenAddr(n.config.AuthAddr, port); err != nil {
return err
}
if err := server.enableWS(allAPIs, wsConfig{
Modules: DefaultAuthModules,
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
jwtSecret: secret,
Modules: DefaultAuthModules,
Origins: DefaultAuthOrigins,
prefix: DefaultAuthPrefix,
rpcEndpointConfig: sharedConfig,
}); err != nil {
return err
}
Expand Down
18 changes: 13 additions & 5 deletions node/rpcstack.go
Expand Up @@ -41,15 +41,21 @@ type httpConfig struct {
CorsAllowedOrigins []string
Vhosts []string
prefix string // path prefix on which to mount http handler
jwtSecret []byte // optional JWT secret
rpcEndpointConfig
}

// wsConfig is the JSON-RPC/Websocket configuration
type wsConfig struct {
Origins []string
Modules []string
prefix string // path prefix on which to mount ws handler
jwtSecret []byte // optional JWT secret
Origins []string
Modules []string
prefix string // path prefix on which to mount ws handler
rpcEndpointConfig
}

type rpcEndpointConfig struct {
jwtSecret []byte // optional JWT secret
batchItemLimit int
batchResponseSizeLimit int
}

type rpcHandler struct {
Expand Down Expand Up @@ -297,6 +303,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {

// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
if err := RegisterApis(apis, config.Modules, srv); err != nil {
return err
}
Expand Down Expand Up @@ -328,6 +335,7 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
}
// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
if err := RegisterApis(apis, config.Modules, srv); err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions node/rpcstack_test.go
Expand Up @@ -338,8 +338,10 @@ func TestJWT(t *testing.T) {
ss, _ := jwt.NewWithClaims(method, testClaim(input)).SignedString(secret)
return ss
}
srv := createAndStartServer(t, &httpConfig{jwtSecret: []byte("secret")},
true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")}, nil)
cfg := rpcEndpointConfig{jwtSecret: []byte("secret")}
httpcfg := &httpConfig{rpcEndpointConfig: cfg}
wscfg := &wsConfig{Origins: []string{"*"}, rpcEndpointConfig: cfg}
srv := createAndStartServer(t, httpcfg, true, wscfg, nil)
wsUrl := fmt.Sprintf("ws://%v", srv.listenAddr())
htUrl := fmt.Sprintf("http://%v", srv.listenAddr())

Expand Down
68 changes: 48 additions & 20 deletions rpc/client.go
Expand Up @@ -40,12 +40,18 @@ var (
errDead = errors.New("connection lost")
)

// Timeouts
const (
// Timeouts
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
)

// Batch limits
const (
DefaultBatchRequestLimit = 1000 // Maximum number of items in a batch.
DefaultBatchResponseMaxSize = 25 * 1000 * 1000 // Maximum number of bytes returned from calls.
)

const (
// Subscriptions are removed when the subscriber cannot keep up.
//
Expand Down Expand Up @@ -84,6 +90,10 @@ type Client struct {
// This function, if non-nil, is called when the connection is lost.
reconnectFunc reconnectFunc

// config fields
batchItemLimit int
batchResponseMaxSize int

// writeConn is used for writing to the connection on the caller's goroutine. It should
// only be accessed outside of dispatch, with the write lock held. The write lock is
// taken by sending on reqInit and released by sending on reqSent.
Expand Down Expand Up @@ -114,7 +124,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.Background()
ctx = context.WithValue(ctx, clientContextKey{}, c)
ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo())
handler := newHandler(ctx, conn, c.idgen, c.services)
handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit, c.batchResponseMaxSize)
return &clientConn{conn, handler}
}

Expand Down Expand Up @@ -211,7 +221,7 @@ func DialOptions(ctx context.Context, rawurl string, options ...ClientOption) (*
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}

return newClient(ctx, reconnect)
return newClient(ctx, cfg, reconnect)
}

// ClientFromContext retrieves the client from the context, if any. This can be used to perform
Expand All @@ -221,33 +231,48 @@ func ClientFromContext(ctx context.Context) (*Client, bool) {
return client, ok
}

func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) {
func newClient(initctx context.Context, cfg *clientConfig, connect reconnectFunc) (*Client, error) {
conn, err := connect(initctx)
if err != nil {
return nil, err
}
c := initClient(conn, randomIDGenerator(), new(serviceRegistry))
c := initClient(conn, new(serviceRegistry), cfg)
c.reconnectFunc = connect
return c, nil
}

func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig) *Client {
_, isHTTP := conn.(*httpConn)
c := &Client{
isHTTP: isHTTP,
idgen: idgen,
services: services,
writeConn: conn,
close: make(chan struct{}),
closing: make(chan struct{}),
didClose: make(chan struct{}),
reconnected: make(chan ServerCodec),
readOp: make(chan readOp),
readErr: make(chan error),
reqInit: make(chan *requestOp),
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
}
isHTTP: isHTTP,
services: services,
idgen: cfg.idgen,
batchItemLimit: cfg.batchItemLimit,
batchResponseMaxSize: cfg.batchResponseLimit,
writeConn: conn,
close: make(chan struct{}),
closing: make(chan struct{}),
didClose: make(chan struct{}),
reconnected: make(chan ServerCodec),
readOp: make(chan readOp),
readErr: make(chan error),
reqInit: make(chan *requestOp),
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
}

// Set defaults.
if c.idgen == nil {
c.idgen = randomIDGenerator()
}
if c.batchItemLimit == 0 {
c.batchItemLimit = DefaultBatchRequestLimit
}
if c.batchResponseMaxSize == 0 {
c.batchResponseMaxSize = DefaultBatchResponseMaxSize
}

// Launch the main loop.
if !isHTTP {
go c.dispatch(conn)
}
Expand Down Expand Up @@ -408,6 +433,9 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
// only sends valid IDs to our channel.
elem := &b[byID[string(resp.ID)]]
if resp.Error != nil {
if resp.Error.Message == errMsgBatchTooLarge {
return resp.Error
}
elem.Error = resp.Error
continue
}
Expand Down
29 changes: 29 additions & 0 deletions rpc/client_opt.go
Expand Up @@ -28,11 +28,18 @@ type ClientOption interface {
}

type clientConfig struct {
// HTTP settings
httpClient *http.Client
httpHeaders http.Header
httpAuth HTTPAuth

// WebSocket options
wsDialer *websocket.Dialer

// RPC handler options
idgen func() ID
batchItemLimit int
batchResponseLimit int
}

func (cfg *clientConfig) initHeaders() {
Expand Down Expand Up @@ -104,3 +111,25 @@ func WithHTTPAuth(a HTTPAuth) ClientOption {
// Usually, HTTPAuth functions will call h.Set("authorization", "...") to add
// auth information to the request.
type HTTPAuth func(h http.Header) error

// WithBatchItemLimit changes the maximum number of items allowed in batch requests.
//
// Note: this option applies when processing incoming batch requests. It does not affect
// batch requests sent by the client.
func WithBatchItemLimit(limit int) ClientOption {
return optionFunc(func(cfg *clientConfig) {
cfg.batchItemLimit = limit
})
}

// WithBatchResponseSizeLimit changes the maximum number of response bytes that can be
// generated for batch requests. When this limit is reached, further calls in the batch
// will not be processed.
//
// Note: this option applies when processing incoming batch requests. It does not affect
// batch requests sent by the client.
func WithBatchResponseSizeLimit(sizeLimit int) ClientOption {
return optionFunc(func(cfg *clientConfig) {
cfg.batchResponseLimit = sizeLimit
})
}