Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move all MongoDB-specific connection logic into driver layer, add createClient() method to handle creating MongoClient #13542

Merged
merged 2 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
209 changes: 9 additions & 200 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ const clone = require('./helpers/clone');
const driver = require('./driver');
const get = require('./helpers/get');
const immediate = require('./helpers/immediate');
const mongodb = require('mongodb');
const pkg = require('../package.json');
const utils = require('./utils');
const processConnectionOptions = require('./helpers/processConnectionOptions');
const CreateCollectionsError = require('./error/createCollectionsError');

const arrayAtomicsSymbol = require('./helpers/symbols').arrayAtomicsSymbol;
Expand Down Expand Up @@ -739,7 +736,7 @@ Connection.prototype.openUri = async function openUri(uri, options) {
throw err;
}

this.$initialConnection = _createMongoClient(this, uri, options).
this.$initialConnection = this.createClient(uri, options).
then(() => this).
catch(err => {
this.readyState = STATES.disconnected;
Expand Down Expand Up @@ -796,184 +793,6 @@ function _handleConnectionErrors(err) {
return err;
}

/*!
* ignore
*/

async function _createMongoClient(conn, uri, options) {
if (typeof uri !== 'string') {
throw new MongooseError('The `uri` parameter to `openUri()` must be a ' +
`string, got "${typeof uri}". Make sure the first parameter to ` +
'`mongoose.connect()` or `mongoose.createConnection()` is a string.');
}

if (conn._destroyCalled) {
throw new MongooseError(
'Connection has been closed and destroyed, and cannot be used for re-opening the connection. ' +
'Please create a new connection with `mongoose.createConnection()` or `mongoose.connect()`.'
);
}

if (conn.readyState === STATES.connecting || conn.readyState === STATES.connected) {
if (conn._connectionString !== uri) {
throw new MongooseError('Can\'t call `openUri()` on an active connection with ' +
'different connection strings. Make sure you aren\'t calling `mongoose.connect()` ' +
'multiple times. See: https://mongoosejs.com/docs/connections.html#multiple_connections');
}
}

options = processConnectionOptions(uri, options);

if (options) {

const autoIndex = options.config && options.config.autoIndex != null ?
options.config.autoIndex :
options.autoIndex;
if (autoIndex != null) {
conn.config.autoIndex = autoIndex !== false;
delete options.config;
delete options.autoIndex;
}

if ('autoCreate' in options) {
conn.config.autoCreate = !!options.autoCreate;
delete options.autoCreate;
}

if ('sanitizeFilter' in options) {
conn.config.sanitizeFilter = options.sanitizeFilter;
delete options.sanitizeFilter;
}

// Backwards compat
if (options.user || options.pass) {
options.auth = options.auth || {};
options.auth.username = options.user;
options.auth.password = options.pass;

conn.user = options.user;
conn.pass = options.pass;
}
delete options.user;
delete options.pass;

if (options.bufferCommands != null) {
conn.config.bufferCommands = options.bufferCommands;
delete options.bufferCommands;
}
} else {
options = {};
}

conn._connectionOptions = options;
const dbName = options.dbName;
if (dbName != null) {
conn.$dbName = dbName;
}
delete options.dbName;

if (!utils.hasUserDefinedProperty(options, 'driverInfo')) {
options.driverInfo = {
name: 'Mongoose',
version: pkg.version
};
}

conn.readyState = STATES.connecting;
conn._connectionString = uri;

let client;
try {
client = new mongodb.MongoClient(uri, options);
} catch (error) {
conn.readyState = STATES.disconnected;
throw error;
}
conn.client = client;

client.setMaxListeners(0);
await client.connect();

_setClient(conn, client, options, dbName);

for (const db of conn.otherDbs) {
_setClient(db, client, {}, db.name);
}
return conn;
}

/*!
* ignore
*/

function _setClient(conn, client, options, dbName) {
const db = dbName != null ? client.db(dbName) : client.db();
conn.db = db;
conn.client = client;
conn.host = client &&
client.s &&
client.s.options &&
client.s.options.hosts &&
client.s.options.hosts[0] &&
client.s.options.hosts[0].host || void 0;
conn.port = client &&
client.s &&
client.s.options &&
client.s.options.hosts &&
client.s.options.hosts[0] &&
client.s.options.hosts[0].port || void 0;
conn.name = dbName != null ? dbName : client && client.s && client.s.options && client.s.options.dbName || void 0;
conn._closeCalled = client._closeCalled;

const _handleReconnect = () => {
// If we aren't disconnected, we assume this reconnect is due to a
// socket timeout. If there's no activity on a socket for
// `socketTimeoutMS`, the driver will attempt to reconnect and emit
// this event.
if (conn.readyState !== STATES.connected) {
conn.readyState = STATES.connected;
conn.emit('reconnect');
conn.emit('reconnected');
conn.onOpen();
}
};

const type = client &&
client.topology &&
client.topology.description &&
client.topology.description.type || '';

if (type === 'Single') {
client.on('serverDescriptionChanged', ev => {
const newDescription = ev.newDescription;
if (newDescription.type === 'Unknown') {
conn.readyState = STATES.disconnected;
} else {
_handleReconnect();
}
});
} else if (type.startsWith('ReplicaSet')) {
client.on('topologyDescriptionChanged', ev => {
// Emit disconnected if we've lost connectivity to the primary
const description = ev.newDescription;
if (conn.readyState === STATES.connected && description.type !== 'ReplicaSetWithPrimary') {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
} else if (conn.readyState === STATES.disconnected && description.type === 'ReplicaSetWithPrimary') {
_handleReconnect();
}
});
}

conn.onOpen();

for (const i in conn.collections) {
if (utils.object.hasOwnProperty(conn.collections, i)) {
conn.collections[i].onOpen();
}
}
}

/**
* Destroy the connection. Similar to [`.close`](https://mongoosejs.com/docs/api/connection.html#Connection.prototype.close()),
* but also removes the connection from Mongoose's `connections` list and prevents the
Expand Down Expand Up @@ -1526,26 +1345,16 @@ Connection.prototype.getClient = function getClient() {
* @return {Connection} this
*/

Connection.prototype.setClient = function setClient(client) {
if (!(client instanceof mongodb.MongoClient)) {
throw new MongooseError('Must call `setClient()` with an instance of MongoClient');
}
if (this.readyState !== STATES.disconnected) {
throw new MongooseError('Cannot call `setClient()` on a connection that is already connected.');
}
if (client.topology == null) {
throw new MongooseError('Cannot call `setClient()` with a MongoClient that you have not called `connect()` on yet.');
}

this._connectionString = client.s.url;
_setClient(this, client, {}, client.s.options.dbName);
Connection.prototype.setClient = function setClient() {
throw new MongooseError('Connection#setClient not implemented by driver');
};

for (const model of Object.values(this.models)) {
// Errors handled internally, so safe to ignore error
model.init().catch(function $modelInitNoop() {});
}
/*!
* Called internally by `openUri()` to create a MongoClient instance.
*/

return this;
Connection.prototype.createClient = function createClient() {
throw new MongooseError('Connection#createClient not implemented by driver');
};

/**
Expand Down