Skip to content

Commit

Permalink
Merge pull request #13542 from Automattic/vkarpov15/openuri-refactor
Browse files Browse the repository at this point in the history
Move all MongoDB-specific connection logic into driver layer, add `createClient()` method to handle creating MongoClient
  • Loading branch information
vkarpov15 committed Jul 4, 2023
2 parents bc5643d + 24a72f9 commit eb16d8d
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 202 deletions.
209 changes: 9 additions & 200 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ const clone = require('./helpers/clone');
const driver = require('./driver');
const get = require('./helpers/get');
const immediate = require('./helpers/immediate');
const mongodb = require('mongodb');
const pkg = require('../package.json');
const utils = require('./utils');
const processConnectionOptions = require('./helpers/processConnectionOptions');
const CreateCollectionsError = require('./error/createCollectionsError');

const arrayAtomicsSymbol = require('./helpers/symbols').arrayAtomicsSymbol;
Expand Down Expand Up @@ -739,7 +736,7 @@ Connection.prototype.openUri = async function openUri(uri, options) {
throw err;
}

this.$initialConnection = _createMongoClient(this, uri, options).
this.$initialConnection = this.createClient(uri, options).
then(() => this).
catch(err => {
this.readyState = STATES.disconnected;
Expand Down Expand Up @@ -796,184 +793,6 @@ function _handleConnectionErrors(err) {
return err;
}

/*!
* ignore
*/

async function _createMongoClient(conn, uri, options) {
if (typeof uri !== 'string') {
throw new MongooseError('The `uri` parameter to `openUri()` must be a ' +
`string, got "${typeof uri}". Make sure the first parameter to ` +
'`mongoose.connect()` or `mongoose.createConnection()` is a string.');
}

if (conn._destroyCalled) {
throw new MongooseError(
'Connection has been closed and destroyed, and cannot be used for re-opening the connection. ' +
'Please create a new connection with `mongoose.createConnection()` or `mongoose.connect()`.'
);
}

if (conn.readyState === STATES.connecting || conn.readyState === STATES.connected) {
if (conn._connectionString !== uri) {
throw new MongooseError('Can\'t call `openUri()` on an active connection with ' +
'different connection strings. Make sure you aren\'t calling `mongoose.connect()` ' +
'multiple times. See: https://mongoosejs.com/docs/connections.html#multiple_connections');
}
}

options = processConnectionOptions(uri, options);

if (options) {

const autoIndex = options.config && options.config.autoIndex != null ?
options.config.autoIndex :
options.autoIndex;
if (autoIndex != null) {
conn.config.autoIndex = autoIndex !== false;
delete options.config;
delete options.autoIndex;
}

if ('autoCreate' in options) {
conn.config.autoCreate = !!options.autoCreate;
delete options.autoCreate;
}

if ('sanitizeFilter' in options) {
conn.config.sanitizeFilter = options.sanitizeFilter;
delete options.sanitizeFilter;
}

// Backwards compat
if (options.user || options.pass) {
options.auth = options.auth || {};
options.auth.username = options.user;
options.auth.password = options.pass;

conn.user = options.user;
conn.pass = options.pass;
}
delete options.user;
delete options.pass;

if (options.bufferCommands != null) {
conn.config.bufferCommands = options.bufferCommands;
delete options.bufferCommands;
}
} else {
options = {};
}

conn._connectionOptions = options;
const dbName = options.dbName;
if (dbName != null) {
conn.$dbName = dbName;
}
delete options.dbName;

if (!utils.hasUserDefinedProperty(options, 'driverInfo')) {
options.driverInfo = {
name: 'Mongoose',
version: pkg.version
};
}

conn.readyState = STATES.connecting;
conn._connectionString = uri;

let client;
try {
client = new mongodb.MongoClient(uri, options);
} catch (error) {
conn.readyState = STATES.disconnected;
throw error;
}
conn.client = client;

client.setMaxListeners(0);
await client.connect();

_setClient(conn, client, options, dbName);

for (const db of conn.otherDbs) {
_setClient(db, client, {}, db.name);
}
return conn;
}

/*!
* ignore
*/

function _setClient(conn, client, options, dbName) {
const db = dbName != null ? client.db(dbName) : client.db();
conn.db = db;
conn.client = client;
conn.host = client &&
client.s &&
client.s.options &&
client.s.options.hosts &&
client.s.options.hosts[0] &&
client.s.options.hosts[0].host || void 0;
conn.port = client &&
client.s &&
client.s.options &&
client.s.options.hosts &&
client.s.options.hosts[0] &&
client.s.options.hosts[0].port || void 0;
conn.name = dbName != null ? dbName : client && client.s && client.s.options && client.s.options.dbName || void 0;
conn._closeCalled = client._closeCalled;

const _handleReconnect = () => {
// If we aren't disconnected, we assume this reconnect is due to a
// socket timeout. If there's no activity on a socket for
// `socketTimeoutMS`, the driver will attempt to reconnect and emit
// this event.
if (conn.readyState !== STATES.connected) {
conn.readyState = STATES.connected;
conn.emit('reconnect');
conn.emit('reconnected');
conn.onOpen();
}
};

const type = client &&
client.topology &&
client.topology.description &&
client.topology.description.type || '';

if (type === 'Single') {
client.on('serverDescriptionChanged', ev => {
const newDescription = ev.newDescription;
if (newDescription.type === 'Unknown') {
conn.readyState = STATES.disconnected;
} else {
_handleReconnect();
}
});
} else if (type.startsWith('ReplicaSet')) {
client.on('topologyDescriptionChanged', ev => {
// Emit disconnected if we've lost connectivity to the primary
const description = ev.newDescription;
if (conn.readyState === STATES.connected && description.type !== 'ReplicaSetWithPrimary') {
// Implicitly emits 'disconnected'
conn.readyState = STATES.disconnected;
} else if (conn.readyState === STATES.disconnected && description.type === 'ReplicaSetWithPrimary') {
_handleReconnect();
}
});
}

conn.onOpen();

for (const i in conn.collections) {
if (utils.object.hasOwnProperty(conn.collections, i)) {
conn.collections[i].onOpen();
}
}
}

/**
* Destroy the connection. Similar to [`.close`](https://mongoosejs.com/docs/api/connection.html#Connection.prototype.close()),
* but also removes the connection from Mongoose's `connections` list and prevents the
Expand Down Expand Up @@ -1526,26 +1345,16 @@ Connection.prototype.getClient = function getClient() {
* @return {Connection} this
*/

Connection.prototype.setClient = function setClient(client) {
if (!(client instanceof mongodb.MongoClient)) {
throw new MongooseError('Must call `setClient()` with an instance of MongoClient');
}
if (this.readyState !== STATES.disconnected) {
throw new MongooseError('Cannot call `setClient()` on a connection that is already connected.');
}
if (client.topology == null) {
throw new MongooseError('Cannot call `setClient()` with a MongoClient that you have not called `connect()` on yet.');
}

this._connectionString = client.s.url;
_setClient(this, client, {}, client.s.options.dbName);
Connection.prototype.setClient = function setClient() {
throw new MongooseError('Connection#setClient not implemented by driver');
};

for (const model of Object.values(this.models)) {
// Errors handled internally, so safe to ignore error
model.init().catch(function $modelInitNoop() {});
}
/*!
* Called internally by `openUri()` to create a MongoClient instance.
*/

return this;
Connection.prototype.createClient = function createClient() {
throw new MongooseError('Connection#createClient not implemented by driver');
};

/**
Expand Down

0 comments on commit eb16d8d

Please sign in to comment.