Skip to content

Commit

Permalink
set limit when init client
Browse files Browse the repository at this point in the history
  • Loading branch information
mmsqe committed Feb 16, 2023
1 parent d7c8673 commit 7fd2b77
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
37 changes: 22 additions & 15 deletions rpc/client.go
Expand Up @@ -132,7 +132,8 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
}

// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls
func (c *Client) SetBatchLimits(limit int, size int) {
// And update non-http connection with limit
func (c *Client) SetBatchLimits(limit, size int) {
c.batchRequestLimit = limit
c.batchResponseMaxSize = size
select {
Expand Down Expand Up @@ -254,29 +255,35 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error)
return c, nil
}

func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
func initClientWithBatchLimits(conn ServerCodec, idgen func() ID, services *serviceRegistry, limit, size int) *Client {
_, isHTTP := conn.(*httpConn)
c := &Client{
isHTTP: isHTTP,
idgen: idgen,
services: services,
writeConn: conn,
close: make(chan struct{}),
closing: make(chan struct{}),
didClose: make(chan struct{}),
reconnected: make(chan ServerCodec),
readOp: make(chan readOp),
readErr: make(chan error),
reqInit: make(chan *requestOp),
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
isHTTP: isHTTP,
idgen: idgen,
services: services,
writeConn: conn,
close: make(chan struct{}),
closing: make(chan struct{}),
didClose: make(chan struct{}),
reconnected: make(chan ServerCodec),
readOp: make(chan readOp),
readErr: make(chan error),
reqInit: make(chan *requestOp),
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
batchRequestLimit: limit,
batchResponseMaxSize: size,
}
if !isHTTP {
go c.dispatch(conn)
}
return c
}

func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
return initClientWithBatchLimits(conn, idgen, services, 0, 0)
}

// RegisterName creates a service for the given receiver type under the given name. When no
// methods on the given receiver match the criteria to be either a RPC method or a
// subscription an error is returned. Otherwise a new service is created and added to the
Expand Down
5 changes: 2 additions & 3 deletions rpc/server.go
Expand Up @@ -68,7 +68,7 @@ func NewServer() *Server {
}

// SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls
func (s *Server) SetBatchLimits(limit int, size int) {
func (s *Server) SetBatchLimits(limit, size int) {
s.batchRequestLimit = limit
s.batchResponseMaxSize = size
}
Expand All @@ -94,8 +94,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
}
defer s.untrackCodec(codec)

c := initClient(codec, s.idgen, &s.services)
c.SetBatchLimits(s.batchRequestLimit, s.batchResponseMaxSize)
c := initClientWithBatchLimits(codec, s.idgen, &s.services, s.batchRequestLimit, s.batchResponseMaxSize)
<-codec.closed()
c.Close()
}
Expand Down

0 comments on commit 7fd2b77

Please sign in to comment.