Skip to content

Commit 661c30a

Browse files
MaximoLiberatarobertsLando
andauthoredAug 8, 2024··
fix(test): close all open connections in abstract_client test (#1917)
* fix: tets hang up * fix(test): close open connections * test: catch error * feat: move clean_method class to help * fix: hang tests * fix: lint style * fix: teardown helper functionality * docs: change examples according new changes * fix(test): close all open connections in abstract_client test * fix(lint): rename variable * fix(test): prevent hanging of some tests * test: let the responsibility to beforeEachExec method of closing the open connections * lint: fix style * fix(test): do not listen connect event * Revert "fix(test): do not listen connect event" This reverts commit 2dc39f2. * fix(test): do not for close client connection * fix(test): hanging by clock * test: remove unnecesary timeout * fix(typo): rename variable --------- Co-authored-by: Daniel Lando <daniel.sorridi@gmail.com>
1 parent 93f4482 commit 661c30a

File tree

2 files changed

+238
-177
lines changed

2 files changed

+238
-177
lines changed
 

‎test/abstract_client.ts

+196-169
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import sinon from 'sinon'
66
import fs from 'fs'
77
import levelStore from 'mqtt-level-store'
88
import Store from '../src/lib/store'
9-
import serverBuilder from './server_helpers_for_client_tests'
9+
import serverBuilderFn from './server_helpers_for_client_tests'
1010
import handlePubrel from '../src/lib/handlers/pubrel'
1111
import TeardownHelper from './helpers/TeardownHelper'
1212
import handle from '../src/lib/handlers/index'
@@ -50,16 +50,23 @@ const fakeTimersOptions = {
5050

5151
export default function abstractTest(server, config, ports) {
5252
const version = config.protocolVersion || 4
53+
const teardownHelper = new TeardownHelper()
5354

5455
function connect(opts?: IClientOptions | string) {
5556
if (typeof opts === 'string') {
5657
opts = { host: opts }
5758
}
5859
opts = { ...config, ...opts } as IClientOptions
59-
return mqtt.connect(opts)
60+
const instance = mqtt.connect(opts)
61+
teardownHelper.addClient(instance)
62+
return instance
6063
}
6164

62-
const teardownHelper = new TeardownHelper()
65+
function serverBuilder(...args: Parameters<typeof serverBuilderFn>) {
66+
const instance = serverBuilderFn(...args)
67+
teardownHelper.addServer(instance)
68+
return instance
69+
}
6370

6471
async function beforeEachExec() {
6572
await teardownHelper.runAll()
@@ -74,6 +81,9 @@ export default function abstractTest(server, config, ports) {
7481
after(afterExec)
7582

7683
describe('closing', () => {
84+
beforeEach(beforeEachExec)
85+
after(afterExec)
86+
7787
it('should emit close if stream closes', function _test(t, done) {
7888
const client = connect()
7989

@@ -254,14 +264,19 @@ export default function abstractTest(server, config, ports) {
254264

255265
it('should emit end even on a failed connection', function _test(t, done) {
256266
const client = connect({ host: 'this_hostname_should_not_exist' })
267+
let timeoutEmitted = false
257268

258269
const timeout = setTimeout(() => {
270+
timeoutEmitted = true
259271
done(new Error('Disconnected client has failed to emit end'))
260272
}, 500)
261273

262274
client.once('end', () => {
263-
clearTimeout(timeout)
264-
done()
275+
// Prevent hanging test if `end` is not emitted before timeout
276+
if (!timeoutEmitted) {
277+
clearTimeout(timeout)
278+
done()
279+
}
265280
})
266281

267282
// after 200ms manually invoke client.end
@@ -298,6 +313,9 @@ export default function abstractTest(server, config, ports) {
298313
})
299314

300315
describe('connecting', () => {
316+
beforeEach(beforeEachExec)
317+
after(afterExec)
318+
301319
it('should connect to the broker', function _test(t, done) {
302320
const client = connect()
303321
client.on('error', done)
@@ -326,7 +344,7 @@ export default function abstractTest(server, config, ports) {
326344
server.once('client', (serverClient) => {
327345
serverClient.once('connect', (packet) => {
328346
assert.strictEqual(packet.clean, true)
329-
done()
347+
client.end((err) => done(err))
330348
})
331349
})
332350
})
@@ -361,17 +379,24 @@ export default function abstractTest(server, config, ports) {
361379
})
362380

363381
it('should require a clientId with clean=false', function _test(t, done) {
382+
let errorCaught = false
383+
364384
try {
365385
const client = connect({ clean: false })
366386
client.on('error', (err) => {
367387
done(err)
368388
})
369389
} catch (err) {
390+
errorCaught = true
370391
assert.strictEqual(
371392
err.message,
372393
'Missing clientId for unclean clients',
373394
)
374395
done()
396+
} finally {
397+
if (!errorCaught) {
398+
done(new Error('Client should have thrown an error'))
399+
}
375400
}
376401
})
377402

@@ -384,7 +409,7 @@ export default function abstractTest(server, config, ports) {
384409
server.once('client', (serverClient) => {
385410
serverClient.once('connect', (packet) => {
386411
assert.include(packet.clientId, 'testclient')
387-
done()
412+
client.end((err) => done(err))
388413
})
389414
})
390415
})
@@ -473,6 +498,9 @@ export default function abstractTest(server, config, ports) {
473498
})
474499

475500
describe('handling offline states', () => {
501+
beforeEach(beforeEachExec)
502+
after(afterExec)
503+
476504
it('should emit offline event once when the client transitions from connected states to disconnected ones', function _test(t, done) {
477505
const client = connect({ reconnectPeriod: 20 })
478506

@@ -498,6 +526,9 @@ export default function abstractTest(server, config, ports) {
498526
})
499527

500528
describe('topic validations when subscribing', () => {
529+
beforeEach(beforeEachExec)
530+
after(afterExec)
531+
501532
it('should be ok for well-formated topics', function _test(t, done) {
502533
const client = connect()
503534
client.subscribe(
@@ -549,7 +580,7 @@ export default function abstractTest(server, config, ports) {
549580
}
550581
assert.isArray(granted2)
551582
assert.isEmpty(granted2)
552-
done()
583+
client.end((err3) => done(err3))
553584
})
554585
})
555586
})
@@ -629,9 +660,7 @@ export default function abstractTest(server, config, ports) {
629660

630661
client.once('connect', () => {
631662
assert.strictEqual(client.queue.length, 0)
632-
setTimeout(() => {
633-
client.end(true, done)
634-
}, 10)
663+
client.end((err) => done(err))
635664
})
636665
})
637666

@@ -641,9 +670,7 @@ export default function abstractTest(server, config, ports) {
641670
client.publish('test', 'test', { qos: 0 })
642671
assert.strictEqual(client.queue.length, 0)
643672
client.on('connect', () => {
644-
setTimeout(() => {
645-
client.end(true, done)
646-
}, 10)
673+
client.end((err) => done(err))
647674
})
648675
})
649676

@@ -656,9 +683,7 @@ export default function abstractTest(server, config, ports) {
656683
client.unsubscribe('test')
657684
assert.strictEqual(client.queue.length, 2)
658685
client.on('connect', () => {
659-
setTimeout(() => {
660-
client.end(true, done)
661-
}, 10)
686+
client.end((err) => done(err))
662687
})
663688
})
664689

@@ -706,8 +731,6 @@ export default function abstractTest(server, config, ports) {
706731
})
707732
})
708733

709-
teardownHelper.addServer(server2)
710-
711734
server2.listen(ports.PORTAND50, () => {
712735
client = connect({
713736
port: ports.PORTAND50,
@@ -719,7 +742,6 @@ export default function abstractTest(server, config, ports) {
719742
outgoingStore,
720743
queueQoSZero: true,
721744
})
722-
teardownHelper.addClient(client)
723745
client.on('packetreceive', (packet) => {
724746
if (packet.cmd === 'connack') {
725747
setImmediate(() => {
@@ -783,8 +805,6 @@ export default function abstractTest(server, config, ports) {
783805
})
784806
})
785807

786-
teardownHelper.addServer(server2)
787-
788808
const clientOptions = {
789809
port: ports.PORTAND72,
790810
host: 'localhost',
@@ -799,8 +819,6 @@ export default function abstractTest(server, config, ports) {
799819
server2.listen(ports.PORTAND72, () => {
800820
client = connect(clientOptions)
801821

802-
teardownHelper.addClient(client)
803-
804822
client.once('close', () => {
805823
client.once('connect', () => {
806824
client.publish('test', 'payload2', { qos: 1 }, () => {
@@ -833,9 +851,7 @@ export default function abstractTest(server, config, ports) {
833851

834852
client.on('connect', () => {
835853
assert.isTrue(called)
836-
setTimeout(() => {
837-
client.end(true, done)
838-
}, 10)
854+
client.end((err) => done(err))
839855
})
840856
})
841857

@@ -1158,8 +1174,6 @@ export default function abstractTest(server, config, ports) {
11581174
})
11591175
})
11601176

1161-
teardownHelper.addServer(server2)
1162-
11631177
server2.listen(ports.PORTAND72, () => {
11641178
client = connect({
11651179
port: ports.PORTAND72,
@@ -1169,8 +1183,6 @@ export default function abstractTest(server, config, ports) {
11691183
reconnectPeriod: 0,
11701184
})
11711185

1172-
teardownHelper.addClient(client)
1173-
11741186
client.once('connect', () => {
11751187
client.publish(
11761188
'a',
@@ -1238,8 +1250,6 @@ export default function abstractTest(server, config, ports) {
12381250
})
12391251
})
12401252

1241-
teardownHelper.addServer(server2)
1242-
12431253
server2.listen(ports.PORTAND103, () => {
12441254
client = connect({
12451255
port: ports.PORTAND103,
@@ -1249,8 +1259,6 @@ export default function abstractTest(server, config, ports) {
12491259
reconnectPeriod: 0,
12501260
})
12511261

1252-
teardownHelper.addClient(client)
1253-
12541262
client.once('connect', () => {
12551263
client.publish(
12561264
'a',
@@ -1340,6 +1348,17 @@ export default function abstractTest(server, config, ports) {
13401348
})
13411349

13421350
function testQosHandleMessage(qos, done) {
1351+
teardownHelper.add({ executeOnce: true, order: 1 }, () => {
1352+
if (clock) {
1353+
clock.restore()
1354+
}
1355+
})
1356+
1357+
const clock = sinon.useFakeTimers({
1358+
...fakeTimersOptions,
1359+
toFake: ['setTimeout'],
1360+
})
1361+
13431362
const client = connect()
13441363

13451364
let messageEventCount = 0
@@ -1353,10 +1372,14 @@ export default function abstractTest(server, config, ports) {
13531372
if (handleMessageCount === 10) {
13541373
setTimeout(() => {
13551374
client.end(true, done)
1356-
})
1375+
}, 10)
1376+
1377+
clock.tick(10)
13571378
}
13581379
callback()
13591380
}, 10)
1381+
1382+
clock.tick(10)
13601383
}
13611384

13621385
client.on('message', (topic, message, packet) => {
@@ -1773,8 +1796,6 @@ export default function abstractTest(server, config, ports) {
17731796
})
17741797
})
17751798

1776-
teardownHelper.addServer(server2)
1777-
17781799
server2.listen(ports.PORTAND50, () => {
17791800
client = connect({
17801801
port: ports.PORTAND50,
@@ -1786,8 +1807,6 @@ export default function abstractTest(server, config, ports) {
17861807
outgoingStore,
17871808
})
17881809

1789-
teardownHelper.addClient(client)
1790-
17911810
client.on('connect', () => {
17921811
if (!reconnect) {
17931812
client.publish('topic', 'payload1', { qos: 1 })
@@ -1877,6 +1896,9 @@ export default function abstractTest(server, config, ports) {
18771896
})
18781897

18791898
describe('unsubscribing', () => {
1899+
beforeEach(beforeEachExec)
1900+
after(afterExec)
1901+
18801902
it('should send an unsubscribe packet (offline)', function _test(t, done) {
18811903
const client = connect()
18821904
let received = false
@@ -2010,14 +2032,17 @@ export default function abstractTest(server, config, ports) {
20102032
let clock: sinon.SinonFakeTimers
20112033

20122034
// eslint-disable-next-line
2013-
beforeEach(() => {
2035+
beforeEach(async () => {
2036+
await beforeEachExec()
20142037
clock = sinon.useFakeTimers(fakeTimersOptions)
20152038
})
20162039

20172040
afterEach(() => {
20182041
clock.restore()
20192042
})
20202043

2044+
after(afterExec)
2045+
20212046
it('should send ping at keepalive interval', function _test(t, done) {
20222047
const interval = 3000
20232048
const client = connect({ keepalive: interval / 1000 })
@@ -2101,64 +2126,79 @@ export default function abstractTest(server, config, ports) {
21012126
})
21022127

21032128
const reschedulePing = (reschedulePings: boolean) => {
2104-
it(`should ${
2105-
!reschedulePings ? 'not ' : ''
2106-
}reschedule pings if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`, function _test(t, done) {
2107-
const intervalMs = 3000
2108-
const client = connect({
2109-
keepalive: intervalMs / 1000,
2110-
reschedulePings,
2111-
})
2112-
2113-
const spyReschedule = sinon.spy(
2114-
client,
2115-
'_reschedulePing' as any,
2116-
)
2129+
it(
2130+
`should ${
2131+
!reschedulePings ? 'not ' : ''
2132+
}reschedule pings if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`,
2133+
{
2134+
timeout: 4000,
2135+
},
2136+
function _test(t, done) {
2137+
clock.restore()
21172138

2118-
let received = 0
2139+
teardownHelper.add(
2140+
{
2141+
executeOnce: true,
2142+
order: 1,
2143+
},
2144+
() => {
2145+
if (localClock) {
2146+
localClock.restore()
2147+
}
2148+
},
2149+
)
21192150

2120-
client.on('packetreceive', (packet) => {
2121-
if (packet.cmd === 'puback') {
2122-
process.nextTick(() => {
2123-
clock.tick(intervalMs)
2151+
const localClock = sinon.useFakeTimers({
2152+
...fakeTimersOptions,
2153+
toFake: ['setTimeout'],
2154+
})
2155+
const intervalMs = 3000
2156+
const client = connect({
2157+
keepalive: intervalMs / 1000,
2158+
reschedulePings,
2159+
})
21242160

2125-
received++
2161+
const spyReschedule = sinon.spy(
2162+
client,
2163+
'_reschedulePing' as any,
2164+
)
21262165

2127-
if (received === 2) {
2128-
if (reschedulePings) {
2129-
assert.strictEqual(
2130-
spyReschedule.callCount,
2131-
received,
2132-
)
2133-
} else {
2134-
assert.strictEqual(
2135-
spyReschedule.callCount,
2136-
0,
2137-
)
2166+
let received = 0
2167+
2168+
client.on('packetreceive', (packet) => {
2169+
if (packet.cmd === 'puback') {
2170+
process.nextTick(() => {
2171+
localClock.tick(intervalMs)
2172+
2173+
++received
2174+
2175+
if (received === 2) {
2176+
if (reschedulePings) {
2177+
assert.strictEqual(
2178+
spyReschedule.callCount,
2179+
received,
2180+
)
2181+
} else {
2182+
assert.strictEqual(
2183+
spyReschedule.callCount,
2184+
0,
2185+
)
2186+
}
2187+
client.end((err) => done(err))
21382188
}
2139-
client.end(true, done)
2140-
}
2141-
})
2142-
2143-
clock.tick(1)
2144-
}
2145-
})
2146-
2147-
server.once('client', (serverClient) => {
2148-
serverClient.on('publish', () => {
2149-
// needed to trigger the setImmediate inside server publish listener and send suback
2150-
clock.tick(1)
2189+
})
2190+
}
21512191
})
2152-
})
21532192

2154-
client.once('connect', () => {
2155-
// reset call count (it's called also on connack)
2156-
spyReschedule.resetHistory()
2157-
// use qos1 so the puback is received (to reschedule ping)
2158-
client.publish('foo', 'bar', { qos: 1 })
2159-
client.publish('foo', 'bar', { qos: 1 })
2160-
})
2161-
})
2193+
client.once('connect', () => {
2194+
// reset call count (it's called also on connack)
2195+
spyReschedule.resetHistory()
2196+
// use qos1 so the puback is received (to reschedule ping)
2197+
client.publish('foo', 'bar', { qos: 1 })
2198+
client.publish('foo', 'bar', { qos: 1 })
2199+
})
2200+
},
2201+
)
21622202
}
21632203

21642204
reschedulePing(true)
@@ -2168,7 +2208,7 @@ export default function abstractTest(server, config, ports) {
21682208
it(`should shift ping on pingresp when reschedulePings===${reschedulePings}`, function _test(t, done) {
21692209
const intervalMs = 3000
21702210

2171-
let client = connect({
2211+
const client = connect({
21722212
keepalive: intervalMs / 1000,
21732213
reschedulePings,
21742214
})
@@ -2180,7 +2220,6 @@ export default function abstractTest(server, config, ports) {
21802220
process.nextTick(() => {
21812221
assert.strictEqual(spy.callCount, 1)
21822222
client.end(true, done)
2183-
client = null
21842223
})
21852224
}
21862225
})
@@ -2201,6 +2240,9 @@ export default function abstractTest(server, config, ports) {
22012240
})
22022241

22032242
describe('pinging', () => {
2243+
beforeEach(beforeEachExec)
2244+
after(afterExec)
2245+
22042246
it('should setup keepalive manager', function _test(t, done) {
22052247
const client = connect({ keepalive: 3 })
22062248
client.once('connect', () => {
@@ -2227,26 +2269,21 @@ export default function abstractTest(server, config, ports) {
22272269

22282270
t.after(() => {
22292271
clock.restore()
2230-
if (client) {
2231-
client.end(true)
2232-
throw new Error('Test timed out')
2233-
}
22342272
})
22352273

22362274
const options: IClientOptions = {
22372275
keepalive: 60,
22382276
reconnectPeriod: 5000,
22392277
}
22402278

2241-
let client = connect(options)
2279+
const client = connect(options)
22422280

22432281
client.once('connect', () => {
22442282
client.once('error', (err) => {
22452283
assert.equal(err.message, 'Keepalive timeout')
22462284
client.once('connect', () => {
22472285
client.end(true, done)
22482286
clock.tick(100)
2249-
client = null
22502287
})
22512288
})
22522289

@@ -2271,12 +2308,9 @@ export default function abstractTest(server, config, ports) {
22712308

22722309
t.after(() => {
22732310
clock.restore()
2274-
if (client) {
2275-
client.end(true)
2276-
}
22772311
})
22782312

2279-
let client = connect({ keepalive: 10 })
2313+
const client = connect({ keepalive: 10 })
22802314
client.once('close', () => {
22812315
done(new Error('Client closed connection'))
22822316
})
@@ -2295,7 +2329,6 @@ export default function abstractTest(server, config, ports) {
22952329
client.removeAllListeners('close')
22962330
client.end(true, done)
22972331
clock.tick(100)
2298-
client = null
22992332
}
23002333
})
23012334

@@ -2306,14 +2339,17 @@ export default function abstractTest(server, config, ports) {
23062339
})
23072340

23082341
describe('subscribing', () => {
2342+
beforeEach(beforeEachExec)
2343+
after(afterExec)
2344+
23092345
it('should send a subscribe message (offline)', function _test(t, done) {
23102346
const client = connect()
23112347

23122348
client.subscribe('test')
23132349

23142350
server.once('client', (serverClient) => {
23152351
serverClient.once('subscribe', () => {
2316-
done()
2352+
client.end((err) => done(err))
23172353
})
23182354
})
23192355
})
@@ -2338,7 +2374,7 @@ export default function abstractTest(server, config, ports) {
23382374
result.rh = 0
23392375
}
23402376
assert.include(packet.subscriptions[0], result)
2341-
done()
2377+
client.end((err) => done(err))
23422378
})
23432379
})
23442380
})
@@ -2353,7 +2389,7 @@ export default function abstractTest(server, config, ports) {
23532389

23542390
client.on('packetsend', (packet) => {
23552391
if (packet.cmd === 'subscribe') {
2356-
done()
2392+
client.end((err) => done(err))
23572393
}
23582394
})
23592395
})
@@ -2368,7 +2404,7 @@ export default function abstractTest(server, config, ports) {
23682404

23692405
client.on('packetreceive', (packet) => {
23702406
if (packet.cmd === 'suback') {
2371-
done()
2407+
client.end((err) => done(err))
23722408
}
23732409
})
23742410
})
@@ -2464,7 +2500,7 @@ export default function abstractTest(server, config, ports) {
24642500
}
24652501

24662502
assert.deepStrictEqual(packet.subscriptions, expected)
2467-
done()
2503+
client.end((err) => done(err))
24682504
})
24692505
})
24702506
})
@@ -2581,6 +2617,9 @@ export default function abstractTest(server, config, ports) {
25812617
})
25822618

25832619
describe('receiving messages', () => {
2620+
beforeEach(beforeEachExec)
2621+
after(afterExec)
2622+
25842623
it('should fire the message event', function _test(t, done) {
25852624
const client = connect()
25862625
const testPacket = {
@@ -2760,6 +2799,9 @@ export default function abstractTest(server, config, ports) {
27602799
})
27612800

27622801
describe('qos handling', () => {
2802+
beforeEach(beforeEachExec)
2803+
after(afterExec)
2804+
27632805
it('should follow qos 0 semantics (trivial)', function _test(t, done) {
27642806
const client = connect()
27652807
const testTopic = 'test'
@@ -3159,8 +3201,7 @@ export default function abstractTest(server, config, ports) {
31593201
const client = connect()
31603202

31613203
client.on('connect', () => {
3162-
client.end()
3163-
done() // it will raise an exception if called two times
3204+
client.end((err) => done(err))
31643205
})
31653206
})
31663207

@@ -3238,15 +3279,33 @@ export default function abstractTest(server, config, ports) {
32383279
})
32393280

32403281
it('should always cleanup successfully on reconnection', function _test(t, done) {
3282+
teardownHelper.add({ executeOnce: true, order: 1 }, () => {
3283+
if (clock) {
3284+
clock.restore()
3285+
}
3286+
})
3287+
3288+
const clock = sinon.useFakeTimers({
3289+
...fakeTimersOptions,
3290+
toFake: ['setTimeout'],
3291+
})
3292+
32413293
const client = connect({
32423294
host: 'this_hostname_should_not_exist',
32433295
connectTimeout: 0,
32443296
reconnectPeriod: 1,
32453297
})
3298+
32463299
// bind client.end so that when it is called it is automatically passed in the done callback
32473300
setTimeout(() => {
3248-
client.end(done)
3249-
}, 100)
3301+
setTimeout(() => {
3302+
client.end(done)
3303+
}, 10)
3304+
3305+
clock.tick(10)
3306+
}, 10)
3307+
3308+
clock.tick(10)
32503309
})
32513310

32523311
it('should emit connack timeout error', function _test(t, done) {
@@ -3282,13 +3341,7 @@ export default function abstractTest(server, config, ports) {
32823341
timeout: 4000,
32833342
},
32843343
function _test(t, done) {
3285-
t.after(() => {
3286-
// close client if not closed
3287-
if (client) {
3288-
client.end(true)
3289-
}
3290-
})
3291-
let client = connect({ reconnectPeriod: 200 })
3344+
const client = connect({ reconnectPeriod: 200 })
32923345
let serverPublished = false
32933346
let clientCalledBack = false
32943347

@@ -3325,7 +3378,6 @@ export default function abstractTest(server, config, ports) {
33253378
setImmediate(() => {
33263379
assert.isTrue(clientCalledBack)
33273380
client.end(true, done)
3328-
client = null
33293381
})
33303382
}
33313383
})
@@ -3364,14 +3416,7 @@ export default function abstractTest(server, config, ports) {
33643416
timeout: 4000,
33653417
},
33663418
function _test(t, done) {
3367-
t.after(() => {
3368-
// close client if not closed
3369-
if (client) {
3370-
client.end(true)
3371-
}
3372-
})
3373-
3374-
let client = connect({ reconnectPeriod: 200 })
3419+
const client = connect({ reconnectPeriod: 200 })
33753420
let serverPublished = false
33763421
let clientCalledBack = false
33773422

@@ -3401,22 +3446,14 @@ export default function abstractTest(server, config, ports) {
34013446
setImmediate(() => {
34023447
assert.isTrue(clientCalledBack)
34033448
client.end(true, done)
3404-
client = null
34053449
})
34063450
}
34073451
})
34083452
},
34093453
)
34103454

34113455
it('should not resend in-flight QoS 1 removed publish messages from the client', function _test(t, done) {
3412-
t.after(() => {
3413-
// close client if not closed
3414-
if (client) {
3415-
client.end(true)
3416-
}
3417-
})
3418-
3419-
let client = connect({ reconnectPeriod: 100 })
3456+
const client = connect({ reconnectPeriod: 100 })
34203457
let clientCalledBack = false
34213458

34223459
server.once('client', (serverClient) => {
@@ -3450,7 +3487,6 @@ export default function abstractTest(server, config, ports) {
34503487
assert.isTrue(clientCalledBack)
34513488
client.end(true, (err) => {
34523489
done(err)
3453-
client = null
34543490
})
34553491
})
34563492

@@ -3600,17 +3636,13 @@ export default function abstractTest(server, config, ports) {
36003636
})
36013637
})
36023638

3603-
teardownHelper.addServer(server2)
3604-
36053639
server2.listen(ports.PORTAND49, () => {
36063640
client = connect({
36073641
port: ports.PORTAND49,
36083642
host: 'localhost',
36093643
reconnectPeriod: 100,
36103644
})
36113645

3612-
teardownHelper.addClient(client)
3613-
36143646
client.on('reconnect', () => {
36153647
reconnectEvent = true
36163648
})
@@ -3679,8 +3711,6 @@ export default function abstractTest(server, config, ports) {
36793711
})
36803712
})
36813713

3682-
teardownHelper.addServer(server2)
3683-
36843714
server2.listen(ports.PORTAND50, () => {
36853715
client = connect({
36863716
port: ports.PORTAND50,
@@ -3692,8 +3722,6 @@ export default function abstractTest(server, config, ports) {
36923722
outgoingStore,
36933723
})
36943724

3695-
teardownHelper.addClient(client)
3696-
36973725
client.on('connect', () => {
36983726
if (!reconnect) {
36993727
client.subscribe('test', { qos: 2 }, () => {})
@@ -3728,8 +3756,6 @@ export default function abstractTest(server, config, ports) {
37283756
})
37293757
})
37303758

3731-
teardownHelper.addServer(server2)
3732-
37333759
server2.listen(ports.PORTAND50, () => {
37343760
client = connect({
37353761
port: ports.PORTAND50,
@@ -3740,8 +3766,6 @@ export default function abstractTest(server, config, ports) {
37403766
reconnectPeriod: 0,
37413767
})
37423768

3743-
teardownHelper.addClient(client)
3744-
37453769
client.on('connect', () => {
37463770
client.subscribe('test', { qos: 2 }, (e) => {
37473771
if (!e) {
@@ -3791,8 +3815,6 @@ export default function abstractTest(server, config, ports) {
37913815
})
37923816
})
37933817

3794-
teardownHelper.addServer(server2)
3795-
37963818
server2.listen(ports.PORTAND50, () => {
37973819
client = connect({
37983820
port: ports.PORTAND50,
@@ -3804,8 +3826,6 @@ export default function abstractTest(server, config, ports) {
38043826
outgoingStore,
38053827
})
38063828

3807-
teardownHelper.addClient(client)
3808-
38093829
client.on('connect', () => {
38103830
if (!reconnect) {
38113831
client.publish('topic', 'payload', { qos: 1 })
@@ -3841,8 +3861,6 @@ export default function abstractTest(server, config, ports) {
38413861
})
38423862
})
38433863

3844-
teardownHelper.addServer(server2)
3845-
38463864
server2.listen(ports.PORTAND50, () => {
38473865
client = connect({
38483866
port: ports.PORTAND50,
@@ -3854,8 +3872,6 @@ export default function abstractTest(server, config, ports) {
38543872
outgoingStore,
38553873
})
38563874

3857-
teardownHelper.addClient(client)
3858-
38593875
client.on('connect', () => {
38603876
if (!reconnect) {
38613877
client.publish('topic', 'payload', { qos: 2 })
@@ -3896,8 +3912,6 @@ export default function abstractTest(server, config, ports) {
38963912
})
38973913
})
38983914

3899-
teardownHelper.addServer(server2)
3900-
39013915
server2.listen(ports.PORTAND50, () => {
39023916
client = connect({
39033917
port: ports.PORTAND50,
@@ -3909,8 +3923,6 @@ export default function abstractTest(server, config, ports) {
39093923
outgoingStore,
39103924
})
39113925

3912-
teardownHelper.addClient(client)
3913-
39143926
client.on('connect', () => {
39153927
if (!reconnect) {
39163928
client.publish(
@@ -3983,8 +3995,6 @@ export default function abstractTest(server, config, ports) {
39833995
})
39843996
})
39853997

3986-
teardownHelper.addServer(server2)
3987-
39883998
server2.listen(ports.PORTAND50, () => {
39893999
client = connect({
39904000
port: ports.PORTAND50,
@@ -3996,8 +4006,6 @@ export default function abstractTest(server, config, ports) {
39964006
outgoingStore,
39974007
})
39984008

3999-
teardownHelper.addClient(client)
4000-
40014009
client['nextId'] = 65535
40024010

40034011
client.on('connect', () => {
@@ -4042,6 +4050,17 @@ export default function abstractTest(server, config, ports) {
40424050
})
40434051

40444052
it('should be able to pub/sub if reconnect() is called at out of close handler', function _test(t, done) {
4053+
teardownHelper.add({ executeOnce: true, order: 1 }, () => {
4054+
if (clock) {
4055+
clock.restore()
4056+
}
4057+
})
4058+
4059+
const clock = sinon.useFakeTimers({
4060+
...fakeTimersOptions,
4061+
toFake: ['setTimeout'],
4062+
})
4063+
40454064
const client = connect({ reconnectPeriod: 0 })
40464065
let tryReconnect = true
40474066
let reconnectEvent = false
@@ -4052,6 +4071,8 @@ export default function abstractTest(server, config, ports) {
40524071
setTimeout(() => {
40534072
client.reconnect()
40544073
}, 100)
4074+
4075+
clock.tick(100)
40554076
} else {
40564077
assert.isTrue(reconnectEvent)
40574078
done()
@@ -4078,7 +4099,8 @@ export default function abstractTest(server, config, ports) {
40784099
const connack =
40794100
version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
40804101

4081-
beforeEach(() => {
4102+
beforeEach(async () => {
4103+
await beforeEachExec()
40824104
cachedClientListeners = server.listeners('client')
40834105
server.removeAllListeners('client')
40844106
})
@@ -4090,6 +4112,8 @@ export default function abstractTest(server, config, ports) {
40904112
})
40914113
})
40924114

4115+
after(afterExec)
4116+
40934117
it('should resubscribe even if disconnect is before suback', function _test(t, done) {
40944118
const client = connect({ reconnectPeriod: 100, ...config })
40954119
let subscribeCount = 0
@@ -4152,6 +4176,9 @@ export default function abstractTest(server, config, ports) {
41524176
})
41534177

41544178
describe('message id to subscription topic mapping', () => {
4179+
beforeEach(beforeEachExec)
4180+
after(afterExec)
4181+
41554182
it('should not create a mapping if resubscribe is disabled', function _test(t, done) {
41564183
const client = connect({ resubscribe: false })
41574184
client.subscribe('test1')

‎test/helpers/TeardownHelper.ts

+42-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { MqttClient } from 'src'
22
import { randomUUID } from 'node:crypto'
3+
import { isAsyncFunction } from 'node:util/types'
34
import serverBuilder from '../server_helpers_for_client_tests'
45

56
type ServerBuilderInstance = ReturnType<typeof serverBuilder>
@@ -12,6 +13,14 @@ type AddOptions = {
1213
* @default true
1314
*/
1415
executeOnce?: boolean
16+
/**
17+
* @description
18+
* The order in which the method will be executed.
19+
* If `order===0` the method will be executed after all methods before it that were added.
20+
*
21+
* @default 0
22+
*/
23+
order?: number
1524
}
1625

1726
type ResetOptions = {
@@ -24,6 +33,11 @@ type ResetOptions = {
2433
removeOnce?: boolean
2534
}
2635

36+
type Method =
37+
| Promise<any>
38+
| ((...args: any[]) => Promise<any>)
39+
| ((...args: any[]) => any)
40+
2741
/**
2842
* @description
2943
* Class to help clean the environment or close opened connections after tests finish.
@@ -99,7 +113,7 @@ class TeardownHelper {
99113
string,
100114
{
101115
options: AddOptions
102-
method: Promise<any> | ((...args: any[]) => Promise<any>)
116+
method: Method
103117
args: any[]
104118
}
105119
>
@@ -132,17 +146,17 @@ class TeardownHelper {
132146
* @description
133147
* Add a method to be executed
134148
*/
135-
add<T extends (...args: any[]) => Promise<void>>(
149+
add<T extends any[] = []>(
136150
options: AddOptions | undefined,
137-
method: Promise<void> | T,
138-
...args: Parameters<T>
151+
method: Method,
152+
...args: T
139153
): string {
140154
const id = randomUUID()
141155

142156
this.#methods.set(id, {
143157
method,
144158
args,
145-
options: { executeOnce: true, ...options },
159+
options: { executeOnce: true, order: 0, ...options },
146160
})
147161

148162
return id
@@ -235,23 +249,43 @@ class TeardownHelper {
235249
return
236250
}
237251

252+
const methodStored: (AddOptions & { key: string })[] = []
253+
254+
for (const [key, { options }] of this.#methods) {
255+
methodStored.push({ ...options, key })
256+
}
257+
258+
methodStored.sort((a, b) => b.order - a.order)
238259
const methods: Array<Promise<any>> = []
239260

240-
for (const [id, { method, options, args }] of this.#methods) {
261+
for (const { key, ...options } of methodStored) {
262+
const { method, args } = this.#methods.get(key)
263+
241264
if (method instanceof Promise) {
242265
methods.push(method)
243-
} else {
266+
} else if (isAsyncFunction(method)) {
244267
const promise = new Promise<any>((resolve, reject) => {
245268
method(...args)
246269
.then(resolve)
247270
.catch(reject)
248271
})
249272

273+
methods.push(promise)
274+
} else {
275+
const promise = new Promise<any>((resolve, reject) => {
276+
try {
277+
const result = method(...args)
278+
resolve(result)
279+
} catch (error) {
280+
reject(error)
281+
}
282+
})
283+
250284
methods.push(promise)
251285
}
252286

253287
if (options.executeOnce) {
254-
this.#methods.delete(id)
288+
this.#methods.delete(key)
255289
}
256290
}
257291

0 commit comments

Comments
 (0)
Please sign in to comment.