Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: aperturerobotics/starpc
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.33.7
Choose a base ref
...
head repository: aperturerobotics/starpc
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v0.33.8
Choose a head ref
  • 3 commits
  • 6 files changed
  • 1 contributor

Commits on Aug 2, 2024

  1. fix: writer cannot be nil, close writer on client side

    Signed-off-by: Christian Stewart <christian@aperture.us>
    paralin committed Aug 2, 2024

    Verified

    This commit was signed with the committer’s verified signature.
    chaospuppy Tim Seagren
    Copy the full SHA
    0a75ea3 View commit details

Commits on Aug 3, 2024

  1. fix: check if writer is nil when closing client rpc

    Signed-off-by: Christian Stewart <christian@aperture.us>
    paralin committed Aug 3, 2024

    Verified

    This commit was signed with the committer’s verified signature.
    chaospuppy Tim Seagren
    Copy the full SHA
    603c6b1 View commit details

Commits on Aug 5, 2024

  1. release: v0.33.8

    Signed-off-by: Christian Stewart <christian@aperture.us>
    paralin committed Aug 5, 2024

    Verified

    This commit was signed with the committer’s verified signature.
    chaospuppy Tim Seagren
    Copy the full SHA
    589b77f View commit details
Showing with 23 additions and 20 deletions.
  1. +1 −1 package.json
  2. +9 −5 srpc/client-rpc.go
  3. +7 −1 srpc/client.go
  4. +3 −13 srpc/common-rpc.go
  5. +2 −0 srpc/errors.go
  6. +1 −0 srpc/server-rpc.go
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "starpc",
"version": "0.33.7",
"version": "0.33.8",
"description": "Streaming protobuf RPC service protocol over any two-way channel.",
"license": "MIT",
"author": {
14 changes: 9 additions & 5 deletions srpc/client-rpc.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,10 @@ func NewClientRPC(ctx context.Context, service, method string) *ClientRPC {
// Start sets the writer and writes the MsgSend message.
// must only be called once!
func (r *ClientRPC) Start(writer PacketWriter, writeFirstMsg bool, firstMsg []byte) error {
if writer == nil {
return ErrNilWriter
}

if err := r.ctx.Err(); err != nil {
r.ctxCancel()
_ = writer.Close()
@@ -104,11 +108,11 @@ func (r *ClientRPC) HandleCallStart(pkt *CallStart) error {

// Close releases any resources held by the ClientRPC.
func (r *ClientRPC) Close() {
if r.writer != nil {
_ = r.WriteCallCancel()
}

r.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
r.closeLocked(broadcast)
// call did not start yet if writer is nil.
if r.writer != nil {
_ = r.WriteCallCancel()
r.closeLocked(broadcast)
}
})
}
8 changes: 7 additions & 1 deletion srpc/client.go
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ func (c *client) ExecCall(ctx context.Context, service, method string, in, out M
if err != nil {
return err
}

if err := clientRPC.Start(writer, true, firstMsg); err != nil {
return err
}
@@ -63,6 +64,7 @@ func (c *client) ExecCall(ctx context.Context, service, method string, in, out M
if err := out.UnmarshalVT(msg); err != nil {
return errors.Wrap(ErrInvalidMessage, err.Error())
}

return nil
}

@@ -83,11 +85,15 @@ func (c *client) NewStream(ctx context.Context, service, method string, firstMsg
if err != nil {
return nil, err
}

if err := clientRPC.Start(writer, firstMsg != nil, firstMsgData); err != nil {
return nil, err
}

return NewMsgStream(ctx, clientRPC, clientRPC.ctxCancel), nil
return NewMsgStream(ctx, clientRPC, func() {
clientRPC.ctxCancel()
_ = writer.Close()
}), nil
}

// _ is a type assertion
16 changes: 3 additions & 13 deletions srpc/common-rpc.go
Original file line number Diff line number Diff line change
@@ -116,9 +116,6 @@ func (c *commonRPC) ReadOne() ([]byte, error) {

// WriteCallData writes a call data packet.
func (c *commonRPC) WriteCallData(data []byte, complete bool, err error) error {
if c.writer == nil {
return ErrCompleted
}
outPkt := NewCallDataPacket(data, len(data) == 0, false, nil)
return c.writer.WritePacket(outPkt)
}
@@ -131,9 +128,7 @@ func (c *commonRPC) HandleStreamClose(closeErr error) {
}
c.dataClosed = true
c.ctxCancel()
if c.writer != nil {
_ = c.writer.Close()
}
_ = c.writer.Close()
broadcast()
})
}
@@ -175,10 +170,7 @@ func (c *commonRPC) HandleCallData(pkt *CallData) error {

// WriteCallCancel writes a call cancel packet.
func (c *commonRPC) WriteCallCancel() error {
if c.writer != nil {
return c.writer.WritePacket(NewCallCancelPacket())
}
return nil
return c.writer.WritePacket(NewCallCancelPacket())
}

// closeLocked releases resources held by the RPC.
@@ -187,9 +179,7 @@ func (c *commonRPC) closeLocked(broadcast func()) {
if c.remoteErr == nil {
c.remoteErr = context.Canceled
}
if c.writer != nil {
_ = c.writer.Close()
}
_ = c.writer.Close()
broadcast()
c.ctxCancel()
}
2 changes: 2 additions & 0 deletions srpc/errors.go
Original file line number Diff line number Diff line change
@@ -19,4 +19,6 @@ var (
ErrEmptyServiceID = errors.New("service id empty")
// ErrNoAvailableClients is returned if no clients were available.
ErrNoAvailableClients = errors.New("no available rpc clients")
// ErrNilWriter is returned if the rpc writer is nil.
ErrNilWriter = errors.New("writer cannot be nil")
)
1 change: 1 addition & 0 deletions srpc/server-rpc.go
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@ func (r *ServerRPC) HandleCallStart(pkt *CallStart) error {

// invokeRPC invokes the RPC after CallStart is received.
func (r *ServerRPC) invokeRPC(serviceID, methodID string) {
// on the server side, the writer is closed by invokeRPC.
strm := NewMsgStream(r.ctx, r, r.ctxCancel)
ok, err := r.invoker.InvokeMethod(serviceID, methodID, strm)
if err == nil && !ok {