From d8911d6fef854063da3156bc478bcd0e044cea21 Mon Sep 17 00:00:00 2001 From: devopsbo3 <69951731+devopsbo3@users.noreply.github.com> Date: Fri, 10 Nov 2023 11:21:03 -0600 Subject: [PATCH] Revert "rpc: add limit for batch request items and response size (#26681)" This reverts commit 189a756134746e9e99c03fd565c834ade111253f. --- cmd/clef/main.go | 1 - cmd/geth/main.go | 2 - cmd/utils/flags.go | 20 -- node/api.go | 8 - node/config.go | 6 - node/defaults.go | 24 +-- node/node.go | 31 +-- node/rpcstack.go | 18 +- node/rpcstack_test.go | 6 +- rpc/client.go | 131 ++++-------- rpc/client_opt.go | 29 --- rpc/client_test.go | 104 ++------- rpc/errors.go | 5 +- rpc/handler.go | 283 ++++++++++--------------- rpc/http.go | 19 +- rpc/inproc.go | 3 +- rpc/ipc.go | 3 +- rpc/server.go | 28 +-- rpc/server_test.go | 39 ---- rpc/stdio.go | 3 +- rpc/testdata/invalid-batch-toolarge.js | 13 -- rpc/websocket.go | 4 +- 22 files changed, 226 insertions(+), 554 deletions(-) delete mode 100644 rpc/testdata/invalid-batch-toolarge.js diff --git a/cmd/clef/main.go b/cmd/clef/main.go index 14f09dc1a8103..cebe74797235e 100644 --- a/cmd/clef/main.go +++ b/cmd/clef/main.go @@ -732,7 +732,6 @@ func signer(c *cli.Context) error { cors := utils.SplitAndTrim(c.String(utils.HTTPCORSDomainFlag.Name)) srv := rpc.NewServer() - srv.SetBatchLimits(node.DefaultConfig.BatchRequestLimit, node.DefaultConfig.BatchResponseMaxSize) err := node.RegisterApis(rpcAPI, []string{"account"}, srv) if err != nil { utils.Fatalf("Could not register API: %w", err) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2794e37e380d1..2289a72a197b9 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -168,8 +168,6 @@ var ( utils.RPCGlobalEVMTimeoutFlag, utils.RPCGlobalTxFeeCapFlag, utils.AllowUnprotectedTxs, - utils.BatchRequestLimit, - utils.BatchResponseMaxSize, } metricsFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 692970477d073..1ebc998a40ed3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -713,18 +713,6 @@ var ( Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC", Category: flags.APICategory, } - BatchRequestLimit = &cli.IntFlag{ - Name: "rpc.batch-request-limit", - Usage: "Maximum number of requests in a batch", - Value: node.DefaultConfig.BatchRequestLimit, - Category: flags.APICategory, - } - BatchResponseMaxSize = &cli.IntFlag{ - Name: "rpc.batch-response-max-size", - Usage: "Maximum number of bytes returned from a batched call", - Value: node.DefaultConfig.BatchResponseMaxSize, - Category: flags.APICategory, - } EnablePersonal = &cli.BoolFlag{ Name: "rpc.enabledeprecatedpersonal", Usage: "Enables the (deprecated) personal namespace", @@ -1142,14 +1130,6 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) { if ctx.IsSet(AllowUnprotectedTxs.Name) { cfg.AllowUnprotectedTxs = ctx.Bool(AllowUnprotectedTxs.Name) } - - if ctx.IsSet(BatchRequestLimit.Name) { - cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name) - } - - if ctx.IsSet(BatchResponseMaxSize.Name) { - cfg.BatchResponseMaxSize = ctx.Int(BatchResponseMaxSize.Name) - } } // setGraphQL creates the GraphQL listener interface string from the set diff --git a/node/api.go b/node/api.go index f81f394beb24a..15892a270b66e 100644 --- a/node/api.go +++ b/node/api.go @@ -176,10 +176,6 @@ func (api *adminAPI) StartHTTP(host *string, port *int, cors *string, apis *stri CorsAllowedOrigins: api.node.config.HTTPCors, Vhosts: api.node.config.HTTPVirtualHosts, Modules: api.node.config.HTTPModules, - rpcEndpointConfig: rpcEndpointConfig{ - batchItemLimit: api.node.config.BatchRequestLimit, - batchResponseSizeLimit: api.node.config.BatchResponseMaxSize, - }, } if cors != nil { config.CorsAllowedOrigins = nil @@ -254,10 +250,6 @@ func (api *adminAPI) StartWS(host *string, port *int, allowedOrigins *string, ap Modules: api.node.config.WSModules, Origins: api.node.config.WSOrigins, // ExposeAll: api.node.config.WSExposeAll, - rpcEndpointConfig: rpcEndpointConfig{ - batchItemLimit: api.node.config.BatchRequestLimit, - batchResponseSizeLimit: api.node.config.BatchResponseMaxSize, - }, } if apis != nil { config.Modules = nil diff --git a/node/config.go b/node/config.go index 37c1e4882b847..1765811e8c375 100644 --- a/node/config.go +++ b/node/config.go @@ -197,12 +197,6 @@ type Config struct { // AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC. AllowUnprotectedTxs bool `toml:",omitempty"` - // BatchRequestLimit is the maximum number of requests in a batch. - BatchRequestLimit int `toml:",omitempty"` - - // BatchResponseMaxSize is the maximum number of bytes returned from a batched rpc call. - BatchResponseMaxSize int `toml:",omitempty"` - // JWTSecret is the path to the hex-encoded jwt secret. JWTSecret string `toml:",omitempty"` diff --git a/node/defaults.go b/node/defaults.go index d8f718121e802..fcfbc934bfd4c 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -46,19 +46,17 @@ var ( // DefaultConfig contains reasonable default settings. var DefaultConfig = Config{ - DataDir: DefaultDataDir(), - HTTPPort: DefaultHTTPPort, - AuthAddr: DefaultAuthHost, - AuthPort: DefaultAuthPort, - AuthVirtualHosts: DefaultAuthVhosts, - HTTPModules: []string{"net", "web3"}, - HTTPVirtualHosts: []string{"localhost"}, - HTTPTimeouts: rpc.DefaultHTTPTimeouts, - WSPort: DefaultWSPort, - WSModules: []string{"net", "web3"}, - BatchRequestLimit: 1000, - BatchResponseMaxSize: 25 * 1000 * 1000, - GraphQLVirtualHosts: []string{"localhost"}, + DataDir: DefaultDataDir(), + HTTPPort: DefaultHTTPPort, + AuthAddr: DefaultAuthHost, + AuthPort: DefaultAuthPort, + AuthVirtualHosts: DefaultAuthVhosts, + HTTPModules: []string{"net", "web3"}, + HTTPVirtualHosts: []string{"localhost"}, + HTTPTimeouts: rpc.DefaultHTTPTimeouts, + WSPort: DefaultWSPort, + WSModules: []string{"net", "web3"}, + GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ ListenAddr: ":30303", MaxPeers: 50, diff --git a/node/node.go b/node/node.go index 553d451ab8608..e8494ac3b29e1 100644 --- a/node/node.go +++ b/node/node.go @@ -101,11 +101,10 @@ func New(conf *Config) (*Node, error) { if strings.HasSuffix(conf.Name, ".ipc") { return nil, errors.New(`Config.Name cannot end in ".ipc"`) } - server := rpc.NewServer() - server.SetBatchLimits(conf.BatchRequestLimit, conf.BatchResponseMaxSize) + node := &Node{ config: conf, - inprocHandler: server, + inprocHandler: rpc.NewServer(), eventmux: new(event.TypeMux), log: conf.Logger, stop: make(chan struct{}), @@ -404,11 +403,6 @@ func (n *Node) startRPC() error { openAPIs, allAPIs = n.getAPIs() ) - rpcConfig := rpcEndpointConfig{ - batchItemLimit: n.config.BatchRequestLimit, - batchResponseSizeLimit: n.config.BatchResponseMaxSize, - } - initHttp := func(server *httpServer, port int) error { if err := server.setListenAddr(n.config.HTTPHost, port); err != nil { return err @@ -418,7 +412,6 @@ func (n *Node) startRPC() error { Vhosts: n.config.HTTPVirtualHosts, Modules: n.config.HTTPModules, prefix: n.config.HTTPPathPrefix, - rpcEndpointConfig: rpcConfig, }); err != nil { return err } @@ -432,10 +425,9 @@ func (n *Node) startRPC() error { return err } if err := server.enableWS(openAPIs, wsConfig{ - Modules: n.config.WSModules, - Origins: n.config.WSOrigins, - prefix: n.config.WSPathPrefix, - rpcEndpointConfig: rpcConfig, + Modules: n.config.WSModules, + Origins: n.config.WSOrigins, + prefix: n.config.WSPathPrefix, }); err != nil { return err } @@ -449,29 +441,26 @@ func (n *Node) startRPC() error { if err := server.setListenAddr(n.config.AuthAddr, port); err != nil { return err } - sharedConfig := rpcConfig - sharedConfig.jwtSecret = secret if err := server.enableRPC(allAPIs, httpConfig{ CorsAllowedOrigins: DefaultAuthCors, Vhosts: n.config.AuthVirtualHosts, Modules: DefaultAuthModules, prefix: DefaultAuthPrefix, - rpcEndpointConfig: sharedConfig, + jwtSecret: secret, }); err != nil { return err } servers = append(servers, server) - // Enable auth via WS server = n.wsServerForPort(port, true) if err := server.setListenAddr(n.config.AuthAddr, port); err != nil { return err } if err := server.enableWS(allAPIs, wsConfig{ - Modules: DefaultAuthModules, - Origins: DefaultAuthOrigins, - prefix: DefaultAuthPrefix, - rpcEndpointConfig: sharedConfig, + Modules: DefaultAuthModules, + Origins: DefaultAuthOrigins, + prefix: DefaultAuthPrefix, + jwtSecret: secret, }); err != nil { return err } diff --git a/node/rpcstack.go b/node/rpcstack.go index e91585a2b630f..97d591642c093 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -41,21 +41,15 @@ type httpConfig struct { CorsAllowedOrigins []string Vhosts []string prefix string // path prefix on which to mount http handler - rpcEndpointConfig + jwtSecret []byte // optional JWT secret } // wsConfig is the JSON-RPC/Websocket configuration type wsConfig struct { - Origins []string - Modules []string - prefix string // path prefix on which to mount ws handler - rpcEndpointConfig -} - -type rpcEndpointConfig struct { - jwtSecret []byte // optional JWT secret - batchItemLimit int - batchResponseSizeLimit int + Origins []string + Modules []string + prefix string // path prefix on which to mount ws handler + jwtSecret []byte // optional JWT secret } type rpcHandler struct { @@ -303,7 +297,6 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { // Create RPC server and handler. srv := rpc.NewServer() - srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } @@ -335,7 +328,6 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { } // Create RPC server and handler. srv := rpc.NewServer() - srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } diff --git a/node/rpcstack_test.go b/node/rpcstack_test.go index e41cc51ad318d..0790dddec4114 100644 --- a/node/rpcstack_test.go +++ b/node/rpcstack_test.go @@ -339,10 +339,8 @@ func TestJWT(t *testing.T) { ss, _ := jwt.NewWithClaims(method, testClaim(input)).SignedString(secret) return ss } - cfg := rpcEndpointConfig{jwtSecret: []byte("secret")} - httpcfg := &httpConfig{rpcEndpointConfig: cfg} - wscfg := &wsConfig{Origins: []string{"*"}, rpcEndpointConfig: cfg} - srv := createAndStartServer(t, httpcfg, true, wscfg, nil) + srv := createAndStartServer(t, &httpConfig{jwtSecret: []byte("secret")}, + true, &wsConfig{Origins: []string{"*"}, jwtSecret: []byte("secret")}, nil) wsUrl := fmt.Sprintf("ws://%v", srv.listenAddr()) htUrl := fmt.Sprintf("http://%v", srv.listenAddr()) diff --git a/rpc/client.go b/rpc/client.go index c3114ef1d20f6..fae8536b26d80 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -34,15 +34,14 @@ import ( var ( ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") - ErrNoResult = errors.New("JSON-RPC response has no result") - ErrMissingBatchResponse = errors.New("response batch did not contain a response to this call") + ErrNoResult = errors.New("no result in JSON-RPC response") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") errClientReconnected = errors.New("client reconnected") errDead = errors.New("connection lost") ) -// Timeouts const ( + // Timeouts defaultDialTimeout = 10 * time.Second // used if context has no deadline subscribeTimeout = 10 * time.Second // overall timeout eth_subscribe, rpc_modules calls ) @@ -85,10 +84,6 @@ type Client struct { // This function, if non-nil, is called when the connection is lost. reconnectFunc reconnectFunc - // config fields - batchItemLimit int - batchResponseMaxSize int - // writeConn is used for writing to the connection on the caller's goroutine. It should // only be accessed outside of dispatch, with the write lock held. The write lock is // taken by sending on reqInit and released by sending on reqSent. @@ -119,7 +114,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo()) - handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit, c.batchResponseMaxSize) + handler := newHandler(ctx, conn, c.idgen, c.services) return &clientConn{conn, handler} } @@ -133,17 +128,14 @@ type readOp struct { batch bool } -// requestOp represents a pending request. This is used for both batch and non-batch -// requests. type requestOp struct { - ids []json.RawMessage - err error - resp chan []*jsonrpcMessage // the response goes here - sub *ClientSubscription // set for Subscribe requests. - hadResponse bool // true when the request was responded to + ids []json.RawMessage + err error + resp chan *jsonrpcMessage // receives up to len(ids) responses + sub *ClientSubscription // only set for EthSubscribe requests } -func (op *requestOp) wait(ctx context.Context, c *Client) ([]*jsonrpcMessage, error) { +func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { select { case <-ctx.Done(): // Send the timeout to dispatch so it can remove the request IDs. @@ -219,7 +211,7 @@ func DialOptions(ctx context.Context, rawurl string, options ...ClientOption) (* return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) } - return newClient(ctx, cfg, reconnect) + return newClient(ctx, reconnect) } // ClientFromContext retrieves the client from the context, if any. This can be used to perform @@ -229,42 +221,33 @@ func ClientFromContext(ctx context.Context) (*Client, bool) { return client, ok } -func newClient(initctx context.Context, cfg *clientConfig, connect reconnectFunc) (*Client, error) { +func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { conn, err := connect(initctx) if err != nil { return nil, err } - c := initClient(conn, new(serviceRegistry), cfg) + c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) c.reconnectFunc = connect return c, nil } -func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig) *Client { +func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { _, isHTTP := conn.(*httpConn) c := &Client{ - isHTTP: isHTTP, - services: services, - idgen: cfg.idgen, - batchItemLimit: cfg.batchItemLimit, - batchResponseMaxSize: cfg.batchResponseLimit, - writeConn: conn, - close: make(chan struct{}), - closing: make(chan struct{}), - didClose: make(chan struct{}), - reconnected: make(chan ServerCodec), - readOp: make(chan readOp), - readErr: make(chan error), - reqInit: make(chan *requestOp), - reqSent: make(chan error, 1), - reqTimeout: make(chan *requestOp), - } - - // Set defaults. - if c.idgen == nil { - c.idgen = randomIDGenerator() - } - - // Launch the main loop. + isHTTP: isHTTP, + idgen: idgen, + services: services, + writeConn: conn, + close: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), + reconnected: make(chan ServerCodec), + readOp: make(chan readOp), + readErr: make(chan error), + reqInit: make(chan *requestOp), + reqSent: make(chan error, 1), + reqTimeout: make(chan *requestOp), + } if !isHTTP { go c.dispatch(conn) } @@ -342,10 +325,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str if err != nil { return err } - op := &requestOp{ - ids: []json.RawMessage{msg.ID}, - resp: make(chan []*jsonrpcMessage, 1), - } + op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} if c.isHTTP { err = c.sendHTTP(ctx, op, msg) @@ -357,12 +337,9 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str } // dispatch has accepted the request and will close the channel when it quits. - batchresp, err := op.wait(ctx, c) - if err != nil { + switch resp, err := op.wait(ctx, c); { + case err != nil: return err - } - resp := batchresp[0] - switch { case resp.Error != nil: return resp.Error case len(resp.Result) == 0: @@ -403,7 +380,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { ) op := &requestOp{ ids: make([]json.RawMessage, len(b)), - resp: make(chan []*jsonrpcMessage, 1), + resp: make(chan *jsonrpcMessage, len(b)), } for i, elem := range b { msg, err := c.newMessage(elem.Method, elem.Args...) @@ -421,48 +398,28 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { } else { err = c.send(ctx, op, msgs) } - if err != nil { - return err - } - - batchresp, err := op.wait(ctx, c) - if err != nil { - return err - } // Wait for all responses to come back. - for n := 0; n < len(batchresp) && err == nil; n++ { - resp := batchresp[n] - if resp == nil { - // Ignore null responses. These can happen for batches sent via HTTP. - continue + for n := 0; n < len(b) && err == nil; n++ { + var resp *jsonrpcMessage + resp, err = op.wait(ctx, c) + if err != nil { + break } - // Find the element corresponding to this response. - index, ok := byID[string(resp.ID)] - if !ok { + // The element is guaranteed to be present because dispatch + // only sends valid IDs to our channel. + elem := &b[byID[string(resp.ID)]] + if resp.Error != nil { + elem.Error = resp.Error continue } - delete(byID, string(resp.ID)) - - // Assign result and error. - elem := &b[index] - switch { - case resp.Error != nil: - elem.Error = resp.Error - case resp.Result == nil: + if len(resp.Result) == 0 { elem.Error = ErrNoResult - default: - elem.Error = json.Unmarshal(resp.Result, elem.Result) + continue } + elem.Error = json.Unmarshal(resp.Result, elem.Result) } - - // Check that all expected responses have been received. - for _, index := range byID { - elem := &b[index] - elem.Error = ErrMissingBatchResponse - } - return err } @@ -523,7 +480,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf } op := &requestOp{ ids: []json.RawMessage{msg.ID}, - resp: make(chan []*jsonrpcMessage, 1), + resp: make(chan *jsonrpcMessage), sub: newClientSubscription(c, namespace, chanVal), } diff --git a/rpc/client_opt.go b/rpc/client_opt.go index 5bef08cca8410..5ad7c22b3ce76 100644 --- a/rpc/client_opt.go +++ b/rpc/client_opt.go @@ -28,18 +28,11 @@ type ClientOption interface { } type clientConfig struct { - // HTTP settings httpClient *http.Client httpHeaders http.Header httpAuth HTTPAuth - // WebSocket options wsDialer *websocket.Dialer - - // RPC handler options - idgen func() ID - batchItemLimit int - batchResponseLimit int } func (cfg *clientConfig) initHeaders() { @@ -111,25 +104,3 @@ func WithHTTPAuth(a HTTPAuth) ClientOption { // Usually, HTTPAuth functions will call h.Set("authorization", "...") to add // auth information to the request. type HTTPAuth func(h http.Header) error - -// WithBatchItemLimit changes the maximum number of items allowed in batch requests. -// -// Note: this option applies when processing incoming batch requests. It does not affect -// batch requests sent by the client. -func WithBatchItemLimit(limit int) ClientOption { - return optionFunc(func(cfg *clientConfig) { - cfg.batchItemLimit = limit - }) -} - -// WithBatchResponseSizeLimit changes the maximum number of response bytes that can be -// generated for batch requests. When this limit is reached, further calls in the batch -// will not be processed. -// -// Note: this option applies when processing incoming batch requests. It does not affect -// batch requests sent by the client. -func WithBatchResponseSizeLimit(sizeLimit int) ClientOption { - return optionFunc(func(cfg *clientConfig) { - cfg.batchResponseLimit = sizeLimit - }) -} diff --git a/rpc/client_test.go b/rpc/client_test.go index 7c96b2d6667b7..a94a54929b31b 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -169,12 +169,10 @@ func TestClientBatchRequest(t *testing.T) { } } -// This checks that, for HTTP connections, the length of batch responses is validated to -// match the request exactly. func TestClientBatchRequest_len(t *testing.T) { b, err := json.Marshal([]jsonrpcMessage{ - {Version: "2.0", ID: json.RawMessage("1"), Result: json.RawMessage(`"0x1"`)}, - {Version: "2.0", ID: json.RawMessage("2"), Result: json.RawMessage(`"0x2"`)}, + {Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)}, + {Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)}, }) if err != nil { t.Fatal("failed to encode jsonrpc message:", err) @@ -187,102 +185,37 @@ func TestClientBatchRequest_len(t *testing.T) { })) t.Cleanup(s.Close) - t.Run("too-few", func(t *testing.T) { - client, err := Dial(s.URL) - if err != nil { - t.Fatal("failed to dial test server:", err) - } - defer client.Close() + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + t.Run("too-few", func(t *testing.T) { batch := []BatchElem{ - {Method: "foo", Result: new(string)}, - {Method: "bar", Result: new(string)}, - {Method: "baz", Result: new(string)}, + {Method: "foo"}, + {Method: "bar"}, + {Method: "baz"}, } ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) defer cancelFn() - - if err := client.BatchCallContext(ctx, batch); err != nil { - t.Fatal("error:", err) - } - for i, elem := range batch[:2] { - if elem.Error != nil { - t.Errorf("expected no error for batch element %d, got %q", i, elem.Error) - } - } - for i, elem := range batch[2:] { - if elem.Error != ErrMissingBatchResponse { - t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) - } + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) } }) t.Run("too-many", func(t *testing.T) { - client, err := Dial(s.URL) - if err != nil { - t.Fatal("failed to dial test server:", err) - } - defer client.Close() - batch := []BatchElem{ - {Method: "foo", Result: new(string)}, + {Method: "foo"}, } ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) defer cancelFn() - - if err := client.BatchCallContext(ctx, batch); err != nil { - t.Fatal("error:", err) - } - for i, elem := range batch[:1] { - if elem.Error != nil { - t.Errorf("expected no error for batch element %d, got %q", i, elem.Error) - } - } - for i, elem := range batch[1:] { - if elem.Error != ErrMissingBatchResponse { - t.Errorf("wrong error %q for batch element %d", elem.Error, i+2) - } + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) } }) } -// This checks that the client can handle the case where the server doesn't -// respond to all requests in a batch. -func TestClientBatchRequestLimit(t *testing.T) { - server := newTestServer() - defer server.Stop() - server.SetBatchLimits(2, 100000) - client := DialInProc(server) - - batch := []BatchElem{ - {Method: "foo"}, - {Method: "bar"}, - {Method: "baz"}, - } - err := client.BatchCall(batch) - if err != nil { - t.Fatal("unexpected error:", err) - } - - // Check that the first response indicates an error with batch size. - var err0 Error - if !errors.As(batch[0].Error, &err0) { - t.Log("error zero:", batch[0].Error) - t.Fatalf("batch elem 0 has wrong error type: %T", batch[0].Error) - } else { - if err0.ErrorCode() != -32600 || err0.Error() != errMsgBatchTooLarge { - t.Fatalf("wrong error on batch elem zero: %v", err0) - } - } - - // Check that remaining response batch elements are reported as absent. - for i, elem := range batch[1:] { - if elem.Error != ErrMissingBatchResponse { - t.Fatalf("batch elem %d has unexpected error: %v", i+1, elem.Error) - } - } -} - func TestClientNotify(t *testing.T) { server := newTestServer() defer server.Stop() @@ -377,7 +310,7 @@ func testClientCancel(transport string, t *testing.T) { _, hasDeadline := ctx.Deadline() t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline) // default: - // t.Logf("got expected error with %v wait time: %v", timeout, err) + // t.Logf("got expected error with %v wait time: %v", timeout, err) } cancel() } @@ -554,8 +487,7 @@ func TestClientSubscriptionUnsubscribeServer(t *testing.T) { defer srv.Stop() // Create the client on the other end of the pipe. - cfg := new(clientConfig) - client, _ := newClient(context.Background(), cfg, func(context.Context) (ServerCodec, error) { + client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) { return NewCodec(p2), nil }) defer client.Close() diff --git a/rpc/errors.go b/rpc/errors.go index abb698af75c13..7188332d551eb 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -61,15 +61,12 @@ const ( errcodeDefault = -32000 errcodeNotificationsUnsupported = -32001 errcodeTimeout = -32002 - errcodeResponseTooLarge = -32003 errcodePanic = -32603 errcodeMarshalError = -32603 ) const ( - errMsgTimeout = "request timed out" - errMsgResponseTooLarge = "response too large" - errMsgBatchTooLarge = "batch too large" + errMsgTimeout = "request timed out" ) type methodNotFoundError struct{ method string } diff --git a/rpc/handler.go b/rpc/handler.go index 4f48c7931c651..c2e7d7dc08c60 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -49,19 +49,17 @@ import ( // h.removeRequestOp(op) // timeout, etc. // } type handler struct { - reg *serviceRegistry - unsubscribeCb *callback - idgen func() ID // subscription ID generator - respWait map[string]*requestOp // active client requests - clientSubs map[string]*ClientSubscription // active client subscriptions - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent - log log.Logger - allowSubscribe bool - batchRequestLimit int - batchResponseMaxSize int + reg *serviceRegistry + unsubscribeCb *callback + idgen func() ID // subscription ID generator + respWait map[string]*requestOp // active client requests + clientSubs map[string]*ClientSubscription // active client subscriptions + callWG sync.WaitGroup // pending call goroutines + rootCtx context.Context // canceled by close() + cancelRoot func() // cancel function for rootCtx + conn jsonWriter // where responses will be sent + log log.Logger + allowSubscribe bool subLock sync.Mutex serverSubs map[ID]*Subscription @@ -72,21 +70,19 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit, batchResponseMaxSize int) *handler { +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ - reg: reg, - idgen: idgen, - conn: conn, - respWait: make(map[string]*requestOp), - clientSubs: make(map[string]*ClientSubscription), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - allowSubscribe: true, - serverSubs: make(map[ID]*Subscription), - log: log.Root(), - batchRequestLimit: batchRequestLimit, - batchResponseMaxSize: batchResponseMaxSize, + reg: reg, + idgen: idgen, + conn: conn, + respWait: make(map[string]*requestOp), + clientSubs: make(map[string]*ClientSubscription), + rootCtx: rootCtx, + cancelRoot: cancelRoot, + allowSubscribe: true, + serverSubs: make(map[ID]*Subscription), + log: log.Root(), } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) @@ -138,15 +134,16 @@ func (b *batchCallBuffer) write(ctx context.Context, conn jsonWriter) { b.doWrite(ctx, conn, false) } -// respondWithError sends the responses added so far. For the remaining unanswered call -// messages, it responds with the given error. -func (b *batchCallBuffer) respondWithError(ctx context.Context, conn jsonWriter, err error) { +// timeout sends the responses added so far. For the remaining unanswered call +// messages, it sends a timeout error response. +func (b *batchCallBuffer) timeout(ctx context.Context, conn jsonWriter) { b.mutex.Lock() defer b.mutex.Unlock() for _, msg := range b.calls { if !msg.isNotification() { - b.resp = append(b.resp, msg.errorResponse(err)) + resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) + b.resp = append(b.resp, resp) } } b.doWrite(ctx, conn, true) @@ -174,24 +171,17 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { }) return } - // Apply limit on total number of requests. - if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit { - h.startCallProc(func(cp *callProc) { - h.respondWithBatchTooLarge(cp, msgs) - }) - return - } - // Handle non-call messages first. - // Here we need to find the requestOp that sent the request batch. + // Handle non-call messages first: calls := make([]*jsonrpcMessage, 0, len(msgs)) - h.handleResponses(msgs, func(msg *jsonrpcMessage) { - calls = append(calls, msg) - }) + for _, msg := range msgs { + if handled := h.handleImmediate(msg); !handled { + calls = append(calls, msg) + } + } if len(calls) == 0 { return } - // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { var ( @@ -209,12 +199,10 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { if timeout, ok := ContextRequestTimeout(cp.ctx); ok { timer = time.AfterFunc(timeout, func() { cancel() - err := &internalServerError{errcodeTimeout, errMsgTimeout} - callBuffer.respondWithError(cp.ctx, h.conn, err) + callBuffer.timeout(cp.ctx, h.conn) }) } - responseBytes := 0 for { // No need to handle rest of calls if timed out. if cp.ctx.Err() != nil { @@ -226,86 +214,59 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } resp := h.handleCallMsg(cp, msg) callBuffer.pushResponse(resp) - if resp != nil && h.batchResponseMaxSize != 0 { - responseBytes += len(resp.Result) - if responseBytes > h.batchResponseMaxSize { - err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge} - callBuffer.respondWithError(cp.ctx, h.conn, err) - break - } - } } if timer != nil { timer.Stop() } - - h.addSubscriptions(cp.notifiers) callBuffer.write(cp.ctx, h.conn) + h.addSubscriptions(cp.notifiers) for _, n := range cp.notifiers { n.activate() } }) } -func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage) { - resp := errorMessage(&invalidRequestError{errMsgBatchTooLarge}) - // Find the first call and add its "id" field to the error. - // This is the best we can do, given that the protocol doesn't have a way - // of reporting an error for the entire batch. - for _, msg := range batch { - if msg.isCall() { - resp.ID = msg.ID - break - } +// handleMsg handles a single message. +func (h *handler) handleMsg(msg *jsonrpcMessage) { + if ok := h.handleImmediate(msg); ok { + return } - h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true) -} + h.startCallProc(func(cp *callProc) { + var ( + responded sync.Once + timer *time.Timer + cancel context.CancelFunc + ) + cp.ctx, cancel = context.WithCancel(cp.ctx) + defer cancel() -// handleMsg handles a single non-batch message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { - msgs := []*jsonrpcMessage{msg} - h.handleResponses(msgs, func(msg *jsonrpcMessage) { - h.startCallProc(func(cp *callProc) { - h.handleNonBatchCall(cp, msg) - }) - }) -} + // Cancel the request context after timeout and send an error response. Since the + // running method might not return immediately on timeout, we must wait for the + // timeout concurrently with processing the request. + if timeout, ok := ContextRequestTimeout(cp.ctx); ok { + timer = time.AfterFunc(timeout, func() { + cancel() + responded.Do(func() { + resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) + h.conn.writeJSON(cp.ctx, resp, true) + }) + }) + } -func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) { - var ( - responded sync.Once - timer *time.Timer - cancel context.CancelFunc - ) - cp.ctx, cancel = context.WithCancel(cp.ctx) - defer cancel() - - // Cancel the request context after timeout and send an error response. Since the - // running method might not return immediately on timeout, we must wait for the - // timeout concurrently with processing the request. - if timeout, ok := ContextRequestTimeout(cp.ctx); ok { - timer = time.AfterFunc(timeout, func() { - cancel() + answer := h.handleCallMsg(cp, msg) + if timer != nil { + timer.Stop() + } + h.addSubscriptions(cp.notifiers) + if answer != nil { responded.Do(func() { - resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) - h.conn.writeJSON(cp.ctx, resp, true) + h.conn.writeJSON(cp.ctx, answer, false) }) - }) - } - - answer := h.handleCallMsg(cp, msg) - if timer != nil { - timer.Stop() - } - h.addSubscriptions(cp.notifiers) - if answer != nil { - responded.Do(func() { - h.conn.writeJSON(cp.ctx, answer, false) - }) - } - for _, n := range cp.notifiers { - n.activate() - } + } + for _, n := range cp.notifiers { + n.activate() + } + }) } // close cancels all requests except for inflightReq and waits for @@ -388,60 +349,23 @@ func (h *handler) startCallProc(fn func(*callProc)) { }() } -// handleResponse processes method call responses. -func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*jsonrpcMessage)) { - var resolvedops []*requestOp - handleResp := func(msg *jsonrpcMessage) { - op := h.respWait[string(msg.ID)] - if op == nil { - h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) - return - } - resolvedops = append(resolvedops, op) - delete(h.respWait, string(msg.ID)) - - // For subscription responses, start the subscription if the server - // indicates success. EthSubscribe gets unblocked in either case through - // the op.resp channel. - if op.sub != nil { - if msg.Error != nil { - op.err = msg.Error - } else { - op.err = json.Unmarshal(msg.Result, &op.sub.subid) - if op.err == nil { - go op.sub.run() - h.clientSubs[op.sub.subid] = op.sub - } - } - } - - if !op.hadResponse { - op.hadResponse = true - op.resp <- batch - } - } - - for _, msg := range batch { - start := time.Now() - switch { - case msg.isResponse(): - handleResp(msg) - h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) - - case msg.isNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { - h.handleSubscriptionResult(msg) - continue - } - handleCall(msg) - - default: - handleCall(msg) +// handleImmediate executes non-call messages. It returns false if the message is a +// call or requires a reply. +func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { + start := time.Now() + switch { + case msg.isNotification(): + if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + h.handleSubscriptionResult(msg) + return true } - } - - for _, op := range resolvedops { - h.removeRequestOp(op) + return false + case msg.isResponse(): + h.handleResponse(msg) + h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "duration", time.Since(start)) + return true + default: + return false } } @@ -457,6 +381,33 @@ func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { } } +// handleResponse processes method call responses. +func (h *handler) handleResponse(msg *jsonrpcMessage) { + op := h.respWait[string(msg.ID)] + if op == nil { + h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) + return + } + delete(h.respWait, string(msg.ID)) + // For normal responses, just forward the reply to Call/BatchCall. + if op.sub == nil { + op.resp <- msg + return + } + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + defer close(op.resp) + if msg.Error != nil { + op.err = msg.Error + return + } + if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { + go op.sub.run() + h.clientSubs[op.sub.subid] = op.sub + } +} + // handleCallMsg executes a call message and returns the answer. func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { start := time.Now() @@ -465,7 +416,6 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess h.handleCall(ctx, msg) h.log.Debug("Served "+msg.Method, "duration", time.Since(start)) return nil - case msg.isCall(): resp := h.handleCall(ctx, msg) var ctx []interface{} @@ -480,10 +430,8 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess h.log.Debug("Served "+msg.Method, ctx...) } return resp - case msg.hasValidID(): return msg.errorResponse(&invalidRequestError{"invalid request"}) - default: return errorMessage(&invalidRequestError{"invalid request"}) } @@ -503,14 +451,12 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if callb == nil { return msg.errorResponse(&methodNotFoundError{method: msg.Method}) } - args, err := parsePositionalArguments(msg.Params, callb.argTypes) if err != nil { return msg.errorResponse(&invalidParamsError{err.Error()}) } start := time.Now() answer := h.runMethod(cp.ctx, msg, callb, args) - // Collect the statistics for RPC calls if metrics is enabled. // We only care about pure rpc call. Filter out subscription. if callb != h.unsubscribeCb { @@ -523,7 +469,6 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage rpcServingTimer.UpdateSince(start) updateServeTimeHistogram(msg.Method, answer.Error == nil, time.Since(start)) } - return answer } diff --git a/rpc/http.go b/rpc/http.go index 741fa1c0eb4f6..8712f99610b5a 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -139,7 +139,7 @@ func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { var cfg clientConfig cfg.httpClient = client fn := newClientTransportHTTP(endpoint, &cfg) - return newClient(context.Background(), &cfg, fn) + return newClient(context.Background(), fn) } func newClientTransportHTTP(endpoint string, cfg *clientConfig) reconnectFunc { @@ -176,12 +176,11 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e } defer respBody.Close() - var resp jsonrpcMessage - batch := [1]*jsonrpcMessage{&resp} - if err := json.NewDecoder(respBody).Decode(&resp); err != nil { + var respmsg jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { return err } - op.resp <- batch[:] + op.resp <- &respmsg return nil } @@ -192,12 +191,16 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr return err } defer respBody.Close() - - var respmsgs []*jsonrpcMessage + var respmsgs []jsonrpcMessage if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } - op.resp <- respmsgs + if len(respmsgs) != len(msgs) { + return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) + } + for i := 0; i < len(respmsgs); i++ { + op.resp <- &respmsgs[i] + } return nil } diff --git a/rpc/inproc.go b/rpc/inproc.go index 306974e04b81f..fbe9a40ceca9f 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -24,8 +24,7 @@ import ( // DialInProc attaches an in-process connection to the given RPC server. func DialInProc(handler *Server) *Client { initctx := context.Background() - cfg := new(clientConfig) - c, _ := newClient(initctx, cfg, func(context.Context) (ServerCodec, error) { + c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { p1, p2 := net.Pipe() go handler.ServeCodec(NewCodec(p1), 0) return NewCodec(p2), nil diff --git a/rpc/ipc.go b/rpc/ipc.go index a08245b270891..d9e0de62e877d 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -46,8 +46,7 @@ func (s *Server) ServeListener(l net.Listener) error { // The context is used for the initial connection establishment. It does not // affect subsequent interactions with the client. func DialIPC(ctx context.Context, endpoint string) (*Client, error) { - cfg := new(clientConfig) - return newClient(ctx, cfg, newClientTransportIPC(endpoint)) + return newClient(ctx, newClientTransportIPC(endpoint)) } func newClientTransportIPC(endpoint string) reconnectFunc { diff --git a/rpc/server.go b/rpc/server.go index 2742adf07b823..089bbb1fd5d2b 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -46,11 +46,9 @@ type Server struct { services serviceRegistry idgen func() ID - mutex sync.Mutex - codecs map[ServerCodec]struct{} - run atomic.Bool - batchItemLimit int - batchResponseLimit int + mutex sync.Mutex + codecs map[ServerCodec]struct{} + run atomic.Bool } // NewServer creates a new server instance with no registered handlers. @@ -67,17 +65,6 @@ func NewServer() *Server { return server } -// SetBatchLimits sets limits applied to batch requests. There are two limits: 'itemLimit' -// is the maximum number of items in a batch. 'maxResponseSize' is the maximum number of -// response bytes across all requests in a batch. -// -// This method should be called before processing any requests via ServeCodec, ServeHTTP, -// ServeListener etc. -func (s *Server) SetBatchLimits(itemLimit, maxResponseSize int) { - s.batchItemLimit = itemLimit - s.batchResponseLimit = maxResponseSize -} - // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the @@ -99,12 +86,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { } defer s.untrackCodec(codec) - cfg := &clientConfig{ - idgen: s.idgen, - batchItemLimit: s.batchItemLimit, - batchResponseLimit: s.batchResponseLimit, - } - c := initClient(codec, &s.services, cfg) + c := initClient(codec, s.idgen, &s.services) <-codec.closed() c.Close() } @@ -136,7 +118,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { return } - h := newHandler(ctx, codec, s.idgen, &s.services, s.batchItemLimit, s.batchResponseLimit) + h := newHandler(ctx, codec, s.idgen, &s.services) h.allowSubscribe = false defer h.close(io.EOF, nil) diff --git a/rpc/server_test.go b/rpc/server_test.go index 5d3929dfdc692..f1a9b3d5cd4ac 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -70,7 +70,6 @@ func TestServer(t *testing.T) { func runTestScript(t *testing.T, file string) { server := newTestServer() - server.SetBatchLimits(4, 100000) content, err := os.ReadFile(file) if err != nil { t.Fatal(err) @@ -153,41 +152,3 @@ func TestServerShortLivedConn(t *testing.T) { } } } - -func TestServerBatchResponseSizeLimit(t *testing.T) { - server := newTestServer() - defer server.Stop() - server.SetBatchLimits(100, 60) - var ( - batch []BatchElem - client = DialInProc(server) - ) - for i := 0; i < 5; i++ { - batch = append(batch, BatchElem{ - Method: "test_echo", - Args: []any{"x", 1}, - Result: new(echoResult), - }) - } - if err := client.BatchCall(batch); err != nil { - t.Fatal("error sending batch:", err) - } - for i := range batch { - // We expect the first two queries to be ok, but after that the size limit takes effect. - if i < 2 { - if batch[i].Error != nil { - t.Fatalf("batch elem %d has unexpected error: %v", i, batch[i].Error) - } - continue - } - // After two, we expect an error. - re, ok := batch[i].Error.(Error) - if !ok { - t.Fatalf("batch elem %d has wrong error: %v", i, batch[i].Error) - } - wantedCode := errcodeResponseTooLarge - if re.ErrorCode() != wantedCode { - t.Errorf("batch elem %d wrong error code, have %d want %d", i, re.ErrorCode(), wantedCode) - } - } -} diff --git a/rpc/stdio.go b/rpc/stdio.go index 084e5f0700ced..ae32db26ef1c7 100644 --- a/rpc/stdio.go +++ b/rpc/stdio.go @@ -32,8 +32,7 @@ func DialStdIO(ctx context.Context) (*Client, error) { // DialIO creates a client which uses the given IO channels func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) { - cfg := new(clientConfig) - return newClient(ctx, cfg, newClientTransportIO(in, out)) + return newClient(ctx, newClientTransportIO(in, out)) } func newClientTransportIO(in io.Reader, out io.Writer) reconnectFunc { diff --git a/rpc/testdata/invalid-batch-toolarge.js b/rpc/testdata/invalid-batch-toolarge.js deleted file mode 100644 index 218fea58aaac2..0000000000000 --- a/rpc/testdata/invalid-batch-toolarge.js +++ /dev/null @@ -1,13 +0,0 @@ -// This file checks the behavior of the batch item limit code. -// In tests, the batch item limit is set to 4. So to trigger the error, -// all batches in this file have 5 elements. - -// For batches that do not contain any calls, a response message with "id" == null -// is returned. - ---> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] -<-- [{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"batch too large"}}] - -// For batches with at least one call, the call's "id" is used. ---> [{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","id":3,"method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]},{"jsonrpc":"2.0","method":"test_echo","params":["x",99]}] -<-- [{"jsonrpc":"2.0","id":3,"error":{"code":-32600,"message":"batch too large"}}] diff --git a/rpc/websocket.go b/rpc/websocket.go index b1213fdfa663e..889562d1ab557 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -197,7 +197,7 @@ func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, diale if err != nil { return nil, err } - return newClient(ctx, cfg, connect) + return newClient(ctx, connect) } // DialWebsocket creates a new RPC client that communicates with a JSON-RPC server @@ -214,7 +214,7 @@ func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error if err != nil { return nil, err } - return newClient(ctx, cfg, connect) + return newClient(ctx, connect) } func newClientTransportWS(endpoint string, cfg *clientConfig) (reconnectFunc, error) {