Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-5915): refactor topology close logic to be synchronous #4021

Merged
merged 18 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export async function connect(options: ConnectionOptions): Promise<Connection> {
await performInitialHandshake(connection, options);
return connection;
} catch (error) {
connection?.destroy({ force: false });
connection?.destroy();
throw error;
}
}
Expand Down
12 changes: 1 addition & 11 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,6 @@ export interface ConnectionOptions
mongoLogger?: MongoLogger | undefined;
}

/** @internal */
export interface DestroyOptions {
/** Force the destruction. */
force: boolean;
}

/** @public */
export type ConnectionEvents = {
commandStarted(event: CommandStartedEvent): void;
Expand Down Expand Up @@ -298,14 +292,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}, 1).unref(); // No need for this timer to hold the event loop open
}

public destroy(options: DestroyOptions, callback?: Callback): void {
public destroy(): void {
if (this.closed) {
if (typeof callback === 'function') process.nextTick(callback);
return;
}
if (typeof callback === 'function') {
this.once('close', () => process.nextTick(() => callback()));
}

// load balanced mode requires that these listeners remain on the connection
// after cleanup on timeouts, errors or close so we remove them before calling
Expand Down
44 changes: 14 additions & 30 deletions src/cmap/connection_pool.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's file a follow-up ticket in the V7 epic to remove closeOptions since its no longer used anywhere in the code base after these changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout here. I can file the follow-up and the first subtask of that follow-up can be to mark it as deprecated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking a little closer, it seems the only places that interface was used are in Topology.close and ConnectionPool.close which are both internal classes, so users shouldn't be interacting with this interface at all. Seems like it wouldn't be a breaking change to just remove it in this PR along with DestroyOptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, even though it's only used in internal classes / types, it's public and exported so technically it's a part of our API. In situations like this in the past, we've considered it breaking to remove and done it in a major release

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import {
type Callback,
eachAsync,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The eachAsync utils function is now also used nowhere in the codebase, do we want to remove it? It uses callbacks, and IIUC we no longer want to add any new callback code to the driver.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I agree that removing it is likely the way we want to go here. @nbbeeken @baileympearson thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go for it

List,
makeCounter,
promiseWithResolvers,
Expand Down Expand Up @@ -493,25 +492,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
private interruptInUseConnections(minGeneration: number) {
for (const connection of this[kCheckedOut]) {
if (connection.generation <= minGeneration) {
this.checkIn(connection);
connection.onError(new PoolClearedOnNetworkError(this));
this.checkIn(connection);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@baileympearson this was done to fix a test failure where we were getting an uncaught error when running this spec test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is right. Good catch.

I think the reason this worked in the past was because

  1. checkIn calls destroyConnection(), which calls connection.destroy() in a process.nextTick()
  2. then `connection.onError() runs and shuts down the connection
  3. then connection.destroy() is a no-op because the connection has already been destroyed

You made destroyConnection synchronous, so we need to run onError first to ensure the connection is closed with the correct error. Then we run checkIn, which resets the pool state and calls connection.destroy(), which is a no-op.

}
}
}

/** Close the pool */
close(callback: Callback<void>): void;
close(options: CloseOptions, callback: Callback<void>): void;
close(_options?: CloseOptions | Callback<void>, _cb?: Callback<void>): void {
let options = _options as CloseOptions;
const callback = (_cb ?? _options) as Callback<void>;
if (typeof options === 'function') {
options = {};
}

options = Object.assign({ force: false }, options);
close(): void {
if (this.closed) {
return callback();
return;
}

// immediately cancel any in-flight connections
Expand All @@ -526,21 +516,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this.clearMinPoolSizeTimer();
this.processWaitQueue();

eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
callback(err);
}
);
for (const conn of this[kConnections]) {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy();
}
this[kConnections].clear();
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
}

/**
Expand Down Expand Up @@ -592,7 +576,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy({ force: false }));
connection.destroy();
}

private connectionIsStale(connection: Connection) {
Expand Down Expand Up @@ -648,7 +632,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// The pool might have closed since we started trying to create a connection
if (this[kPoolState] !== PoolState.ready) {
this[kPending]--;
connection.destroy({ force: true });
connection.destroy();
callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this));
return;
}
Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ export type {
Connection,
ConnectionEvents,
ConnectionOptions,
DestroyOptions,
ProxyOptions
} from './cmap/connection';
export type {
Expand Down
21 changes: 7 additions & 14 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
try {
await promisify(callback => this.topology?.connect(options, callback))();
} catch (error) {
this.topology?.close({ force: true });
this.topology?.close();
throw error;
}
};
Expand Down Expand Up @@ -614,19 +614,12 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
const topology = this.topology;
this.topology = undefined;

await new Promise<void>((resolve, reject) => {
topology.close({ force }, error => {
if (error) return reject(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.closeCallback(this, force, error => {
if (error) return reject(error);
resolve();
});
}
resolve();
});
});
topology.close();

const { encrypter } = this[kOptions];
if (encrypter) {
await encrypter.close(this, force);
}
Comment on lines +617 to +622
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can close() throw? If so, both the new and old implementations potentially would leave the encrypter open if topology.close() throws

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the investigation into whether these can throw or not.

doesn't have to be in this PR but do you think it might be worth handling errors anyways, since we could introduce errors into close() in the future?

}

/**
Expand Down
14 changes: 7 additions & 7 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ function resetMonitorState(monitor: Monitor) {

monitor[kCancellationToken].emit('cancel');

monitor.connection?.destroy({ force: true });
monitor.connection?.destroy();
monitor.connection = null;
}

Expand Down Expand Up @@ -247,7 +247,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);

function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy({ force: true });
monitor.connection?.destroy();
monitor.connection = null;

monitor.emitAndLogHeartbeat(
Expand Down Expand Up @@ -366,13 +366,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
await performInitialHandshake(connection, monitor.connectOptions);
return connection;
} catch (error) {
connection.destroy({ force: false });
connection.destroy();
throw error;
}
})().then(
connection => {
if (isInCloseState(monitor)) {
connection.destroy({ force: true });
connection.destroy();
return;
}

Expand Down Expand Up @@ -479,7 +479,7 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this.connection?.destroy({ force: true });
this.connection?.destroy();
this.connection = undefined;
}
}
Expand All @@ -495,7 +495,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {

function measureAndReschedule(conn?: Connection) {
if (rttPinger.closed) {
conn?.destroy({ force: true });
conn?.destroy();
return;
}

Expand Down Expand Up @@ -529,7 +529,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy({ force: true });
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
Expand Down
25 changes: 5 additions & 20 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Document } from '../bson';
import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter';
import { type CommandOptions, Connection, type DestroyOptions } from '../cmap/connection';
import { type CommandOptions, Connection } from '../cmap/connection';
import {
ConnectionPool,
type ConnectionPoolEvents,
Expand Down Expand Up @@ -41,7 +41,6 @@ import type { GetMoreOptions } from '../operations/get_more';
import type { ClientSession } from '../sessions';
import { isTransactionCommand } from '../transactions';
import {
type Callback,
type EventEmitterWithState,
makeStateMachine,
maxWireVersion,
Expand Down Expand Up @@ -236,18 +235,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
options = Object.assign({}, { force: false }, options);

destroy(): void {
if (this.s.state === STATE_CLOSED) {
if (typeof callback === 'function') {
callback();
}

return;
}

Expand All @@ -257,13 +246,9 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.monitor?.close();
}

this.pool.close(options, err => {
stateTransition(this, STATE_CLOSED);
this.emit('closed');
if (typeof callback === 'function') {
callback(err);
}
});
this.pool.close();
stateTransition(this, STATE_CLOSED);
this.emit('closed');
Comment on lines +249 to +251
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as with client.close() - can pool.close() throw?

}

/**
Expand Down