Skip to content

Commit

Permalink
Merge pull request #13726 from Automattic/vkarpov15/gh-13698
Browse files Browse the repository at this point in the history
fix(connection): reset document state in between transaction retries
  • Loading branch information
vkarpov15 committed Aug 13, 2023
2 parents 53fb14a + d53b2e8 commit 392f9fa
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 35 deletions.
81 changes: 49 additions & 32 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -517,49 +517,66 @@ Connection.prototype.startSession = async function startSession(options) {
Connection.prototype.transaction = function transaction(fn, options) {
return this.startSession().then(session => {
session[sessionNewDocuments] = new Map();
return session.withTransaction(() => fn(session), options).
return session.withTransaction(() => _wrapUserTransaction(fn, session), options).
then(res => {
delete session[sessionNewDocuments];
return res;
}).
catch(err => {
// If transaction was aborted, we need to reset newly
// inserted documents' `isNew`.
for (const doc of session[sessionNewDocuments].keys()) {
const state = session[sessionNewDocuments].get(doc);
if (state.hasOwnProperty('isNew')) {
doc.$isNew = state.$isNew;
}
if (state.hasOwnProperty('versionKey')) {
doc.set(doc.schema.options.versionKey, state.versionKey);
}

if (state.modifiedPaths.length > 0 && doc.$__.activePaths.states.modify == null) {
doc.$__.activePaths.states.modify = {};
}
for (const path of state.modifiedPaths) {
doc.$__.activePaths.paths[path] = 'modify';
doc.$__.activePaths.states.modify[path] = true;
}

for (const path of state.atomics.keys()) {
const val = doc.$__getValue(path);
if (val == null) {
continue;
}
val[arrayAtomicsSymbol] = state.atomics.get(path);
}
}
delete session[sessionNewDocuments];
throw err;
})
.finally(() => {
session.endSession()
.catch(() => {});
}).
finally(() => {
session.endSession().catch(() => {});
});
});
};

/*!
* Reset document state in between transaction retries re: gh-13698
*/

async function _wrapUserTransaction(fn, session) {
try {
const res = await fn(session);
return res;
} catch (err) {
_resetSessionDocuments(session);
throw err;
}
}

/*!
* If transaction was aborted, we need to reset newly inserted documents' `isNew`.
*/
function _resetSessionDocuments(session) {
for (const doc of session[sessionNewDocuments].keys()) {
const state = session[sessionNewDocuments].get(doc);
if (state.hasOwnProperty('isNew')) {
doc.$isNew = state.isNew;
}
if (state.hasOwnProperty('versionKey')) {
doc.set(doc.schema.options.versionKey, state.versionKey);
}

if (state.modifiedPaths.length > 0 && doc.$__.activePaths.states.modify == null) {
doc.$__.activePaths.states.modify = {};
}
for (const path of state.modifiedPaths) {
doc.$__.activePaths.paths[path] = 'modify';
doc.$__.activePaths.states.modify[path] = true;
}

for (const path of state.atomics.keys()) {
const val = doc.$__getValue(path);
if (val == null) {
continue;
}
val[arrayAtomicsSymbol] = state.atomics.get(path);
}
}
}

/**
* Helper for `dropCollection()`. Will delete the given collection, including
* all documents and indexes.
Expand Down
53 changes: 50 additions & 3 deletions test/docs/transactions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ describe('transactions', function() {
this.timeout(10000);

before(async function() {
if (!process.env.REPLICA_SET) {
if (!process.env.REPLICA_SET && !process.env.START_REPLICA_SET) {
_skipped = true;
this.skip();
}
db = start({ replicaSet: process.env.REPLICA_SET });
db = start(process.env.REPLICA_SET ? { replicaSet: process.env.REPLICA_SET } : {});
try {
await db.asPromise();

Expand Down Expand Up @@ -327,7 +327,7 @@ describe('transactions', function() {
// Session isn't committed
assert.equal(await Character.countDocuments({ title: /hand/i }), 0);

await tyrion.remove();
await tyrion.deleteOne();

// Undo both update and delete since doc should pull from `$session()`
await session.abortTransaction();
Expand All @@ -339,6 +339,7 @@ describe('transactions', function() {
});

it('save() with no changes (gh-8571)', async function() {
db.deleteModel(/Test/);
const Test = db.model('Test', Schema({ name: String }));

await Test.createCollection();
Expand All @@ -349,4 +350,50 @@ describe('transactions', function() {
});
await session.endSession();
});

it('transaction() resets $isNew on error', async function() {
db.deleteModel(/Test/);
const Test = db.model('Test', Schema({ name: String }));

await Test.createCollection();
await Test.deleteMany({});

const doc = new Test({ name: 'test' });
assert.ok(doc.$isNew);
await assert.rejects(
db.transaction(async(session) => {
await doc.save({ session });
throw new Error('Oops!');
}),
/Oops!/
);
assert.ok(doc.$isNew);
const exists = await Test.exists({ _id: doc._id });
assert.ok(!exists);
});

it('transaction() resets $isNew between retries (gh-13698)', async function() {
db.deleteModel(/Test/);
const Test = db.model('Test', Schema({ name: String }));

await Test.createCollection();
await Test.deleteMany({});

const doc = new Test({ name: 'test' });
assert.ok(doc.$isNew);
let retryCount = 0;
await db.transaction(async(session) => {
assert.ok(doc.$isNew);
await doc.save({ session });
if (++retryCount < 3) {
throw new mongoose.mongo.MongoServerError({
errorLabels: ['TransientTransactionError']
});
}
});

const docs = await Test.find();
assert.equal(docs.length, 1);
assert.equal(docs[0].name, 'test');
});
});

0 comments on commit 392f9fa

Please sign in to comment.