Skip to content

Commit 335ee55

Browse files
authoredSep 7, 2022
feat(NODE-4385): add cmap pool pausing functionality (#3321)
1 parent f4702f4 commit 335ee55

25 files changed

+368
-149
lines changed
 

‎src/cmap/connection_pool.ts

+82-49
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
CONNECTION_POOL_CLEARED,
1414
CONNECTION_POOL_CLOSED,
1515
CONNECTION_POOL_CREATED,
16+
CONNECTION_POOL_READY,
1617
CONNECTION_READY
1718
} from '../constants';
1819
import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error';
@@ -31,9 +32,10 @@ import {
3132
ConnectionPoolClearedEvent,
3233
ConnectionPoolClosedEvent,
3334
ConnectionPoolCreatedEvent,
35+
ConnectionPoolReadyEvent,
3436
ConnectionReadyEvent
3537
} from './connection_pool_events';
36-
import { PoolClosedError, WaitQueueTimeoutError } from './errors';
38+
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
3739
import { ConnectionPoolMetrics } from './metrics';
3840

3941
/** @internal */
@@ -103,6 +105,7 @@ export interface CloseOptions {
103105
/** @public */
104106
export type ConnectionPoolEvents = {
105107
connectionPoolCreated(event: ConnectionPoolCreatedEvent): void;
108+
connectionPoolReady(event: ConnectionPoolReadyEvent): void;
106109
connectionPoolClosed(event: ConnectionPoolClosedEvent): void;
107110
connectionPoolCleared(event: ConnectionPoolClearedEvent): void;
108111
connectionCreated(event: ConnectionCreatedEvent): void;
@@ -167,6 +170,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
167170
* @event
168171
*/
169172
static readonly CONNECTION_POOL_CLEARED = CONNECTION_POOL_CLEARED;
173+
/**
174+
* Emitted each time the connection pool is marked ready
175+
* @event
176+
*/
177+
static readonly CONNECTION_POOL_READY = CONNECTION_POOL_READY;
170178
/**
171179
* Emitted when a connection is created.
172180
* @event
@@ -242,7 +250,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
242250

243251
process.nextTick(() => {
244252
this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
245-
this.ensureMinPoolSize();
246253
});
247254
}
248255

@@ -308,7 +315,13 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
308315
* Set the pool state to "ready"
309316
*/
310317
ready(): void {
318+
if (this[kPoolState] !== PoolState.paused) {
319+
return;
320+
}
311321
this[kPoolState] = PoolState.ready;
322+
this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
323+
clearTimeout(this[kMinPoolSizeTimer]);
324+
this.ensureMinPoolSize();
312325
}
313326

314327
/**
@@ -322,15 +335,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
322335
new ConnectionCheckOutStartedEvent(this)
323336
);
324337

325-
if (this.closed) {
326-
this.emit(
327-
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
328-
new ConnectionCheckOutFailedEvent(this, 'poolClosed')
329-
);
330-
callback(new PoolClosedError(this));
331-
return;
332-
}
333-
334338
const waitQueueMember: WaitQueueMember = { callback };
335339
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
336340
if (waitQueueTimeoutMS) {
@@ -390,26 +394,40 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
390394
* previous generation will eventually be pruned during subsequent checkouts.
391395
*/
392396
clear(serviceId?: ObjectId): void {
397+
if (this.closed) {
398+
return;
399+
}
400+
401+
// handle load balanced case
393402
if (this.loadBalanced && serviceId) {
394403
const sid = serviceId.toHexString();
395404
const generation = this.serviceGenerations.get(sid);
396405
// Only need to worry if the generation exists, since it should
397406
// always be there but typescript needs the check.
398407
if (generation == null) {
399-
// TODO(NODE-3483)
400408
throw new MongoRuntimeError('Service generations are required in load balancer mode.');
401409
} else {
402410
// Increment the generation for the service id.
403411
this.serviceGenerations.set(sid, generation + 1);
404412
}
405-
} else {
406-
this[kGeneration] += 1;
413+
this.emit(
414+
ConnectionPool.CONNECTION_POOL_CLEARED,
415+
new ConnectionPoolClearedEvent(this, serviceId)
416+
);
417+
return;
407418
}
408419

409-
this.emit(
410-
ConnectionPool.CONNECTION_POOL_CLEARED,
411-
new ConnectionPoolClearedEvent(this, serviceId)
412-
);
420+
// handle non load-balanced case
421+
this[kGeneration] += 1;
422+
const alreadyPaused = this[kPoolState] === PoolState.paused;
423+
this[kPoolState] = PoolState.paused;
424+
425+
this.clearMinPoolSizeTimer();
426+
this.processWaitQueue();
427+
428+
if (!alreadyPaused) {
429+
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
430+
}
413431
}
414432

415433
/** Close the pool */
@@ -430,33 +448,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
430448
// immediately cancel any in-flight connections
431449
this[kCancellationToken].emit('cancel');
432450

433-
// drain the wait queue
434-
while (this.waitQueueSize) {
435-
const waitQueueMember = this[kWaitQueue].pop();
436-
if (waitQueueMember) {
437-
if (waitQueueMember.timer) {
438-
clearTimeout(waitQueueMember.timer);
439-
}
440-
if (!waitQueueMember[kCancelled]) {
441-
// TODO(NODE-3483): Replace with MongoConnectionPoolClosedError
442-
waitQueueMember.callback(new MongoRuntimeError('Connection pool closed'));
443-
}
444-
}
445-
}
446-
447-
// clear the min pool size timer
448-
const minPoolSizeTimer = this[kMinPoolSizeTimer];
449-
if (minPoolSizeTimer) {
450-
clearTimeout(minPoolSizeTimer);
451-
}
452-
453451
// end the connection counter
454452
if (typeof this[kConnectionCounter].return === 'function') {
455453
this[kConnectionCounter].return(undefined);
456454
}
457455

458-
// mark the pool as closed immediately
459456
this[kPoolState] = PoolState.closed;
457+
this.clearMinPoolSizeTimer();
458+
this.processWaitQueue();
459+
460460
eachAsync<Connection>(
461461
this[kConnections].toArray(),
462462
(conn, cb) => {
@@ -526,12 +526,19 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
526526
});
527527
}
528528

529+
/** Clear the min pool size timer */
530+
private clearMinPoolSizeTimer(): void {
531+
const minPoolSizeTimer = this[kMinPoolSizeTimer];
532+
if (minPoolSizeTimer) {
533+
clearTimeout(minPoolSizeTimer);
534+
}
535+
}
536+
529537
private destroyConnection(connection: Connection, reason: string) {
530538
this.emit(
531539
ConnectionPool.CONNECTION_CLOSED,
532540
new ConnectionClosedEvent(this, connection, reason)
533541
);
534-
535542
// destroy the connection
536543
process.nextTick(() => connection.destroy());
537544
}
@@ -580,14 +587,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
580587
connect(connectOptions, (err, connection) => {
581588
if (err || !connection) {
582589
this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
583-
callback(err);
590+
this[kPending]--;
591+
callback(err ?? new MongoRuntimeError('Connection creation failed without error'));
584592
return;
585593
}
586594

587595
// The pool might have closed since we started trying to create a connection
588-
if (this.closed) {
596+
if (this[kPoolState] !== PoolState.ready) {
589597
this[kPending]--;
590598
connection.destroy({ force: true });
599+
callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this));
591600
return;
592601
}
593602

@@ -616,17 +625,25 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
616625
connection.markAvailable();
617626
this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));
618627

628+
this[kPending]--;
619629
callback(undefined, connection);
620630
return;
621631
});
622632
}
623633

624634
private ensureMinPoolSize() {
625635
const minPoolSize = this.options.minPoolSize;
626-
if (this.closed || minPoolSize === 0) {
636+
if (this[kPoolState] !== PoolState.ready || minPoolSize === 0) {
627637
return;
628638
}
629639

640+
for (let i = 0; i < this[kConnections].length; i++) {
641+
const connection = this[kConnections].peekAt(i);
642+
if (connection && this.connectionIsPerished(connection)) {
643+
this[kConnections].removeOne(i);
644+
}
645+
}
646+
630647
if (
631648
this.totalConnectionCount < minPoolSize &&
632649
this.pendingConnectionCount < this.options.maxConnecting
@@ -635,23 +652,25 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
635652
// connection permits because that potentially delays the availability of
636653
// the connection to a checkout request
637654
this.createConnection((err, connection) => {
638-
this[kPending]--;
639655
if (!err && connection) {
640656
this[kConnections].push(connection);
641657
process.nextTick(() => this.processWaitQueue());
642658
}
643-
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
659+
if (this[kPoolState] === PoolState.ready) {
660+
clearTimeout(this[kMinPoolSizeTimer]);
661+
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
662+
}
644663
});
645664
} else {
665+
clearTimeout(this[kMinPoolSizeTimer]);
646666
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100);
647667
}
648668
}
649669

650670
private processWaitQueue() {
651-
if (this.closed || this[kProcessingWaitQueue]) {
671+
if (this[kProcessingWaitQueue]) {
652672
return;
653673
}
654-
655674
this[kProcessingWaitQueue] = true;
656675

657676
while (this.waitQueueSize) {
@@ -666,6 +685,21 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
666685
continue;
667686
}
668687

688+
if (this[kPoolState] !== PoolState.ready) {
689+
const reason = this.closed ? 'poolClosed' : 'connectionError';
690+
const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this);
691+
this.emit(
692+
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
693+
new ConnectionCheckOutFailedEvent(this, reason)
694+
);
695+
if (waitQueueMember.timer) {
696+
clearTimeout(waitQueueMember.timer);
697+
}
698+
this[kWaitQueue].shift();
699+
waitQueueMember.callback(error);
700+
continue;
701+
}
702+
669703
if (!this.availableConnectionCount) {
670704
break;
671705
}
@@ -701,7 +735,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
701735
continue;
702736
}
703737
this.createConnection((err, connection) => {
704-
this[kPending]--;
705738
if (waitQueueMember[kCancelled]) {
706739
if (!err && connection) {
707740
this[kConnections].push(connection);
@@ -710,7 +743,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
710743
if (err) {
711744
this.emit(
712745
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
713-
new ConnectionCheckOutFailedEvent(this, err)
746+
new ConnectionCheckOutFailedEvent(this, 'connectionError')
714747
);
715748
} else if (connection) {
716749
this[kCheckedOut]++;

‎src/cmap/connection_pool_events.ts

+12
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@ export class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent {
3737
}
3838
}
3939

40+
/**
41+
* An event published when a connection pool is ready
42+
* @public
43+
* @category Event
44+
*/
45+
export class ConnectionPoolReadyEvent extends ConnectionPoolMonitoringEvent {
46+
/** @internal */
47+
constructor(pool: ConnectionPool) {
48+
super(pool);
49+
}
50+
}
51+
4052
/**
4153
* An event published when a connection pool is closed
4254
* @public

‎src/cmap/errors.ts

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MongoDriverError } from '../error';
1+
import { MongoDriverError, MongoNetworkError } from '../error';
22
import type { ConnectionPool } from './connection_pool';
33

44
/**
@@ -19,6 +19,27 @@ export class PoolClosedError extends MongoDriverError {
1919
}
2020
}
2121

22+
/**
23+
* An error indicating a connection pool is currently paused
24+
* @category Error
25+
*/
26+
export class PoolClearedError extends MongoNetworkError {
27+
// TODO(NODE-3144): needs to extend RetryableError or be marked retryable in some other way per spec
28+
/** The address of the connection pool */
29+
address: string;
30+
31+
constructor(pool: ConnectionPool) {
32+
// TODO(NODE-3135): pass in original pool-clearing error and use in message
33+
// "failed with: <original error which cleared the pool>"
34+
super(`Connection pool for ${pool.address} was cleared because another operation failed`);
35+
this.address = pool.address;
36+
}
37+
38+
override get name(): string {
39+
return 'MongoPoolClearedError';
40+
}
41+
}
42+
2243
/**
2344
* An error thrown when a request to check out a connection times out
2445
* @category Error

‎src/constants.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as cons
2626
export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const;
2727
export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
2828
export const CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
29+
export const CONNECTION_POOL_READY = 'connectionPoolReady' as const;
2930
export const CONNECTION_CREATED = 'connectionCreated' as const;
3031
export const CONNECTION_READY = 'connectionReady' as const;
3132
export const CONNECTION_CLOSED = 'connectionClosed' as const;
@@ -57,15 +58,16 @@ export const HEARTBEAT_EVENTS = Object.freeze([
5758
/** @public */
5859
export const CMAP_EVENTS = Object.freeze([
5960
CONNECTION_POOL_CREATED,
61+
CONNECTION_POOL_READY,
62+
CONNECTION_POOL_CLEARED,
6063
CONNECTION_POOL_CLOSED,
6164
CONNECTION_CREATED,
6265
CONNECTION_READY,
6366
CONNECTION_CLOSED,
6467
CONNECTION_CHECK_OUT_STARTED,
6568
CONNECTION_CHECK_OUT_FAILED,
6669
CONNECTION_CHECKED_OUT,
67-
CONNECTION_CHECKED_IN,
68-
CONNECTION_POOL_CLEARED
70+
CONNECTION_CHECKED_IN
6971
] as const);
7072

7173
/** @public */

0 commit comments

Comments
 (0)
Please sign in to comment.