Skip to content

Commit

Permalink
Autopaho: Call error call back a maximum of once per connection
Browse files Browse the repository at this point in the history
autoPaho had the potential to call the error call beck multiple times in relation to one event.
This mirrored the paho functionality but does not make much sense from an end user perspective,
so this change prevents multiple calls to OnClientError in relation to a single connection.
We also prevent errors from being logger when the user has called Disconnect (it's not an error
if it's the result of a request) and add a test.

closes #157
  • Loading branch information
MattBrittan committed Aug 9, 2023
2 parents fc7208e + 86badee commit 21b4033
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
3 changes: 2 additions & 1 deletion autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, e

var err error
select {
case err = <-errChan: // Message on error channel indicates connection has (or will) drop.
case err = <-errChan: // Message on the error channel indicates connection has (or will) drop.
case <-innerCtx.Done():
eh.shutdown() // Prevent any errors triggered by closure of context from reaching user
// As the connection is up, we call disconnect to shut things down cleanly
if err = c.cli.Disconnect(&paho.Disconnect{ReasonCode: 0}); err != nil {
cfg.Debug.Printf("disconnect returned error: %s\n", err)
Expand Down
32 changes: 22 additions & 10 deletions autopaho/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
// errorHandler provides the onClientError callback function that will be called by the Paho library. The sole aim
// of this is to pass a single error onto the error channel (the library may send multiple errors; only the first
// will be processed).
// The callback userOnClientError will be called a maximum of one time. If userOnServerDisconnect is called, then
// userOnClientError will not be called (but there is a small chance that userOnClientError will be called followed
// by userOnServerDisconnect (if we encounter an error sending but there is a DISCONNECT in the queue).
type errorHandler struct {
debug paho.Logger

Expand All @@ -20,15 +23,21 @@ type errorHandler struct {
userOnServerDisconnect func(*paho.Disconnect) // User provided OnServerDisconnect function
}

// shutdown prevents any further calls from emitting a message
func (e *errorHandler) shutdown() {
e.mu.Lock()
defer e.mu.Unlock()
e.errChan = nil
}

// onClientError called by the paho library when an error occurs. We assume that the error is always fatal
func (e *errorHandler) onClientError(err error) {
e.handleError(err)
if e.userOnClientError != nil {
if e.handleError(err) && e.userOnClientError != nil {
go e.userOnClientError(err)
}
}

// onClientError called by the paho library when the server requests a disconnection (for example as part of a
// onClientError called by the paho library when the server requests a disconnection (for example, as part of a
// clean broker shutdown). We want to begin attempting to reconnect when this occurs (and pass a detectable error
// to the user)
func (e *errorHandler) onServerDisconnect(d *paho.Disconnect) {
Expand All @@ -39,16 +48,19 @@ func (e *errorHandler) onServerDisconnect(d *paho.Disconnect) {
}

// handleError ensures that only a single error is sent to the channel (all errors go to the users OnClientError function)
func (e *errorHandler) handleError(err error) {
// Returns true if the error was sent to the channel (i.e. this is the first error we have seen)
func (e *errorHandler) handleError(err error) bool {
e.mu.Lock()
defer e.mu.Unlock()
if e.errChan != nil {
errChan := e.errChan // prevent any chance of deadlock with concurrent call to e.shutdown
e.errChan = nil
e.mu.Unlock()
if errChan != nil {
e.debug.Printf("received error: %s", err)
e.errChan <- err
e.errChan = nil
} else {
e.debug.Printf("received extra error: %s", err)
errChan <- err
return true
}
e.debug.Printf("received extra error: %s", err)
return false
}

// DisconnectError will be passed when the server requests disconnection (allows this error type to be detected)
Expand Down
5 changes: 5 additions & 0 deletions autopaho/internal/testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func New(logger Logger) *Instance {
}
}

// Connected returns true if the test server has an active connection with the client
func (i *Instance) Connected() bool {
return i.connected.Load()
}

// Connect establishes a connection to the test broker
// Note that this can fail!
// Returns a net.Conn (to pass to paho), a channel that will be closed when connection has shutdown and
Expand Down

0 comments on commit 21b4033

Please sign in to comment.