Skip to content

Commit

Permalink
refactor: make NewClient return an error
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 21, 2023
1 parent 41d44f4 commit 9bd25ad
Show file tree
Hide file tree
Showing 22 changed files with 254 additions and 154 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ build/
dist/
__pycache__/
.vscode/
.idea/
108 changes: 61 additions & 47 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ import (
// GetEndpoints returns the available endpoint descriptions for the server.
func GetEndpoints(ctx context.Context, endpoint string, opts ...Option) ([]*ua.EndpointDescription, error) {
opts = append(opts, AutoReconnect(false))
c := NewClient(endpoint, opts...)
c, err := NewClient(endpoint, opts...)
if err != nil {
return nil, err
}
if err := c.Dial(ctx); err != nil {
return nil, err
}
defer c.Close(ctx)
defer func(c *Client, ctx context.Context) {
if err := c.Close(ctx); err != nil {
// TODO(sruehl): log error
}
}(c, ctx)
res, err := c.GetEndpoints(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -133,15 +140,6 @@ type Client struct {

// monitorOnce ensures only one connection monitor is running
monitorOnce sync.Once

// cfgerr contains an error that was captured in ApplyConfig.
// Since the API does not allow to bubble the error up in NewClient
// and we don't want to break existing code right away we carry the
// error here and bubble it up during Dial and Connect.
//
// Note: Starting with v0.5 NewClient will return the error and this
// variable needs to be removed.
cfgerr error
}

// NewClient creates a new Client.
Expand All @@ -155,10 +153,11 @@ type Client struct {
// #Option for details.
//
// https://godoc.org/github.com/gopcua/opcua#Option
//
// Note: Starting with v0.5 this function will return an error.
func NewClient(endpoint string, opts ...Option) *Client {
cfg := ApplyConfig(opts...)
func NewClient(endpoint string, opts ...Option) (*Client, error) {
cfg, err := ApplyConfig(opts...)
if err != nil {
return nil, errors.Errorf("error applying config %w", err)
}
c := Client{
endpointURL: endpoint,
cfg: cfg,
Expand All @@ -167,15 +166,14 @@ func NewClient(endpoint string, opts ...Option) *Client {
pendingAcks: make([]*ua.SubscriptionAcknowledgement, 0),
pausech: make(chan struct{}, 2),
resumech: make(chan struct{}, 2),
cfgerr: cfg.Error(), // todo(fs): remove with v0.5.0 and return the error
}
c.pauseSubscriptions(context.Background())
c.setPublishTimeout(uasc.MaxTimeout)
c.setState(Closed)
c.setSecureChannel(nil)
c.setSession(nil)
c.setNamespaces([]string{})
return &c
return &c, nil
}

// reconnectAction is a list of actions for the client reconnection logic.
Expand All @@ -194,11 +192,6 @@ const (

// Connect establishes a secure channel and creates a new session.
func (c *Client) Connect(ctx context.Context) error {
// todo(fs): remove with v0.5.0
if c.cfgerr != nil {
return c.cfgerr
}

// todo(fs): the secure channel is 'nil' during a re-connect
// todo(fs): but we expect this method to be called once during startup
// todo(fs): so this is probably safe
Expand All @@ -215,14 +208,18 @@ func (c *Client) Connect(ctx context.Context) error {

s, err := c.CreateSession(ctx, c.cfg.session)
if err != nil {
c.Close(ctx)
if err := c.Close(ctx); err != nil {
// TODO(sruehl): log error
}
stats.RecordError(err)

return err
}

if err := c.ActivateSession(ctx, s); err != nil {
c.Close(ctx)
if err := c.Close(ctx); err != nil {
// TODO(sruehl): log error
}
stats.RecordError(err)

return err
Expand All @@ -241,7 +238,9 @@ func (c *Client) Connect(ctx context.Context) error {
// todo(fs): see the discussion in https://github.com/gopcua/opcua/pull/512
// todo(fs): and you should find a commit that implements this option.
if err := c.UpdateNamespaces(ctx); err != nil {
c.Close(ctx)
if err := c.Close(ctx); err != nil {
// TODO(sruehl): log error
}
stats.RecordError(err)

return err
Expand Down Expand Up @@ -357,9 +356,13 @@ func (c *Client) monitor(ctx context.Context) {
// todo(fs): down.
//
// https://github.com/gopcua/opcua/pull/470
c.conn.Close()
if err := c.conn.Close(); err != nil {
// TODO(sruehl): log error
}
if sc := c.SecureChannel(); sc != nil {
sc.Close()
if err := sc.Close(); err != nil {
// TODO(sruehl): log error
}
c.setSecureChannel(nil)
}

Expand Down Expand Up @@ -503,16 +506,16 @@ func (c *Client) monitor(ctx context.Context) {
// populated in the previous step.

activeSubs = 0
for _, id := range subsToRepublish {
if err := c.republishSubscription(ctx, id, availableSeqs[id]); err != nil {
dlog.Printf("republish of subscription %d failed", id)
subsToRecreate = append(subsToRecreate, id)
for _, subId := range subsToRepublish {
if err := c.republishSubscription(ctx, subId, availableSeqs[subId]); err != nil {
dlog.Printf("republish of subscription %d failed", subId)
subsToRecreate = append(subsToRecreate, subId)
}
activeSubs++
}

for _, id := range subsToRecreate {
if err := c.recreateSubscription(ctx, id); err != nil {
for _, subId := range subsToRecreate {
if err := c.recreateSubscription(ctx, subId); err != nil {
dlog.Printf("recreate subscripitions failed: %v", err)
action = recreateSession
continue
Expand Down Expand Up @@ -555,11 +558,6 @@ func (c *Client) monitor(ctx context.Context) {

// Dial establishes a secure channel.
func (c *Client) Dial(ctx context.Context) error {
// todo(fs): remove with v0.5.0
if c.cfgerr != nil {
return c.cfgerr
}

stats.Client().Add("Dial", 1)

if c.SecureChannel() != nil {
Expand All @@ -575,12 +573,16 @@ func (c *Client) Dial(ctx context.Context) error {

sc, err := uasc.NewSecureChannel(c.endpointURL, c.conn, c.cfg.sechan, c.sechanErr)
if err != nil {
c.conn.Close()
if err := c.conn.Close(); err != nil {
// TODO(sruehl): log error
}
return err
}

if err := sc.Open(ctx); err != nil {
c.conn.Close()
if err := c.conn.Close(); err != nil {
// TODO(sruehl): log error
}
return err
}
c.setSecureChannel(sc)
Expand All @@ -594,14 +596,18 @@ func (c *Client) Close(ctx context.Context) error {

// try to close the session but ignore any error
// so that we close the underlying channel and connection.
c.CloseSession(ctx)
if err := c.CloseSession(ctx); err != nil {
// TODO(sruehl): log error
}
c.setState(Closed)

if c.mcancel != nil {
c.mcancel()
}
if sc := c.SecureChannel(); sc != nil {
sc.Close()
if err := sc.Close(); err != nil {
// TODO(sruehl): log error
}
c.setSecureChannel(nil)
}

Expand All @@ -616,7 +622,9 @@ func (c *Client) Close(ctx context.Context) error {
// close the connection but ignore the error since there isn't
// anything we can do about it anyway
if c.conn != nil {
c.conn.Close()
if err := c.conn.Close(); err != nil {
// TODO(sruehl): log error
}
}

return nil
Expand Down Expand Up @@ -754,11 +762,15 @@ func (c *Client) CreateSession(ctx context.Context, cfg *uasc.SessionConfig) (*S
// Ensure we have a valid identity token that the server will accept before trying to activate a session
if c.cfg.session.UserIdentityToken == nil {
opt := AuthAnonymous()
opt(c.cfg)
if err := opt(c.cfg); err != nil {
// TODO(sruehl): log error
}

p := anonymousPolicyID(res.ServerEndpoints)
opt = AuthPolicyID(p)
opt(c.cfg)
if err := opt(c.cfg); err != nil {
// TODO(sruehl): log error
}
}

s = &Session{
Expand Down Expand Up @@ -862,7 +874,9 @@ func (c *Client) ActivateSession(ctx context.Context, s *Session) error {
// We decided not to check the error of CloseSession() since we
// can't do much about it anyway and it creates a race in the
// re-connection logic.
c.CloseSession(ctx)
if err := c.CloseSession(ctx); err != nil {
// TODO(sruehl): log error
}

c.setSession(s)
return nil
Expand Down Expand Up @@ -896,7 +910,7 @@ func (c *Client) closeSession(ctx context.Context, s *Session) error {
// DetachSession removes the session from the client without closing it. The
// caller is responsible to close or re-activate the session. If the client
// does not have an active session the function returns no error.
func (c *Client) DetachSession(ctx context.Context) (*Session, error) {
func (c *Client) DetachSession(_ context.Context) (*Session, error) {
stats.Client().Add("DetachSession", 1)
s := c.Session()
c.setSession(nil)
Expand Down
6 changes: 4 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package opcua

import (
"context"
"github.com/stretchr/testify/assert"
"testing"

"github.com/pascaldekloe/goe/verify"
Expand All @@ -11,8 +12,9 @@ import (
)

func TestClient_Send_DoesNotPanicWhenDisconnected(t *testing.T) {
c := NewClient("opc.tcp://example.com:4840")
err := c.Send(context.Background(), &ua.ReadRequest{}, func(i interface{}) error {
c, err := NewClient("opc.tcp://example.com:4840")
assert.NoError(t, err)
err = c.Send(context.Background(), &ua.ReadRequest{}, func(i interface{}) error {
return nil
})
verify.Values(t, "", err, ua.StatusBadServerNotConnected)
Expand Down

0 comments on commit 9bd25ad

Please sign in to comment.