Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5840): heartbeat duration includes socket creation #3973

Merged
merged 9 commits into from
Jan 30, 2024
237 changes: 103 additions & 134 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
MongoRuntimeError,
needsRetryableWriteLabel
} from '../error';
import { type Callback, HostAddress, ns } from '../utils';
import { HostAddress, ns, promiseWithResolvers } from '../utils';
import { AuthContext, type AuthProvider } from './auth/auth_provider';
import { GSSAPI } from './auth/gssapi';
import { MongoCR } from './auth/mongocr';
Expand Down Expand Up @@ -55,27 +55,26 @@ export const AUTH_PROVIDERS = new Map<AuthMechanism | string, AuthProvider>([
/** @public */
export type Stream = Socket | TLSSocket;

export function connect(options: ConnectionOptions, callback: Callback<Connection>): void {
makeConnection({ ...options, existingSocket: undefined }, (err, socket) => {
if (err || !socket) {
return callback(err);
}

let ConnectionType = options.connectionType ?? Connection;
if (options.autoEncrypter) {
ConnectionType = CryptoConnection;
}
export async function connect(options: ConnectionOptions): Promise<Connection> {
let connection: Connection | null = null;
try {
const socket = await makeSocket(options);
connection = makeConnection(options, socket);
await performInitialHandshake(connection, options);
return connection;
} catch (error) {
connection?.destroy({ force: false });
throw error;
}
}

const connection = new ConnectionType(socket, options);
export function makeConnection(options: ConnectionOptions, socket: Stream): Connection {
let ConnectionType = options.connectionType ?? Connection;
if (options.autoEncrypter) {
ConnectionType = CryptoConnection;
}

performInitialHandshake(connection, options).then(
() => callback(undefined, connection),
error => {
connection.destroy({ force: false });
callback(error);
}
);
});
return new ConnectionType(socket, options);
}

function checkSupportedServer(hello: Document, options: ConnectionOptions) {
Expand Down Expand Up @@ -103,7 +102,7 @@ function checkSupportedServer(hello: Document, options: ConnectionOptions) {
return new MongoCompatibilityError(message);
}

async function performInitialHandshake(
export async function performInitialHandshake(
conn: Connection,
options: ConnectionOptions
): Promise<void> {
Expand Down Expand Up @@ -329,35 +328,21 @@ function parseSslOptions(options: MakeConnectionOptions): TLSConnectionOpts {
return result;
}

const SOCKET_ERROR_EVENT_LIST = ['error', 'close', 'timeout', 'parseError'] as const;
type ErrorHandlerEventName = (typeof SOCKET_ERROR_EVENT_LIST)[number] | 'cancel';
const SOCKET_ERROR_EVENTS = new Set(SOCKET_ERROR_EVENT_LIST);

function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stream>) {
export async function makeSocket(options: MakeConnectionOptions): Promise<Stream> {
const useTLS = options.tls ?? false;
const noDelay = options.noDelay ?? true;
const connectTimeoutMS = options.connectTimeoutMS ?? 30000;
const rejectUnauthorized = options.rejectUnauthorized ?? true;
const existingSocket = options.existingSocket;

let socket: Stream;
const callback: Callback<Stream> = function (err, ret) {
if (err && socket) {
socket.destroy();
}

_callback(err, ret);
};

if (options.proxyHost != null) {
// Currently, only Socks5 is supported.
return makeSocks5Connection(
{
...options,
connectTimeoutMS // Should always be present for Socks5
},
callback
);
return makeSocks5Connection({
...options,
connectTimeoutMS // Should always be present for Socks5
});
}

if (useTLS) {
Expand All @@ -379,47 +364,41 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback<Stre
socket.setTimeout(connectTimeoutMS);
socket.setNoDelay(noDelay);

const connectEvent = useTLS ? 'secureConnect' : 'connect';
let cancellationHandler: (err: Error) => void;
function errorHandler(eventName: ErrorHandlerEventName) {
return (err: Error) => {
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
if (cancellationHandler && options.cancellationToken) {
options.cancellationToken.removeListener('cancel', cancellationHandler);
}

socket.removeListener(connectEvent, connectHandler);
callback(connectionFailureError(eventName, err));
};
}
let cancellationHandler: ((err: Error) => void) | null = null;

function connectHandler() {
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
if (cancellationHandler && options.cancellationToken) {
options.cancellationToken.removeListener('cancel', cancellationHandler);
const { promise: connectedSocket, resolve, reject } = promiseWithResolvers<Stream>();
if (existingSocket) {
resolve(socket);
} else {
const connectEvent = useTLS ? 'secureConnect' : 'connect';
socket
.once(connectEvent, () => resolve(socket))
.once('error', error => reject(connectionFailureError('error', error)))
.once('timeout', () => reject(connectionFailureError('timeout')))
.once('close', () => reject(connectionFailureError('close')));

if (options.cancellationToken != null) {
cancellationHandler = () => reject(connectionFailureError('cancel'));
options.cancellationToken.once('cancel', cancellationHandler);
}
}

if ('authorizationError' in socket) {
if (socket.authorizationError && rejectUnauthorized) {
// TODO(NODE-5192): wrap this with a MongoError subclass
return callback(socket.authorizationError);
}
try {
socket = await connectedSocket;
return socket;
} catch (error) {
socket.destroy();
if ('authorizationError' in socket && socket.authorizationError != null && rejectUnauthorized) {
// TODO(NODE-5192): wrap this with a MongoError subclass
throw socket.authorizationError;
}

throw error;
} finally {
socket.setTimeout(0);
callback(undefined, socket);
}

SOCKET_ERROR_EVENTS.forEach(event => socket.once(event, errorHandler(event)));
if (options.cancellationToken) {
cancellationHandler = errorHandler('cancel');
options.cancellationToken.once('cancel', cancellationHandler);
}

if (existingSocket) {
process.nextTick(connectHandler);
} else {
socket.once(connectEvent, connectHandler);
socket.removeAllListeners();
if (cancellationHandler != null) {
options.cancellationToken?.removeListener('cancel', cancellationHandler);
}
}
}

Expand All @@ -435,78 +414,68 @@ function loadSocks() {
return socks;
}

function makeSocks5Connection(options: MakeConnectionOptions, callback: Callback<Stream>) {
async function makeSocks5Connection(options: MakeConnectionOptions): Promise<Stream> {
const hostAddress = HostAddress.fromHostPort(
options.proxyHost ?? '', // proxyHost is guaranteed to set here
options.proxyPort ?? 1080
);

// First, connect to the proxy server itself:
makeConnection(
{
...options,
hostAddress,
tls: false,
proxyHost: undefined
},
(err, rawSocket) => {
if (err || !rawSocket) {
return callback(err);
}
const rawSocket = await makeSocket({
...options,
hostAddress,
tls: false,
proxyHost: undefined
});

const destination = parseConnectOptions(options) as net.TcpNetConnectOpts;
if (typeof destination.host !== 'string' || typeof destination.port !== 'number') {
return callback(
new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts')
);
}
const destination = parseConnectOptions(options) as net.TcpNetConnectOpts;
if (typeof destination.host !== 'string' || typeof destination.port !== 'number') {
throw new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts');
}

try {
socks ??= loadSocks();
} catch (error) {
return callback(error);
socks ??= loadSocks();

try {
// Then, establish the Socks5 proxy connection:
const { socket } = await socks.SocksClient.createConnection({
existing_socket: rawSocket,
timeout: options.connectTimeoutMS,
command: 'connect',
destination: {
host: destination.host,
port: destination.port
},
proxy: {
// host and port are ignored because we pass existing_socket
host: 'iLoveJavaScript',
port: 0,
type: 5,
userId: options.proxyUsername || undefined,
alenakhineika marked this conversation as resolved.
Show resolved Hide resolved
password: options.proxyPassword || undefined
}
});

// Then, establish the Socks5 proxy connection:
socks.SocksClient.createConnection({
existing_socket: rawSocket,
timeout: options.connectTimeoutMS,
command: 'connect',
destination: {
host: destination.host,
port: destination.port
},
proxy: {
// host and port are ignored because we pass existing_socket
host: 'iLoveJavaScript',
port: 0,
type: 5,
userId: options.proxyUsername || undefined,
password: options.proxyPassword || undefined
}
}).then(
({ socket }) => {
// Finally, now treat the resulting duplex stream as the
// socket over which we send and receive wire protocol messages:
makeConnection(
{
...options,
existingSocket: socket,
proxyHost: undefined
},
callback
);
},
error => callback(connectionFailureError('error', error))
);
}
);
// Finally, now treat the resulting duplex stream as the
// socket over which we send and receive wire protocol messages:
return await makeSocket({
...options,
existingSocket: socket,
proxyHost: undefined
});
} catch (error) {
throw connectionFailureError('error', error);
}
}

function connectionFailureError(type: ErrorHandlerEventName, err: Error) {
function connectionFailureError(type: 'error', cause: Error): MongoNetworkError;
function connectionFailureError(type: 'close' | 'timeout' | 'cancel'): MongoNetworkError;
function connectionFailureError(
type: 'error' | 'close' | 'timeout' | 'cancel',
cause?: Error
): MongoNetworkError {
switch (type) {
case 'error':
return new MongoNetworkError(MongoError.buildErrorMessage(err), { cause: err });
return new MongoNetworkError(MongoError.buildErrorMessage(cause), { cause });
case 'timeout':
return new MongoNetworkTimeoutError('connection timed out');
case 'close':
Expand Down