Skip to content

Commit fbe5294

Browse files
authoredJul 17, 2024··
fix: connect after client.end not working (#1902)
* fix: publish after connect manually * feat: add new cover tests * fix: lint style * feat: remove setTimeout from test * feat: reduce indented in test * feat: close clock and client after test
1 parent cd0b044 commit fbe5294

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed
 

‎src/lib/client.ts

+12
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,13 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
732732
this.log('connect :: calling method to clear reconnect')
733733
this._clearReconnect()
734734

735+
if (this.disconnected && !this.reconnecting) {
736+
this.incomingStore = this.options.incomingStore || new Store()
737+
this.outgoingStore = this.options.outgoingStore || new Store()
738+
this.disconnecting = false
739+
this.disconnected = false
740+
}
741+
735742
this.log(
736743
'connect :: using streamBuilder provided to client to create stream',
737744
)
@@ -1459,6 +1466,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
14591466
})
14601467
if (this._deferredReconnect) {
14611468
this._deferredReconnect()
1469+
} else if (
1470+
this.options.reconnectPeriod === 0 ||
1471+
this.options.manualConnect
1472+
) {
1473+
this.disconnecting = false
14621474
}
14631475
}
14641476

‎test/client.ts

+166
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,172 @@ describe('MqttClient', () => {
615615
},
616616
)
617617

618+
describe('connect manually', () => {
619+
it(
620+
'should not throw an error when publish after second connect',
621+
{
622+
timeout: 10000,
623+
},
624+
async function _test(t) {
625+
const clock = useFakeTimers({
626+
shouldClearNativeTimers: true,
627+
toFake: ['setTimeout'],
628+
})
629+
630+
t.after(async () => {
631+
clock.restore()
632+
if (client) {
633+
await client.endAsync(true)
634+
}
635+
})
636+
637+
const fail = await new Promise<boolean>((resolveParent) => {
638+
let countConnects = 0
639+
640+
const publishInterval = (
641+
repetible: number,
642+
timeout: number,
643+
callback: (threwError: boolean) => void,
644+
): void => {
645+
const method = () =>
646+
new Promise<boolean>((resolve) => {
647+
client.publish('test', 'test', (err) => {
648+
if (
649+
err?.message.toLocaleLowerCase() ===
650+
'client disconnecting'
651+
) {
652+
resolve(true)
653+
} else {
654+
resolve(false)
655+
}
656+
})
657+
})
658+
659+
if (repetible <= 0) {
660+
callback(false)
661+
return
662+
}
663+
664+
method().then((threwError) => {
665+
clock.tick(timeout)
666+
667+
if (threwError) {
668+
callback(true)
669+
return
670+
}
671+
672+
publishInterval(repetible - 1, timeout, callback)
673+
})
674+
}
675+
676+
client = mqtt.connect(config)
677+
678+
client.on('connect', () => {
679+
++countConnects
680+
681+
const intervalRepetible = 4
682+
const intervalTimeout = 250
683+
const connectTimeout =
684+
intervalRepetible * intervalTimeout
685+
686+
publishInterval(
687+
intervalRepetible,
688+
intervalTimeout,
689+
(threwError) => {
690+
if (countConnects === 2) {
691+
resolveParent(threwError)
692+
}
693+
},
694+
)
695+
696+
if (countConnects === 1) {
697+
clock.setTimeout(() => {
698+
client.end(() => client.connect())
699+
}, connectTimeout)
700+
}
701+
})
702+
})
703+
704+
assert.isFalse(fail, 'disconnecting variable was not reset')
705+
},
706+
)
707+
708+
it(
709+
'reset disconnecting variable to false after disconnect when option reconnectPeriod=0',
710+
{
711+
timeout: 10000,
712+
},
713+
async function _test(t) {
714+
client = await mqtt.connectAsync({
715+
...config,
716+
reconnectPeriod: 0,
717+
})
718+
719+
assert.isFalse(
720+
client.disconnecting,
721+
'disconnecting should be false after connect',
722+
)
723+
724+
const endPromise = client.endAsync()
725+
726+
assert.isTrue(
727+
client.disconnecting,
728+
'disconnecting should be true processing end',
729+
)
730+
731+
await endPromise
732+
733+
assert.isFalse(
734+
client.disconnecting,
735+
'disconnecting should be false after end',
736+
)
737+
},
738+
)
739+
740+
it(
741+
'reset disconnecting variable to false after disconnect when option manualConnect=true',
742+
{
743+
timeout: 10000,
744+
},
745+
async function _test(t) {
746+
client = mqtt.connect({
747+
...config,
748+
manualConnect: true,
749+
})
750+
751+
await new Promise((resolve, reject) => {
752+
client
753+
.connect()
754+
.on('error', (err) => {
755+
reject(err)
756+
})
757+
.once('connect', () => {
758+
resolve(undefined)
759+
})
760+
})
761+
762+
assert.isFalse(
763+
client.disconnecting,
764+
'disconnecting should be false after connect',
765+
)
766+
767+
const endPromise = client.endAsync()
768+
769+
assert.isTrue(
770+
client.disconnecting,
771+
'disconnecting should be true processing end',
772+
)
773+
774+
await endPromise
775+
776+
assert.isFalse(
777+
client.disconnecting,
778+
'disconnecting should be false after end',
779+
)
780+
},
781+
)
782+
})
783+
618784
describe('async methods', () => {
619785
it(
620786
'connect-subscribe-unsubscribe-end',

0 commit comments

Comments
 (0)
Please sign in to comment.