Skip to content

Commit 90a717e

Browse files
authoredMar 13, 2025··
fix(executor): do not use leaking registerAbortSignalListener, and handle listeners inside the execution context (#6977)
* fix(executor): do not use leaking `registerAbortSignalListener`, and handle listeners inside the execution context * lets go

File tree

6 files changed

+89
-41
lines changed

6 files changed

+89
-41
lines changed
 

‎.changeset/swift-geese-behave.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@graphql-tools/executor': patch
3+
'@graphql-tools/utils': patch
4+
---
5+
6+
In executor, do not use leaking `registerAbortSignalListener`, and handle listeners inside the
7+
execution context

‎packages/executor/src/execution/__tests__/abort-signal.test.ts

+17-14
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ describe('Abort Signal', () => {
3434
subscribe() {
3535
return new Repeater(async (push, stop) => {
3636
let i = 0;
37-
stop.then(() => {
37+
stop.finally(() => {
3838
stopped = true;
3939
});
4040

@@ -150,7 +150,7 @@ describe('Abort Signal', () => {
150150
didInvokeFirstFn = true;
151151
return true;
152152
},
153-
async second() {
153+
second() {
154154
didInvokeSecondFn = true;
155155
controller.abort();
156156
return true;
@@ -162,18 +162,21 @@ describe('Abort Signal', () => {
162162
},
163163
},
164164
});
165-
const result$ = normalizedExecutor({
166-
schema,
167-
document: parse(/* GraphQL */ `
168-
mutation {
169-
first
170-
second
171-
third
172-
}
173-
`),
174-
signal: controller.signal,
175-
});
176-
expect(result$).rejects.toBeInstanceOf(DOMException);
165+
await expect(
166+
Promise.resolve().then(() =>
167+
normalizedExecutor({
168+
schema,
169+
document: parse(/* GraphQL */ `
170+
mutation {
171+
first
172+
second
173+
third
174+
}
175+
`),
176+
signal: controller.signal,
177+
}),
178+
),
179+
).rejects.toBeInstanceOf(DOMException);
177180
expect(didInvokeFirstFn).toBe(true);
178181
expect(didInvokeSecondFn).toBe(true);
179182
expect(didInvokeThirdFn).toBe(false);

‎packages/executor/src/execution/__tests__/stream-test.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,7 @@ describe('Execute: stream directive', () => {
583583
],
584584
hasNext: true,
585585
},
586-
{
587-
hasNext: false,
588-
},
586+
{ hasNext: false },
589587
]);
590588
});
591589

‎packages/executor/src/execution/execute.ts

+54-18
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import {
3434
collectFields,
3535
createGraphQLError,
3636
fakePromise,
37-
getAbortPromise,
3837
getArgumentValues,
3938
getDefinedRootType,
4039
GraphQLResolveInfo,
@@ -52,11 +51,10 @@ import {
5251
Path,
5352
pathToArray,
5453
promiseReduce,
55-
registerAbortSignalListener,
5654
} from '@graphql-tools/utils';
5755
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
5856
import { DisposableSymbols } from '@whatwg-node/disposablestack';
59-
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
57+
import { createDeferredPromise, handleMaybePromise } from '@whatwg-node/promise-helpers';
6058
import { coerceError } from './coerceError.js';
6159
import { flattenAsyncIterable } from './flattenAsyncIterable.js';
6260
import { invariant } from './invariant.js';
@@ -127,6 +125,8 @@ export interface ExecutionContext<TVariables = any, TContext = any> {
127125
errors: Array<GraphQLError>;
128126
subsequentPayloads: Set<AsyncPayloadRecord>;
129127
signal?: AbortSignal;
128+
onSignalAbort?(handler: () => void): void;
129+
signalPromise?: Promise<never>;
130130
}
131131

132132
export interface FormattedExecutionResult<
@@ -421,6 +421,8 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
421421
signal,
422422
} = args;
423423

424+
signal?.throwIfAborted();
425+
424426
// If the schema used for execution is invalid, throw an error.
425427
assertValidSchema(schema);
426428

@@ -489,6 +491,31 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
489491
return coercedVariableValues.errors;
490492
}
491493

494+
signal?.throwIfAborted();
495+
496+
let onSignalAbort: ExecutionContext['onSignalAbort'];
497+
let signalPromise: ExecutionContext['signalPromise'];
498+
499+
if (signal) {
500+
const listeners = new Set<() => void>();
501+
const signalDeferred = createDeferredPromise<never>();
502+
signalPromise = signalDeferred.promise;
503+
const sharedListener = () => {
504+
signalDeferred.reject(signal.reason);
505+
signal.removeEventListener('abort', sharedListener);
506+
};
507+
signal.addEventListener('abort', sharedListener, { once: true });
508+
signalPromise.catch(() => {
509+
for (const listener of listeners) {
510+
listener();
511+
}
512+
listeners.clear();
513+
});
514+
onSignalAbort = handler => {
515+
listeners.add(handler);
516+
};
517+
}
518+
492519
return {
493520
schema,
494521
fragments,
@@ -502,6 +529,8 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
502529
subsequentPayloads: new Set(),
503530
errors: [],
504531
signal,
532+
onSignalAbort,
533+
signalPromise,
505534
};
506535
}
507536

@@ -626,7 +655,7 @@ function executeFields(
626655
}
627656
}
628657
} catch (error) {
629-
if (containsPromise) {
658+
if (error !== exeContext.signal?.reason && containsPromise) {
630659
// Ensure that any promises returned by other fields are handled, as they may also reject.
631660
return handleMaybePromise(
632661
() => promiseForObject(results, exeContext.signal),
@@ -649,7 +678,7 @@ function executeFields(
649678
// Otherwise, results is a map from field name to the result of resolving that
650679
// field, which is possibly a promise. Return a promise that will return this
651680
// same map, but with any promises replaced with the values they resolved to.
652-
return promiseForObject(results, exeContext.signal);
681+
return promiseForObject(results, exeContext.signal, exeContext.signalPromise);
653682
}
654683

655684
/**
@@ -679,6 +708,7 @@ function executeField(
679708

680709
// Get the resolve function, regardless of if its result is normal or abrupt (error).
681710
try {
711+
exeContext.signal?.throwIfAborted();
682712
// Build a JS object of arguments from the field.arguments AST, using the
683713
// variables scope to fulfill any variable references.
684714
// TODO: find a way to memoize, in case this field is within a List type.
@@ -973,8 +1003,9 @@ async function completeAsyncIteratorValue(
9731003
iterator: AsyncIterator<unknown>,
9741004
asyncPayloadRecord?: AsyncPayloadRecord,
9751005
): Promise<ReadonlyArray<unknown>> {
976-
if (exeContext.signal && iterator.return) {
977-
registerAbortSignalListener(exeContext.signal, () => {
1006+
exeContext.signal?.throwIfAborted();
1007+
if (iterator.return) {
1008+
exeContext.onSignalAbort?.(() => {
9781009
iterator.return?.();
9791010
});
9801011
}
@@ -1755,18 +1786,25 @@ function executeSubscription(exeContext: ExecutionContext): MaybePromise<AsyncIt
17551786
const result = resolveFn(rootValue, args, contextValue, info);
17561787

17571788
if (isPromise(result)) {
1758-
return result.then(assertEventStream).then(undefined, error => {
1759-
throw locatedError(error, fieldNodes, pathToArray(path));
1760-
});
1789+
return result
1790+
.then(result => assertEventStream(result, exeContext.signal, exeContext.onSignalAbort))
1791+
.then(undefined, error => {
1792+
throw locatedError(error, fieldNodes, pathToArray(path));
1793+
});
17611794
}
17621795

1763-
return assertEventStream(result, exeContext.signal);
1796+
return assertEventStream(result, exeContext.signal, exeContext.onSignalAbort);
17641797
} catch (error) {
17651798
throw locatedError(error, fieldNodes, pathToArray(path));
17661799
}
17671800
}
17681801

1769-
function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable<unknown> {
1802+
function assertEventStream(
1803+
result: unknown,
1804+
signal?: AbortSignal,
1805+
onSignalAbort?: (handler: () => void) => void,
1806+
): AsyncIterable<unknown> {
1807+
signal?.throwIfAborted();
17701808
if (result instanceof Error) {
17711809
throw result;
17721810
}
@@ -1777,13 +1815,13 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable
17771815
'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`,
17781816
);
17791817
}
1780-
if (signal) {
1818+
if (onSignalAbort) {
17811819
return {
17821820
[Symbol.asyncIterator]() {
17831821
const asyncIterator = result[Symbol.asyncIterator]();
17841822

17851823
if (asyncIterator.return) {
1786-
registerAbortSignalListener(signal, () => {
1824+
onSignalAbort?.(() => {
17871825
asyncIterator.return?.();
17881826
});
17891827
}
@@ -2110,8 +2148,6 @@ function yieldSubsequentPayloads(
21102148
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
21112149
let isDone = false;
21122150

2113-
const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined;
2114-
21152151
async function next(): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
21162152
if (isDone) {
21172153
return { value: undefined, done: true };
@@ -2121,8 +2157,8 @@ function yieldSubsequentPayloads(
21212157
record => record.promise,
21222158
);
21232159

2124-
if (abortPromise) {
2125-
await Promise.race([abortPromise, ...subSequentPayloadPromises]);
2160+
if (exeContext.signalPromise) {
2161+
await Promise.race([exeContext.signalPromise, ...subSequentPayloadPromises]);
21262162
} else {
21272163
await Promise.race(subSequentPayloadPromises);
21282164
}

‎packages/executor/src/execution/promiseForObject.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import { getAbortPromise, isPromise, MaybePromise } from '@graphql-tools/utils';
2-
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
1+
import { handleMaybePromise, isPromise, MaybePromise } from '@whatwg-node/promise-helpers';
32

43
type ResolvedObject<TData> = {
54
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
@@ -15,7 +14,9 @@ type ResolvedObject<TData> = {
1514
export function promiseForObject<TData>(
1615
object: TData,
1716
signal?: AbortSignal,
17+
signalPromise?: Promise<never>,
1818
): MaybePromise<ResolvedObject<TData>> {
19+
signal?.throwIfAborted();
1920
const resolvedObject = Object.create(null);
2021
const promises: Promise<void>[] = [];
2122
for (const key in object) {
@@ -33,9 +34,8 @@ export function promiseForObject<TData>(
3334
return resolvedObject;
3435
}
3536
const promiseAll = promises.length === 1 ? promises[0] : Promise.all(promises);
36-
if (signal) {
37-
const abortPromise = getAbortPromise(signal);
38-
return Promise.race([abortPromise, promiseAll]).then(() => resolvedObject);
37+
if (signalPromise) {
38+
return Promise.race([signalPromise, promiseAll]).then(() => resolvedObject);
3939
}
4040
return promiseAll.then(() => resolvedObject);
4141
}

‎packages/utils/src/registerAbortSignalListener.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { fakeRejectPromise } from '@whatwg-node/promise-helpers';
12
import { memoize1 } from './memoize.js';
23

34
// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
@@ -32,8 +33,11 @@ export function registerAbortSignalListener(signal: AbortSignal, listener: () =>
3233
}
3334

3435
export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
36+
// If the signal is already aborted, return a rejected promise
37+
if (signal.aborted) {
38+
return fakeRejectPromise(signal.reason);
39+
}
3540
return new Promise<void>((_resolve, reject) => {
36-
// If the signal is already aborted, return a rejected promise
3741
if (signal.aborted) {
3842
reject(signal.reason);
3943
return;

0 commit comments

Comments
 (0)
Please sign in to comment.