Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement callback subscription protocol in new plugin ApolloServerPluginSubscriptionCallback #7617

Merged
merged 21 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
263 changes: 246 additions & 17 deletions packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -972,16 +972,16 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
Expand Down Expand Up @@ -1052,26 +1052,244 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Unexpected status code: 500",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});

it('retries failed `next` requests', async () => {
const server = await startSubscriptionServer({ logger });

// Mock the initial check response from the router.
mockRouterCheckResponse();

// Start the subscription; this triggers the initial check request and
// starts the heartbeat interval. This simulates an incoming subscription
// request from the router.
const result = await server.executeOperation({
query: `#graphql
subscription {
count
}
`,
extensions: {
subscription: {
callback_url: 'http://mock-router-url.com',
subscription_id: '1234-cats',
verifier: 'my-verifier-token',
},
},
});

expect(result.http.status).toEqual(200);
expect(result.http.headers.get('subscription-protocol')).toEqual(
'callback',
);

// Mock the heartbeat response from the router. We'll trigger it once below
// after the subscription is initialized to make sure it works.
const firstHeartbeat = mockRouterHeartbeatResponse();
// Advance timers to trigger the heartbeat once. This consumes the one
// heartbeat mock from above.
jest.advanceTimersByTime(5000);
await firstHeartbeat;

// Next we'll trigger some subscription events. In advance, we'll mock the 2
// router responses.
const updates = Promise.all([
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 1 } }),
mockRouterNextResponse({ payload: { count: 2 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 2 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 2 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 2 } }),
]);

// Trigger a couple updates. These send `next` requests to the router.
logger.debug('TESTING: Triggering first update');
await server.executeOperation({
query: `#graphql
mutation {
addOne
}
`,
});

logger.debug('TESTING: Triggering second update');
await server.executeOperation({
query: `#graphql
mutation {
addOne
}
`,
});
await updates;

// When we shutdown the server, we'll stop listening for subscription
// updates, await unresolved requests, and send a `complete` request to the
// router for each active subscription.
const completeRequest = mockRouterCompleteResponse();

await server.stop();
await completeRequest;

// The heartbeat should be cleaned up at this point. There is no second
// heartbeat mock, so if it ticks again it'll throw an error.
jest.advanceTimersByTime(5000);

expect(logger.orderOfOperations).toMatchInlineSnapshot(`
[
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionCallback[1234-cats]: Sending \`check\` request to router",
"SubscriptionCallback[1234-cats]: \`check\` request successful",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]",
"TESTING: Triggering first update",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router with subscription update",
"TESTING: Triggering second update",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500",
"SubscriptionManager[1234-cats]: \`next\` request successful",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router with subscription update",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500",
"SubscriptionManager[1234-cats]: \`next\` request successful",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});

it('terminates subscription after max retries `next` requests', async () => {
const server = await startSubscriptionServer({ logger });

// Mock the initial check response from the router.
mockRouterCheckResponse();

// Start the subscription; this triggers the initial check request and
// starts the heartbeat interval. This simulates an incoming subscription
// request from the router.
const result = await server.executeOperation({
query: `#graphql
subscription {
count
}
`,
extensions: {
subscription: {
callback_url: 'http://mock-router-url.com',
subscription_id: '1234-cats',
verifier: 'my-verifier-token',
},
},
});

expect(result.http.status).toEqual(200);
expect(result.http.headers.get('subscription-protocol')).toEqual(
'callback',
);

// Mock the heartbeat response from the router. We'll trigger it once below
// after the subscription is initialized to make sure it works.
const firstHeartbeat = mockRouterHeartbeatResponse();
// Advance timers to trigger the heartbeat once. This consumes the one
// heartbeat mock from above.
jest.advanceTimersByTime(5000);
await firstHeartbeat;

// 5 failures to hit the retry limit
const updates = Promise.all(
[...new Array(5)].map(() =>
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
),
);

// Trigger a couple updates. These send `next` requests to the router.
logger.debug('TESTING: Triggering first update');
await server.executeOperation({
query: `#graphql
mutation {
addOne
}
`,
});

// After 5 failures, the plugin will terminate the subscriptions and send
// a `complete` request to the router.
const completeRequest = mockRouterCompleteResponse({
errors: [
{ message: '`next` request failed, terminating subscription' },
],
});

await updates;
await completeRequest;
await server.stop();

// The heartbeat should be cleaned up at this point. There is no second
// heartbeat mock, so if it ticks again it'll throw an error.
jest.advanceTimersByTime(5000);

expect(logger.orderOfOperations).toMatchInlineSnapshot(`
[
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionCallback[1234-cats]: Sending \`check\` request to router",
"SubscriptionCallback[1234-cats]: \`check\` request successful",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]",
"TESTING: Triggering first update",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router with subscription update",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 4) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 5) due to error: \`next\` request failed with unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: \`next\` request failed, terminating subscription: Error: \`next\` request failed with unexpected status code: 500",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});
});
});

Expand All @@ -1082,9 +1300,16 @@ async function startSubscriptionServer(
const pubsub = new PubSub();
const server = new ApolloServer({
plugins: [
ApolloServerPluginSubscriptionCallback(
opts?.logger ? { logger: opts.logger } : undefined,
),
ApolloServerPluginSubscriptionCallback({
// set some reasonable testing defaults
retry: {
retries: 5,
factor: 1,
trevor-scheer marked this conversation as resolved.
Show resolved Hide resolved
maxTimeout: 100,
clenfest marked this conversation as resolved.
Show resolved Hide resolved
minTimeout: 100,
},
...(opts?.logger ? { logger: opts.logger } : undefined),
}),
],
typeDefs: `#graphql
type Query {
Expand Down Expand Up @@ -1224,12 +1449,14 @@ function mockRouterNextResponse(requestOpts: {
url?: string;
id?: string;
verifier?: string;
responseCode?: number;
}) {
const {
payload,
url = 'http://mock-router-url.com',
id = '1234-cats',
verifier = 'my-verifier-token',
responseCode = 200,
} = requestOpts;

return promisifyNock(
Expand All @@ -1242,7 +1469,7 @@ function mockRouterNextResponse(requestOpts: {
verifier,
payload: { data: payload },
})
.reply(200),
.reply(responseCode),
);
}

Expand Down Expand Up @@ -1283,7 +1510,9 @@ function orderOfOperationsLogger() {
this.orderOfOperations.push(msg);
},
info() {},
warn() {},
warn(msg: string) {
this.orderOfOperations.push(`WARN: ${msg}`);
},
error(msg: string) {
this.orderOfOperations.push(`ERROR: ${msg}`);
},
Expand Down