Skip to content

Commit

Permalink
implement retry/error handling on update
Browse files Browse the repository at this point in the history
  • Loading branch information
trevor-scheer committed Jul 25, 2023
1 parent 9f09eed commit 0cdceda
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 47 deletions.
263 changes: 246 additions & 17 deletions packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -972,16 +972,16 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: FetchError: request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): request to http://mock-router-url.com/ failed, reason: network request error",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
Expand Down Expand Up @@ -1052,26 +1052,244 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): Unexpected status code: 500",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Error: Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): Unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Unexpected status code: 500",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});

it('retries failed `next` requests', async () => {
const server = await startSubscriptionServer({ logger });

// Mock the initial check response from the router.
mockRouterCheckResponse();

// Start the subscription; this triggers the initial check request and
// starts the heartbeat interval. This simulates an incoming subscription
// request from the router.
const result = await server.executeOperation({
query: `#graphql
subscription {
count
}
`,
extensions: {
subscription: {
callback_url: 'http://mock-router-url.com',
subscription_id: '1234-cats',
verifier: 'my-verifier-token',
},
},
});

expect(result.http.status).toEqual(200);
expect(result.http.headers.get('subscription-protocol')).toEqual(
'callback',
);

// Mock the heartbeat response from the router. We'll trigger it once below
// after the subscription is initialized to make sure it works.
const firstHeartbeat = mockRouterHeartbeatResponse();
// Advance timers to trigger the heartbeat once. This consumes the one
// heartbeat mock from above.
jest.advanceTimersByTime(5000);
await firstHeartbeat;

// Next we'll trigger some subscription events. In advance, we'll mock the 2
// router responses.
const updates = Promise.all([
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 1 } }),
mockRouterNextResponse({ payload: { count: 2 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 2 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 2 }, responseCode: 500 }),
mockRouterNextResponse({ payload: { count: 2 } }),
]);

// Trigger a couple updates. These send `next` requests to the router.
logger.debug('TESTING: Triggering first update');
await server.executeOperation({
query: `#graphql
mutation {
addOne
}
`,
});

logger.debug('TESTING: Triggering second update');
await server.executeOperation({
query: `#graphql
mutation {
addOne
}
`,
});
await updates;

// When we shutdown the server, we'll stop listening for subscription
// updates, await unresolved requests, and send a `complete` request to the
// router for each active subscription.
const completeRequest = mockRouterCompleteResponse();

await server.stop();
await completeRequest;

// The heartbeat should be cleaned up at this point. There is no second
// heartbeat mock, so if it ticks again it'll throw an error.
jest.advanceTimersByTime(5000);

expect(logger.orderOfOperations).toMatchInlineSnapshot(`
[
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionCallback[1234-cats]: Sending \`check\` request to router",
"SubscriptionCallback[1234-cats]: \`check\` request successful",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]",
"TESTING: Triggering first update",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router with subscription update",
"TESTING: Triggering second update",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500",
"SubscriptionManager[1234-cats]: \`next\` request successful",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router with subscription update",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500",
"SubscriptionManager[1234-cats]: \`next\` request successful",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});

it('terminates subscription after max retries `next` requests', async () => {
const server = await startSubscriptionServer({ logger });

// Mock the initial check response from the router.
mockRouterCheckResponse();

// Start the subscription; this triggers the initial check request and
// starts the heartbeat interval. This simulates an incoming subscription
// request from the router.
const result = await server.executeOperation({
query: `#graphql
subscription {
count
}
`,
extensions: {
subscription: {
callback_url: 'http://mock-router-url.com',
subscription_id: '1234-cats',
verifier: 'my-verifier-token',
},
},
});

expect(result.http.status).toEqual(200);
expect(result.http.headers.get('subscription-protocol')).toEqual(
'callback',
);

// Mock the heartbeat response from the router. We'll trigger it once below
// after the subscription is initialized to make sure it works.
const firstHeartbeat = mockRouterHeartbeatResponse();
// Advance timers to trigger the heartbeat once. This consumes the one
// heartbeat mock from above.
jest.advanceTimersByTime(5000);
await firstHeartbeat;

// TODO
const updates = Promise.all(
[...new Array(5)].map(() =>
mockRouterNextResponse({ payload: { count: 1 }, responseCode: 500 }),
),
);

// Trigger a couple updates. These send `next` requests to the router.
logger.debug('TESTING: Triggering first update');
await server.executeOperation({
query: `#graphql
mutation {
addOne
}
`,
});

// After 5 failures, the plugin will terminate the subscriptions and send
// a `complete` request to the router.
const completeRequest = mockRouterCompleteResponse({
errors: [
{ message: '`next` request failed, terminating subscription' },
],
});

await updates;
await completeRequest;
await server.stop();

// The heartbeat should be cleaned up at this point. There is no second
// heartbeat mock, so if it ticks again it'll throw an error.
jest.advanceTimersByTime(5000);

expect(logger.orderOfOperations).toMatchInlineSnapshot(`
[
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionCallback[1234-cats]: Sending \`check\` request to router",
"SubscriptionCallback[1234-cats]: \`check\` request successful",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat received response for IDs: [1234-cats]",
"SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]",
"TESTING: Triggering first update",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router with subscription update",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 4) due to error: \`next\` request failed with unexpected status code: 500",
"WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 5) due to error: \`next\` request failed with unexpected status code: 500",
"ERROR: SubscriptionManager[1234-cats]: \`next\` request failed, terminating subscription: Error: \`next\` request failed with unexpected status code: 500",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]",
"SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
`);
});
});
});

Expand All @@ -1082,9 +1300,16 @@ async function startSubscriptionServer(
const pubsub = new PubSub();
const server = new ApolloServer({
plugins: [
ApolloServerPluginSubscriptionCallback(
opts?.logger ? { logger: opts.logger } : undefined,
),
ApolloServerPluginSubscriptionCallback({
// set some reasonable testing defaults
retry: {
retries: 5,
factor: 1,
maxTimeout: 100,
minTimeout: 100,
},
...(opts?.logger ? { logger: opts.logger } : undefined),
}),
],
typeDefs: `#graphql
type Query {
Expand Down Expand Up @@ -1224,12 +1449,14 @@ function mockRouterNextResponse(requestOpts: {
url?: string;
id?: string;
verifier?: string;
responseCode?: number;
}) {
const {
payload,
url = 'http://mock-router-url.com',
id = '1234-cats',
verifier = 'my-verifier-token',
responseCode = 200,
} = requestOpts;

return promisifyNock(
Expand All @@ -1242,7 +1469,7 @@ function mockRouterNextResponse(requestOpts: {
verifier,
payload: { data: payload },
})
.reply(200),
.reply(responseCode),
);
}

Expand Down Expand Up @@ -1283,7 +1510,9 @@ function orderOfOperationsLogger() {
this.orderOfOperations.push(msg);
},
info() {},
warn() {},
warn(msg: string) {
this.orderOfOperations.push(`WARN: ${msg}`);
},
error(msg: string) {
this.orderOfOperations.push(`ERROR: ${msg}`);
},
Expand Down

0 comments on commit 0cdceda

Please sign in to comment.