Skip to content

Commit 06e941a

Browse files
authoredMar 13, 2025··
fix(NODE-6845): ensure internal rejections are handled (#4448)
1 parent 54d29e5 commit 06e941a

17 files changed

+193
-151
lines changed
 

‎.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ secrets-export.fish
103103
mo-expansion.sh
104104
mo-expansion.yml
105105
expansions.sh
106+
uri.txt
106107

107108
.drivers-tools/
108109

‎.mocharc.js

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module.exports = {
77
require: [
88
'source-map-support/register',
99
'ts-node/register',
10+
'test/tools/runner/throw_rejections.cjs',
1011
'test/tools/runner/chai_addons.ts',
1112
'test/tools/runner/ee_checker.ts'
1213
],

‎src/cursor/abstract_cursor.ts

+49-40
Original file line numberDiff line numberDiff line change
@@ -1146,50 +1146,59 @@ class ReadableCursorStream extends Readable {
11461146
return;
11471147
}
11481148

1149-
this._cursor.next().then(
1150-
result => {
1151-
if (result == null) {
1152-
this.push(null);
1153-
} else if (this.destroyed) {
1154-
this._cursor.close().then(undefined, squashError);
1155-
} else {
1156-
if (this.push(result)) {
1157-
return this._readNext();
1149+
this._cursor
1150+
.next()
1151+
.then(
1152+
// result from next()
1153+
result => {
1154+
if (result == null) {
1155+
this.push(null);
1156+
} else if (this.destroyed) {
1157+
this._cursor.close().then(undefined, squashError);
1158+
} else {
1159+
if (this.push(result)) {
1160+
return this._readNext();
1161+
}
1162+
1163+
this._readInProgress = false;
1164+
}
1165+
},
1166+
// error from next()
1167+
err => {
1168+
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
1169+
// desired behavior is that a stream ends cleanly when a user explicitly closes
1170+
// a client during iteration. Alternatively, we could do the "right" thing and
1171+
// propagate the error message by removing this special case.
1172+
if (err.message.match(/server is closed/)) {
1173+
this._cursor.close().then(undefined, squashError);
1174+
return this.push(null);
11581175
}
11591176

1160-
this._readInProgress = false;
1161-
}
1162-
},
1163-
err => {
1164-
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
1165-
// desired behavior is that a stream ends cleanly when a user explicitly closes
1166-
// a client during iteration. Alternatively, we could do the "right" thing and
1167-
// propagate the error message by removing this special case.
1168-
if (err.message.match(/server is closed/)) {
1169-
this._cursor.close().then(undefined, squashError);
1170-
return this.push(null);
1171-
}
1177+
// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
1178+
// to be "operation was interrupted", where a cursor has been closed but there is an
1179+
// active getMore in-flight. This used to check if the cursor was killed but once
1180+
// that changed to happen in cleanup legitimate errors would not destroy the
1181+
// stream. There are change streams test specifically test these cases.
1182+
if (err.message.match(/operation was interrupted/)) {
1183+
return this.push(null);
1184+
}
11721185

1173-
// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
1174-
// to be "operation was interrupted", where a cursor has been closed but there is an
1175-
// active getMore in-flight. This used to check if the cursor was killed but once
1176-
// that changed to happen in cleanup legitimate errors would not destroy the
1177-
// stream. There are change streams test specifically test these cases.
1178-
if (err.message.match(/operation was interrupted/)) {
1179-
return this.push(null);
1186+
// NOTE: The two above checks on the message of the error will cause a null to be pushed
1187+
// to the stream, thus closing the stream before the destroy call happens. This means
1188+
// that either of those error messages on a change stream will not get a proper
1189+
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
1190+
// relies on that error event to be emitted to create its new cursor and thus was not
1191+
// working on 4.4 servers because the error emitted on failover was "interrupted at
1192+
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
1193+
// See NODE-4475.
1194+
return this.destroy(err);
11801195
}
1181-
1182-
// NOTE: The two above checks on the message of the error will cause a null to be pushed
1183-
// to the stream, thus closing the stream before the destroy call happens. This means
1184-
// that either of those error messages on a change stream will not get a proper
1185-
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
1186-
// relies on that error event to be emitted to create its new cursor and thus was not
1187-
// working on 4.4 servers because the error emitted on failover was "interrupted at
1188-
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
1189-
// See NODE-4475.
1190-
return this.destroy(err);
1191-
}
1192-
);
1196+
)
1197+
// if either of the above handlers throw
1198+
.catch(error => {
1199+
this._readInProgress = false;
1200+
this.destroy(error);
1201+
});
11931202
}
11941203
}
11951204

‎src/sdam/server.ts

+7-4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
maxWireVersion,
4949
type MongoDBNamespace,
5050
noop,
51+
squashError,
5152
supportsRetryableWrites
5253
} from '../utils';
5354
import { throwIfWriteConcernError } from '../write_concern';
@@ -345,9 +346,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
345346
operationError instanceof MongoError &&
346347
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
347348
) {
348-
reauthPromise = this.pool.reauthenticate(conn).catch(error => {
349+
reauthPromise = this.pool.reauthenticate(conn);
350+
reauthPromise.then(undefined, error => {
349351
reauthPromise = null;
350-
throw error;
352+
squashError(error);
351353
});
352354

353355
await abortable(reauthPromise, options);
@@ -368,9 +370,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
368370
if (session?.pinnedConnection !== conn) {
369371
if (reauthPromise != null) {
370372
// The reauth promise only exists if it hasn't thrown.
371-
void reauthPromise.finally(() => {
373+
const checkBackIn = () => {
372374
this.pool.checkIn(conn);
373-
});
375+
};
376+
void reauthPromise.then(checkBackIn, checkBackIn);
374377
} else {
375378
this.pool.checkIn(conn);
376379
}

‎src/timeout.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
33
import { type Document } from './bson';
44
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
55
import { type ClientSession } from './sessions';
6-
import { csotMin, noop } from './utils';
6+
import { csotMin, noop, squashError } from './utils';
77

88
/** @internal */
99
export class TimeoutError extends Error {
@@ -102,7 +102,13 @@ export class Timeout extends Promise<never> {
102102
}
103103

104104
throwIfExpired(): void {
105-
if (this.timedOut) throw new TimeoutError('Timed out', { duration: this.duration });
105+
if (this.timedOut) {
106+
// This method is invoked when someone wants to throw immediately instead of await the result of this promise
107+
// Since they won't be handling the rejection from the promise (because we're about to throw here)
108+
// attach handling to prevent this from bubbling up to Node.js
109+
this.then(undefined, squashError);
110+
throw new TimeoutError('Timed out', { duration: this.duration });
111+
}
106112
}
107113

108114
public static expires(duration: number, unref?: true): Timeout {

‎test/integration/change-streams/change_stream.test.ts

+1
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ describe('Change Streams', function () {
819819
const write = lastWrite();
820820

821821
const nextP = changeStream.next();
822+
nextP.catch(() => null);
822823

823824
await changeStream.close();
824825

‎test/integration/client-side-operations-timeout/node_csot.test.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,8 @@ describe('CSOT driver tests', metadata, () => {
10231023

10241024
beforeEach(async function () {
10251025
cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 });
1026-
const _changePromise = once(cs, 'change');
1026+
cs.once('change', () => null);
1027+
10271028
await once(cs.cursor, 'init');
10281029

10291030
await internalClient.db().admin().command(failpoint);

‎test/integration/node-specific/abort_signal.test.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ describe('AbortSignal support', () => {
753753
if (args[1].find != null) {
754754
commandStub.restore();
755755
controller.abort();
756-
throw new ReAuthenticationError({});
756+
throw new ReAuthenticationError({ message: 'This is a fake reauthentication error' });
757757
}
758758
return commandStub.wrappedMethod.apply(this, args);
759759
});
@@ -792,8 +792,9 @@ describe('AbortSignal support', () => {
792792
describe('if reauth throws', () => {
793793
beforeEach(() => {
794794
sinon.stub(ConnectionPool.prototype, 'reauthenticate').callsFake(async function () {
795+
const error = new Error('Rejecting reauthenticate for testing');
795796
await sleep(1000);
796-
throw new Error();
797+
throw error;
797798
});
798799
});
799800

‎test/integration/node-specific/examples/change_streams.test.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ maybeDescribe('examples(change-stream):', function () {
6060
it('Open A Change Stream', {
6161
metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } },
6262
test: async function () {
63-
const looper = new Looper(() => db.collection('inventory').insertOne({ a: 1 }));
63+
const looper = new Looper(async () => {
64+
await db.collection('inventory').insertOne({ a: 1 });
65+
});
6466
looper.run();
6567

6668
// Start Changestream Example 1

‎test/integration/shared.js

+15-22
Original file line numberDiff line numberDiff line change
@@ -90,32 +90,25 @@ function ignoreNsNotFound(err) {
9090
if (!err.message.match(/ns not found/)) throw err;
9191
}
9292

93-
function setupDatabase(configuration, dbsToClean) {
93+
async function setupDatabase(configuration, dbsToClean) {
9494
dbsToClean = Array.isArray(dbsToClean) ? dbsToClean : [];
95-
var configDbName = configuration.db;
96-
var client = configuration.newClient(configuration.writeConcernMax(), {
97-
maxPoolSize: 1
98-
});
95+
const configDbName = configuration.db;
9996

10097
dbsToClean.push(configDbName);
10198

102-
return client
103-
.connect()
104-
.then(() =>
105-
dbsToClean.reduce(
106-
(result, dbName) =>
107-
result
108-
.then(() =>
109-
client.db(dbName).command({ dropAllUsersFromDatabase: 1, writeConcern: { w: 1 } })
110-
)
111-
.then(() => client.db(dbName).dropDatabase({ writeConcern: { w: 1 } })),
112-
Promise.resolve()
113-
)
114-
)
115-
.then(
116-
() => client.close(),
117-
err => client.close(() => Promise.reject(err))
118-
);
99+
const client = configuration.newClient();
100+
try {
101+
for (const dbName of dbsToClean) {
102+
const db = await client.db(dbName);
103+
for await (const { name } of db.listCollections({}, { nameOnly: true })) {
104+
const collection = db.collection(name);
105+
await collection.deleteMany({}).catch(() => null);
106+
await collection.drop().catch(() => null);
107+
}
108+
}
109+
} finally {
110+
await client.close();
111+
}
119112
}
120113

121114
/**

‎test/manual/mocharc.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ const [major] = process.versions.node.split('.');
44

55
/** @type {import("mocha").MochaOptions} */
66
module.exports = {
7-
require: ['ts-node/register', 'test/tools/runner/chai_addons.ts'],
7+
require: [
8+
'ts-node/register',
9+
'test/tools/runner/throw_rejections.cjs',
10+
'test/tools/runner/chai_addons.ts'
11+
],
812
reporter: 'test/tools/reporter/mongodb_reporter.js',
913
failZero: true,
1014
color: true,

‎test/mocha_lambda.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ const [major] = process.versions.node.split('.');
44

55
/** @type {import("mocha").MochaOptions} */
66
module.exports = {
7-
require: ['test/integration/node-specific/examples/setup.js'],
7+
require: [
8+
'test/tools/runner/throw_rejections.cjs',
9+
'test/integration/node-specific/examples/setup.js'
10+
],
811
extension: ['js'],
912
ui: 'test/tools/runner/metadata_ui.js',
1013
recursive: true,

‎test/mocha_mongodb.js

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module.exports = {
77
require: [
88
'source-map-support/register',
99
'ts-node/register',
10+
'test/tools/runner/throw_rejections.cjs',
1011
'test/tools/runner/chai_addons.ts',
1112
'test/tools/runner/ee_checker.ts',
1213
'test/tools/runner/hooks/configuration.ts',
+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// eslint-disable-next-line @typescript-eslint/no-require-imports
2+
const process = require('process');
3+
4+
process.on('unhandledRejection', error => {
5+
throw error;
6+
});

‎test/unit/assorted/optional_require.test.js

-69
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { expect } from 'chai';
2+
import { existsSync } from 'fs';
3+
import { resolve } from 'path';
4+
5+
import {
6+
AuthContext,
7+
compress,
8+
GSSAPI,
9+
HostAddress,
10+
MongoDBAWS,
11+
MongoMissingDependencyError
12+
} from '../../mongodb';
13+
14+
function moduleExistsSync(moduleName) {
15+
return existsSync(resolve(__dirname, `../../../node_modules/${moduleName}`));
16+
}
17+
18+
describe('optionalRequire', function () {
19+
describe('Snappy', function () {
20+
it('should error if not installed', async function () {
21+
const moduleName = 'snappy';
22+
if (moduleExistsSync(moduleName)) {
23+
return this.skip();
24+
}
25+
26+
const error = await compress(
27+
{ zlibCompressionLevel: 0, agreedCompressor: 'snappy' },
28+
Buffer.alloc(1)
29+
).then(
30+
() => null,
31+
e => e
32+
);
33+
34+
expect(error).to.be.instanceOf(MongoMissingDependencyError);
35+
});
36+
});
37+
38+
describe('Kerberos', function () {
39+
it('should error if not installed', async function () {
40+
const moduleName = 'kerberos';
41+
if (moduleExistsSync(moduleName)) {
42+
return this.skip();
43+
}
44+
const gssapi = new GSSAPI();
45+
46+
const error = await gssapi
47+
.auth(new AuthContext(null, true, { hostAddress: new HostAddress('a'), credentials: true }))
48+
.then(
49+
() => null,
50+
e => e
51+
);
52+
53+
expect(error).to.be.instanceOf(MongoMissingDependencyError);
54+
});
55+
});
56+
57+
describe('aws4', function () {
58+
it('should error if not installed', async function () {
59+
const moduleName = 'aws4';
60+
if (moduleExistsSync(moduleName)) {
61+
return this.skip();
62+
}
63+
const mdbAWS = new MongoDBAWS();
64+
65+
const error = await mdbAWS
66+
.auth(new AuthContext({ hello: { maxWireVersion: 9 } }, true, null))
67+
.catch(error => error);
68+
69+
expect(error).to.be.instanceOf(MongoMissingDependencyError);
70+
});
71+
});
72+
});

‎test/unit/cmap/auth/auth_provider.test.ts

+15-8
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
11
import { expect } from 'chai';
22

3-
import { AuthProvider, MongoRuntimeError } from '../../../mongodb';
3+
import { type AuthContext, AuthProvider, MongoRuntimeError } from '../../../mongodb';
44

55
describe('AuthProvider', function () {
66
describe('#reauth', function () {
77
context('when the provider is already reauthenticating', function () {
8-
const provider = new AuthProvider();
8+
const provider = new (class extends AuthProvider {
9+
override auth(_context: AuthContext): Promise<void> {
10+
throw new Error('Method not implemented.');
11+
}
12+
})();
13+
914
const context = { reauthenticating: true };
1015

11-
it('returns an error', function () {
12-
provider.reauth(context, error => {
13-
expect(error).to.exist;
14-
expect(error).to.be.instanceOf(MongoRuntimeError);
15-
expect(error?.message).to.equal('Reauthentication already in progress.');
16-
});
16+
it('returns an error', async function () {
17+
const error = await provider.reauth(context).then(
18+
() => null,
19+
error => error
20+
);
21+
expect(error).to.exist;
22+
expect(error).to.be.instanceOf(MongoRuntimeError);
23+
expect(error?.message).to.equal('Reauthentication already in progress.');
1724
});
1825
});
1926
});

0 commit comments

Comments
 (0)
Please sign in to comment.