Skip to content

Commit 71db754

Browse files
n1ru4lgithub-actions[bot]ardatan
authoredMar 12, 2024··
fix: onSubscribeError should be invoked (#3196)
* make that test fail * this fixes it * test: subscription plugin hooks are invoked * test: error masking for subscriptions * fix: ensure onSubscribeError is invoked * this version works * feat: result handling * no need to wrap; this is responsibility of executor * ok * chore(dependencies): updated changesets for modified dependencies * Use mapAsyncIterator instead * Use error handler * Update executor version * chore(dependencies): updated changesets for modified dependencies * Test if error masking fn is called once * chore(dependencies): updated changesets for modified dependencies * changeset: onParams setResult --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Arda TANRIKULU <ardatanrikulu@gmail.com>
1 parent 6725f8e commit 71db754

File tree

11 files changed

+633
-74
lines changed

11 files changed

+633
-74
lines changed
 

‎.changeset/fair-pots-matter.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'graphql-yoga': minor
3+
---
4+
5+
Allow setting async iterable within `onParams` hook `setResult` function
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'graphql-yoga': patch
3+
---
4+
dependencies updates:
5+
- Updated dependency [`@graphql-tools/executor@^1.2.2`
6+
↗︎](https://www.npmjs.com/package/@graphql-tools/executor/v/1.2.2) (from `^1.0.0`, in
7+
`dependencies`)
8+
- Updated dependency [`@graphql-tools/utils@^10.1.0`
9+
↗︎](https://www.npmjs.com/package/@graphql-tools/utils/v/10.1.0) (from `^10.0.0`, in
10+
`dependencies`)

‎packages/graphql-yoga/__tests__/error-masking.spec.ts

+192
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { inspect } from '@graphql-tools/utils';
22
import { createGraphQLError, createSchema, createYoga } from '../src/index.js';
3+
import { eventStream } from './utilities.js';
34

45
describe('error masking', () => {
56
function createTestSchema() {
@@ -501,4 +502,195 @@ describe('error masking', () => {
501502
],
502503
});
503504
});
505+
506+
it('subscription event source error is masked', async () => {
507+
const eventSouce = (async function* source() {
508+
yield { hi: 'hi' };
509+
throw new Error('I like turtles');
510+
})();
511+
512+
const schema = createSchema({
513+
typeDefs: /* GraphQL */ `
514+
type Subscription {
515+
hi: String!
516+
}
517+
type Query {
518+
hi: String!
519+
}
520+
`,
521+
resolvers: {
522+
Subscription: {
523+
hi: {
524+
subscribe: () => eventSouce,
525+
},
526+
},
527+
},
528+
});
529+
530+
const yoga = createYoga({ schema, logging: false });
531+
532+
const response = await yoga.fetch('http://yoga/graphql', {
533+
method: 'POST',
534+
headers: {
535+
'content-type': 'application/json',
536+
accept: 'text/event-stream',
537+
},
538+
body: JSON.stringify({
539+
query: /* GraphQL */ `
540+
subscription {
541+
hi
542+
}
543+
`,
544+
}),
545+
});
546+
547+
let counter = 0;
548+
549+
for await (const chunk of eventStream(response.body!)) {
550+
if (counter === 0) {
551+
expect(chunk).toEqual({ data: { hi: 'hi' } });
552+
counter++;
553+
continue;
554+
} else if (counter === 1) {
555+
expect(chunk).toMatchObject({ errors: [{ message: 'Unexpected error.' }] });
556+
counter++;
557+
continue;
558+
}
559+
560+
throw new Error('Should not have received more than 2 chunks.');
561+
}
562+
563+
expect(counter).toBe(2);
564+
});
565+
566+
it('subscription event source creation error is masked', async () => {
567+
const schema = createSchema({
568+
typeDefs: /* GraphQL */ `
569+
type Subscription {
570+
hi: String!
571+
}
572+
type Query {
573+
hi: String!
574+
}
575+
`,
576+
resolvers: {
577+
Subscription: {
578+
hi: {
579+
subscribe: () => {
580+
throw new Error('I like turtles');
581+
},
582+
},
583+
},
584+
},
585+
});
586+
587+
const yoga = createYoga({ schema, logging: false });
588+
589+
const response = await yoga.fetch('http://yoga/graphql', {
590+
method: 'POST',
591+
headers: {
592+
'content-type': 'application/json',
593+
accept: 'text/event-stream',
594+
},
595+
body: JSON.stringify({
596+
query: /* GraphQL */ `
597+
subscription {
598+
hi
599+
}
600+
`,
601+
}),
602+
});
603+
604+
let counter = 0;
605+
606+
for await (const chunk of eventStream(response.body!)) {
607+
if (counter === 0) {
608+
expect(chunk).toMatchObject({ errors: [{ message: 'Unexpected error.', path: ['hi'] }] });
609+
counter++;
610+
continue;
611+
}
612+
613+
throw new Error('Should not have received more than 2 chunks.');
614+
}
615+
616+
expect(counter).toBe(1);
617+
});
618+
619+
it('subscription field resolve error is masked', async () => {
620+
const eventSource = (async function* source() {
621+
yield 1;
622+
yield 2;
623+
yield 3;
624+
})();
625+
const schema = createSchema({
626+
typeDefs: /* GraphQL */ `
627+
type Subscription {
628+
hi: String!
629+
}
630+
type Query {
631+
hi: String!
632+
}
633+
`,
634+
resolvers: {
635+
Subscription: {
636+
hi: {
637+
subscribe: () => eventSource,
638+
resolve: data => {
639+
if (data === 1) {
640+
return 'hee';
641+
}
642+
if (data === 2) {
643+
throw new Error('I like turtles');
644+
}
645+
if (data === 3) {
646+
return 'hoo';
647+
}
648+
throw new Error('This shall never be reached');
649+
},
650+
},
651+
},
652+
},
653+
});
654+
655+
const yoga = createYoga({ schema, logging: false });
656+
657+
const response = await yoga.fetch('http://yoga/graphql', {
658+
method: 'POST',
659+
headers: {
660+
'content-type': 'application/json',
661+
accept: 'text/event-stream',
662+
},
663+
body: JSON.stringify({
664+
query: /* GraphQL */ `
665+
subscription {
666+
hi
667+
}
668+
`,
669+
}),
670+
});
671+
672+
let counter = 0;
673+
674+
for await (const chunk of eventStream(response.body!)) {
675+
if (counter === 0) {
676+
expect(chunk).toMatchObject({ data: { hi: 'hee' } });
677+
counter++;
678+
continue;
679+
}
680+
if (counter === 1) {
681+
expect(chunk).toMatchObject({ errors: [{ message: 'Unexpected error.', path: ['hi'] }] });
682+
counter++;
683+
continue;
684+
}
685+
if (counter === 2) {
686+
expect(chunk).toMatchObject({ data: { hi: 'hoo' } });
687+
counter++;
688+
continue;
689+
}
690+
691+
throw new Error('Should not have received more than 3 chunks.');
692+
}
693+
694+
expect(counter).toBe(3);
695+
});
504696
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { createYoga, type Plugin } from '../src';
2+
import { eventStream } from './utilities';
3+
4+
test('onParams -> setResult to single execution result', async () => {
5+
const plugin: Plugin = {
6+
async onParams({ setResult }) {
7+
setResult({ data: { hello: 'world' } });
8+
},
9+
};
10+
11+
const yoga = createYoga({ plugins: [plugin] });
12+
13+
const result = await yoga.fetch('http://yoga/graphql', {
14+
method: 'POST',
15+
body: JSON.stringify({ query: '{ hello }' }),
16+
headers: {
17+
'Content-Type': 'application/json',
18+
},
19+
});
20+
21+
expect(result.status).toBe(200);
22+
const body = await result.json();
23+
expect(body).toEqual({ data: { hello: 'world' } });
24+
});
25+
26+
test('onParams -> setResult to event stream execution result', async () => {
27+
const plugin: Plugin = {
28+
async onParams({ setResult }) {
29+
setResult(
30+
(async function* () {
31+
yield { data: { hello: 'hee' } };
32+
yield { data: { hello: 'hoo' } };
33+
})(),
34+
);
35+
},
36+
};
37+
38+
const yoga = createYoga({ plugins: [plugin] });
39+
40+
const result = await yoga.fetch('http://yoga/graphql', {
41+
method: 'POST',
42+
body: JSON.stringify({ query: '{ hello }' }),
43+
headers: {
44+
'Content-Type': 'application/json',
45+
Accept: 'text/event-stream',
46+
},
47+
});
48+
49+
expect(result.status).toBe(200);
50+
let counter = 0;
51+
for await (const value of eventStream(result.body!)) {
52+
if (counter === 0) {
53+
expect(value).toEqual({ data: { hello: 'hee' } });
54+
counter++;
55+
} else if (counter === 1) {
56+
expect(value).toEqual({ data: { hello: 'hoo' } });
57+
counter++;
58+
}
59+
}
60+
expect(counter).toBe(2);
61+
});

‎packages/graphql-yoga/__tests__/subscriptions.spec.ts

+240-28
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,6 @@
11
import { GraphQLError } from 'graphql';
2-
import { createSchema, createYoga, Repeater } from '../src/index.js';
3-
4-
function eventStream<TType = unknown>(source: ReadableStream<Uint8Array>) {
5-
return new Repeater<TType>(async (push, end) => {
6-
const cancel: Promise<{ done: true }> = end.then(() => ({ done: true }));
7-
const iterable = source[Symbol.asyncIterator]();
8-
// eslint-disable-next-line no-constant-condition
9-
while (true) {
10-
const result = await Promise.race([cancel, iterable.next()]);
11-
12-
if (result.done) {
13-
break;
14-
}
15-
16-
const values = result.value.toString().split('\n\n').filter(Boolean);
17-
for (const value of values) {
18-
if (!value.startsWith('data: ')) {
19-
continue;
20-
}
21-
const result = value.replace('data: ', '');
22-
push(JSON.parse(result));
23-
}
24-
}
25-
26-
iterable.return?.();
27-
end();
28-
});
29-
}
2+
import { createSchema, createYoga, maskError, Plugin } from '../src/index.js';
3+
import { eventStream } from './utilities.js';
304

315
describe('Subscription', () => {
326
test('eventStream', async () => {
@@ -85,6 +59,10 @@ describe('Subscription', () => {
8559
counter++;
8660
}
8761
}
62+
63+
if (counter !== 3) {
64+
throw new Error('Did not receive all events');
65+
}
8866
});
8967

9068
test('should issue pings while connected', async () => {
@@ -318,6 +296,69 @@ event: complete
318296
`);
319297
});
320298

299+
test('erroring event stream should be handled (non GraphQL error; disabled error masking)', async () => {
300+
const schema = createSchema({
301+
typeDefs: /* GraphQL */ `
302+
type Subscription {
303+
hi: String!
304+
}
305+
type Query {
306+
hi: String!
307+
}
308+
`,
309+
resolvers: {
310+
Subscription: {
311+
hi: {
312+
async *subscribe() {
313+
yield { hi: 'hi' };
314+
throw new Error('hi');
315+
},
316+
},
317+
},
318+
},
319+
});
320+
321+
const logging = {
322+
debug: jest.fn(),
323+
info: jest.fn(),
324+
warn: jest.fn(),
325+
error: jest.fn(),
326+
};
327+
328+
const yoga = createYoga({ schema, logging, maskedErrors: false });
329+
const response = await yoga.fetch('http://yoga/graphql', {
330+
method: 'POST',
331+
headers: {
332+
'content-type': 'application/json',
333+
accept: 'text/event-stream',
334+
},
335+
body: JSON.stringify({
336+
query: /* GraphQL */ `
337+
subscription {
338+
hi
339+
}
340+
`,
341+
}),
342+
});
343+
const text = await response.text();
344+
345+
expect(text).toMatchInlineSnapshot(`
346+
":
347+
348+
event: next
349+
data: {"data":{"hi":"hi"}}
350+
351+
event: next
352+
data: {"errors":[{"message":"hi","locations":[{"line":2,"column":11}]}]}
353+
354+
event: complete
355+
356+
"
357+
`);
358+
// errors are only logged when error masking is enabled
359+
expect(logging.error).toBeCalledTimes(0);
360+
});
361+
321362
test('erroring event stream should be handled (GraphQL error)', async () => {
322363
const schema = createSchema({
323364
typeDefs: /* GraphQL */ `
@@ -382,6 +423,177 @@ event: complete
382423
});
383424
});
384425

426+
describe('subscription plugin hooks', () => {
427+
test('onNext and onEnd is invoked for event source', async () => {
428+
const source = (async function* foo() {
429+
yield { hi: 'hi' };
430+
yield { hi: 'hello' };
431+
})();
432+
433+
const schema = createSchema({
434+
typeDefs: /* GraphQL */ `
435+
type Subscription {
436+
hi: String!
437+
}
438+
type Query {
439+
hi: String!
440+
}
441+
`,
442+
resolvers: {
443+
Subscription: {
444+
hi: {
445+
subscribe: () => source,
446+
},
447+
},
448+
},
449+
});
450+
451+
const onNextCalls: unknown[] = [];
452+
let didInvokeOnEnd = false;
453+
454+
const plugin: Plugin = {
455+
onSubscribe() {
456+
return {
457+
onSubscribeResult() {
458+
return {
459+
onNext(ctx) {
460+
onNextCalls.push(ctx.result);
461+
},
462+
onEnd() {
463+
expect(onNextCalls).toHaveLength(2);
464+
didInvokeOnEnd = true;
465+
},
466+
};
467+
},
468+
};
469+
},
470+
};
471+
472+
const yoga = createYoga({ schema, plugins: [plugin] });
473+
474+
const response = await yoga.fetch('http://yoga/graphql', {
475+
method: 'POST',
476+
headers: {
477+
'content-type': 'application/json',
478+
accept: 'text/event-stream',
479+
},
480+
body: JSON.stringify({
481+
query: /* GraphQL */ `
482+
subscription {
483+
hi
484+
}
485+
`,
486+
}),
487+
});
488+
489+
let counter = 0;
490+
491+
for await (const chunk of eventStream(response.body!)) {
492+
if (counter === 0) {
493+
expect(chunk).toEqual({ data: { hi: 'hi' } });
494+
counter++;
495+
} else if (counter === 1) {
496+
expect(chunk).toEqual({ data: { hi: 'hello' } });
497+
counter++;
498+
}
499+
}
500+
501+
expect(counter).toBe(2);
502+
expect(onNextCalls).toEqual([{ data: { hi: 'hi' } }, { data: { hi: 'hello' } }]);
503+
expect(didInvokeOnEnd).toBe(true);
504+
});
505+
506+
test('onSubscribeError and onEnd is invoked if error is thrown from event source', async () => {
507+
const source = (async function* foo() {
508+
yield { hi: 'hi' };
509+
throw new GraphQLError('hi');
510+
})();
511+
512+
const schema = createSchema({
513+
typeDefs: /* GraphQL */ `
514+
type Subscription {
515+
hi: String!
516+
}
517+
type Query {
518+
hi: String!
519+
}
520+
`,
521+
resolvers: {
522+
Subscription: {
523+
hi: {
524+
subscribe: () => source,
525+
},
526+
},
527+
},
528+
});
529+
530+
let onNextCallCounter = 0;
531+
let didInvokeOnEnd = false;
532+
let didInvokeOnSubscribeError = false;
533+
534+
const plugin: Plugin = {
535+
onSubscribe() {
536+
return {
537+
onSubscribeError() {
538+
didInvokeOnSubscribeError = true;
539+
},
540+
onSubscribeResult() {
541+
return {
542+
onNext() {
543+
onNextCallCounter++;
544+
},
545+
onEnd() {
546+
expect(onNextCallCounter).toEqual(1);
547+
didInvokeOnEnd = true;
548+
},
549+
};
550+
},
551+
};
552+
},
553+
};
554+
555+
const maskErrorFn = jest.fn(maskError);
556+
const yoga = createYoga({
557+
schema,
558+
plugins: [plugin],
559+
maskedErrors: { maskError: maskErrorFn },
560+
});
561+
562+
const response = await yoga.fetch('http://yoga/graphql', {
563+
method: 'POST',
564+
headers: {
565+
'content-type': 'application/json',
566+
accept: 'text/event-stream',
567+
},
568+
body: JSON.stringify({
569+
query: /* GraphQL */ `
570+
subscription {
571+
hi
572+
}
573+
`,
574+
}),
575+
});
576+
577+
let counter = 0;
578+
579+
for await (const chunk of eventStream(response.body!)) {
580+
if (counter === 0) {
581+
expect(chunk).toEqual({ data: { hi: 'hi' } });
582+
counter++;
583+
} else if (counter === 1) {
584+
expect(chunk).toMatchObject({ errors: [{ message: 'hi' }] });
585+
counter++;
586+
}
587+
}
588+
589+
expect(counter).toBe(2);
590+
expect(onNextCallCounter).toEqual(1);
591+
expect(didInvokeOnEnd).toBe(true);
592+
expect(didInvokeOnSubscribeError).toBe(true);
593+
expect(maskErrorFn).toBeCalledTimes(1);
594+
});
595+
});
596+
385597
type Deferred<T = void> = {
386598
resolve: (value: T) => void;
387599
reject: (value: unknown) => void;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// eslint-disable-next-line import/no-extraneous-dependencies
2+
import { Repeater } from '@repeaterjs/repeater';
3+
4+
/** Parse SSE event stream and yield data pieces. */
5+
export function eventStream<TType = unknown>(source: ReadableStream<Uint8Array>) {
6+
return new Repeater<TType>(async (push, end) => {
7+
const cancel: Promise<{ done: true }> = end.then(() => ({ done: true }));
8+
const iterable = source[Symbol.asyncIterator]();
9+
10+
// eslint-disable-next-line no-constant-condition
11+
while (true) {
12+
const result = await Promise.race([cancel, iterable.next()]);
13+
14+
if (result.done) {
15+
break;
16+
}
17+
18+
const values = result.value.toString().split('\n').filter(Boolean);
19+
for (const value of values) {
20+
if (!value.startsWith('data: ')) {
21+
continue;
22+
}
23+
const result = value.replace('data: ', '');
24+
push(JSON.parse(result));
25+
}
26+
}
27+
28+
iterable.return?.();
29+
end();
30+
});
31+
}

‎packages/graphql-yoga/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@
5050
},
5151
"dependencies": {
5252
"@envelop/core": "^5.0.0",
53-
"@graphql-tools/executor": "^1.0.0",
53+
"@graphql-tools/executor": "^1.2.2",
5454
"@graphql-tools/schema": "^10.0.0",
55-
"@graphql-tools/utils": "^10.0.0",
55+
"@graphql-tools/utils": "^10.1.0",
5656
"@graphql-yoga/logger": "^2.0.0",
5757
"@graphql-yoga/subscription": "^5.0.0",
5858
"@whatwg-node/fetch": "^0.9.7",

‎packages/graphql-yoga/src/plugins/types.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export interface OnParamsEventPayload {
112112
params: GraphQLParams;
113113
request: Request;
114114
setParams: (params: GraphQLParams) => void;
115-
setResult: (result: ExecutionResult) => void;
115+
setResult: (result: ExecutionResult | AsyncIterable<ExecutionResult>) => void;
116116
fetchAPI: FetchAPI;
117117
}
118118

‎packages/graphql-yoga/src/server.ts

+19-1
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ import { ExecutionResult, parse, specifiedRules, validate } from 'graphql';
33
import {
44
envelop,
55
GetEnvelopedFn,
6+
isAsyncIterable,
67
PromiseOrValue,
78
useEngine,
89
useExtendContext,
910
useMaskedErrors,
1011
} from '@envelop/core';
1112
import { normalizedExecutor } from '@graphql-tools/executor';
13+
import { mapAsyncIterator } from '@graphql-tools/utils';
1214
import { createLogger, LogLevel, YogaLogger } from '@graphql-yoga/logger';
1315
import * as defaultFetchAPI from '@whatwg-node/fetch';
1416
import {
@@ -431,7 +433,7 @@ export class YogaServer<
431433
: [serverContext: TServerContext]
432434
) {
433435
try {
434-
let result: ExecutionResult | undefined;
436+
let result: ExecutionResult | AsyncIterable<ExecutionResult> | undefined;
435437

436438
for (const onParamsHook of this.onParamsHooks) {
437439
await onParamsHook({
@@ -475,6 +477,22 @@ export class YogaServer<
475477
this.logger.debug(`Processing GraphQL Parameters done.`);
476478
}
477479

480+
/** Ensure that error thrown from subscribe is sent to client */
481+
// TODO: this should probably be something people can customize via a hook?
482+
if (isAsyncIterable(result)) {
483+
const iterator = result[Symbol.asyncIterator]();
484+
result = mapAsyncIterator(
485+
iterator,
486+
v => v,
487+
(err: Error) => {
488+
const errors = handleError(err, this.maskedErrorsOpts, this.logger);
489+
return {
490+
errors,
491+
};
492+
},
493+
);
494+
}
495+
478496
return result;
479497
} catch (error) {
480498
const errors = handleError(error, this.maskedErrorsOpts, this.logger);

‎pnpm-lock.yaml

+68-38
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎website/src/pages/docs/features/defer-stream.mdx

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
---
22
description:
3-
Stream and defer are directives that allow you to improve latency for clients by sending the
4-
most important data as soon as it's ready.
3+
Stream and defer are directives that allow you to improve latency for clients by sending the most
4+
important data as soon as it's ready.
55
---
66

77
import { Callout } from '@theguild/components'
88

99
# Defer and Stream
1010

11-
Stream and defer are directives that allow you to improve latency for clients by sending the
12-
most important data as soon as it's ready.
11+
Stream and defer are directives that allow you to improve latency for clients by sending the most
12+
important data as soon as it's ready.
1313

1414
As applications grow, the GraphQL operation documents can get bigger. The server will only send the
1515
response back once all the data requested in the query is ready. However, not all requested data is

0 commit comments

Comments
 (0)
Please sign in to comment.