Skip to content

Commit c4fa349

Browse files
authoredMar 19, 2025··
Add purge queue command (#8353)
* Add purge queue command * fixup! Add purge queue command
1 parent 31332ca commit c4fa349

File tree

5 files changed

+268
-0
lines changed

5 files changed

+268
-0
lines changed
 

‎.changeset/lemon-areas-tap.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"wrangler": minor
3+
---
4+
5+
Add new command to purge a Queue
6+
7+
This new command can be used to delete all existing messages in a Queue

‎packages/wrangler/src/__tests__/queues.test.ts

+180
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { http, HttpResponse } from "msw";
22
import { mockAccountId, mockApiToken } from "./helpers/mock-account-id";
33
import { mockConsoleMethods } from "./helpers/mock-console";
4+
import { mockPrompt } from "./helpers/mock-dialogs";
5+
import { useMockIsTTY } from "./helpers/mock-istty";
46
import { msw } from "./helpers/msw";
57
import { runInTempDir } from "./helpers/run-in-tmp";
68
import { runWrangler } from "./helpers/run-wrangler";
@@ -35,6 +37,7 @@ describe("wrangler", () => {
3537
wrangler queues consumer Configure Queue consumers
3638
wrangler queues pause-delivery <name> Pause message delivery for a Queue
3739
wrangler queues resume-delivery <name> Resume message delivery for a Queue
40+
wrangler queues purge <name> Purge messages from a Queue
3841
3942
GLOBAL FLAGS
4043
-c, --config Path to Wrangler configuration file [string]
@@ -2066,4 +2069,181 @@ describe("wrangler", () => {
20662069
`);
20672070
});
20682071
});
2072+
2073+
describe("purge", () => {
2074+
const { setIsTTY } = useMockIsTTY();
2075+
beforeEach(() => {
2076+
setIsTTY(false);
2077+
});
2078+
2079+
function mockPurgeRequest() {
2080+
const requests = { count: 0 };
2081+
2082+
msw.use(
2083+
http.post(
2084+
"*/accounts/:accountId/queues/:queueId/purge",
2085+
async ({ request }) => {
2086+
requests.count += 1;
2087+
2088+
const body = (await request.json()) as {
2089+
delete_messages_permanently: boolean;
2090+
};
2091+
expect(body.delete_messages_permanently).toEqual(true);
2092+
return HttpResponse.json({
2093+
success: true,
2094+
errors: [],
2095+
messages: [],
2096+
result: {
2097+
started_on: "01-01-2001",
2098+
complete: false,
2099+
},
2100+
});
2101+
},
2102+
{ once: true }
2103+
)
2104+
);
2105+
return requests;
2106+
}
2107+
function mockGetQueueRequest(queueName: string) {
2108+
const requests = { count: 0 };
2109+
msw.use(
2110+
http.get(
2111+
"*/accounts/:accountId/queues?*",
2112+
async () => {
2113+
requests.count += 1;
2114+
return HttpResponse.json({
2115+
success: true,
2116+
errors: [],
2117+
messages: [],
2118+
result: [
2119+
{
2120+
queue_name: queueName,
2121+
created_on: "",
2122+
producers: [],
2123+
consumers: [],
2124+
producers_total_count: 1,
2125+
consumers_total_count: 0,
2126+
modified_on: "",
2127+
queue_id: "queueId",
2128+
},
2129+
],
2130+
});
2131+
},
2132+
{ once: true }
2133+
)
2134+
);
2135+
return requests;
2136+
}
2137+
2138+
it("should show the correct help text", async () => {
2139+
await runWrangler("queues purge --help");
2140+
expect(std.err).toMatchInlineSnapshot(`""`);
2141+
expect(std.out).toMatchInlineSnapshot(`
2142+
"wrangler queues purge <name>
2143+
2144+
Purge messages from a Queue
2145+
2146+
POSITIONALS
2147+
name The name of the queue [string] [required]
2148+
2149+
GLOBAL FLAGS
2150+
-c, --config Path to Wrangler configuration file [string]
2151+
--cwd Run as if Wrangler was started in the specified directory instead of the current working directory [string]
2152+
-e, --env Environment to use for operations, and for selecting .env and .dev.vars files [string]
2153+
-h, --help Show help [boolean]
2154+
-v, --version Show version number [boolean]
2155+
2156+
OPTIONS
2157+
--force Skip the confirmation dialog and forcefully purge the Queue [boolean]"
2158+
`);
2159+
});
2160+
2161+
it("rejects a missing --force flag in non-interactive mode", async () => {
2162+
const getrequests = mockGetQueueRequest("testQueue");
2163+
const requests = mockPurgeRequest();
2164+
2165+
await expect(
2166+
runWrangler("queues purge testQueue")
2167+
).rejects.toThrowErrorMatchingInlineSnapshot(
2168+
`[Error: The --force flag is required to purge a Queue in non-interactive mode]`
2169+
);
2170+
2171+
expect(requests.count).toEqual(0);
2172+
expect(getrequests.count).toEqual(0);
2173+
2174+
expect(std.out).toMatchInlineSnapshot(`""`);
2175+
});
2176+
2177+
it("allows purge with the --force flag in non-interactive mode", async () => {
2178+
const getrequests = mockGetQueueRequest("testQueue");
2179+
const requests = mockPurgeRequest();
2180+
2181+
await runWrangler("queues purge testQueue --force");
2182+
2183+
expect(requests.count).toEqual(1);
2184+
expect(getrequests.count).toEqual(1);
2185+
2186+
expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
2187+
});
2188+
2189+
it("allows purge with the --force flag in non-interactive mode", async () => {
2190+
const getrequests = mockGetQueueRequest("testQueue");
2191+
const requests = mockPurgeRequest();
2192+
2193+
await runWrangler("queues purge testQueue --force");
2194+
2195+
expect(requests.count).toEqual(1);
2196+
expect(getrequests.count).toEqual(1);
2197+
2198+
expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
2199+
});
2200+
2201+
it("allows purge with the --force flag in interactive mode", async () => {
2202+
setIsTTY(true);
2203+
const getrequests = mockGetQueueRequest("testQueue");
2204+
const requests = mockPurgeRequest();
2205+
await runWrangler("queues purge testQueue --force");
2206+
2207+
expect(requests.count).toEqual(1);
2208+
expect(getrequests.count).toEqual(1);
2209+
2210+
expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
2211+
});
2212+
2213+
it("rejects invalid confirmation in interactive mode", async () => {
2214+
setIsTTY(true);
2215+
const getrequests = mockGetQueueRequest("testQueue");
2216+
const requests = mockPurgeRequest();
2217+
mockPrompt({
2218+
text: "This operation will permanently delete all the messages in Queue testQueue. Type testQueue to proceed.",
2219+
result: "wrong-name",
2220+
});
2221+
await expect(
2222+
runWrangler("queues purge testQueue")
2223+
).rejects.toThrowErrorMatchingInlineSnapshot(
2224+
`[Error: Incorrect queue name provided. Skipping purge operation]`
2225+
);
2226+
2227+
expect(requests.count).toEqual(0);
2228+
expect(getrequests.count).toEqual(0);
2229+
2230+
expect(std.out).toMatchInlineSnapshot(`""`);
2231+
});
2232+
2233+
it("allows purge with correct confirmation in interactive mode", async () => {
2234+
setIsTTY(true);
2235+
const getrequests = mockGetQueueRequest("testQueue");
2236+
const requests = mockPurgeRequest();
2237+
mockPrompt({
2238+
text: "This operation will permanently delete all the messages in Queue testQueue. Type testQueue to proceed.",
2239+
result: "testQueue",
2240+
});
2241+
await runWrangler("queues purge testQueue");
2242+
2243+
expect(requests.count).toEqual(1);
2244+
expect(getrequests.count).toEqual(1);
2245+
2246+
expect(std.out).toMatchInlineSnapshot(`"Purged Queue 'testQueue'"`);
2247+
});
2248+
});
20692249
});

‎packages/wrangler/src/queues/cli/commands/index.ts

+8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
options as pauseResumeOptions,
1010
resumeHandler,
1111
} from "./pause-resume";
12+
import { handler as purgeHandler, options as purgeOptions } from "./purge";
1213
import { handler as updateHandler, options as updateOptions } from "./update";
1314
import type { CommonYargsArgv } from "../../../yargs-types";
1415

@@ -65,5 +66,12 @@ export function queues(yargs: CommonYargsArgv) {
6566
resumeHandler
6667
);
6768

69+
yargs.command(
70+
"purge <name>",
71+
"Purge messages from a Queue",
72+
purgeOptions,
73+
purgeHandler
74+
);
75+
6876
yargs.fail(HandleUnauthorizedError);
6977
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { readConfig } from "../../../config";
2+
import { prompt } from "../../../dialogs";
3+
import { FatalError } from "../../../errors";
4+
import isInteractive from "../../../is-interactive";
5+
import { logger } from "../../../logger";
6+
import { purgeQueue } from "../../client";
7+
import type {
8+
CommonYargsArgv,
9+
StrictYargsOptionsToInterface,
10+
} from "../../../yargs-types";
11+
12+
export function options(yargs: CommonYargsArgv) {
13+
return yargs
14+
.positional("name", {
15+
type: "string",
16+
demandOption: true,
17+
description: "The name of the queue",
18+
})
19+
.option("force", {
20+
describe: "Skip the confirmation dialog and forcefully purge the Queue",
21+
type: "boolean",
22+
});
23+
}
24+
25+
export async function handler(
26+
args: StrictYargsOptionsToInterface<typeof options>
27+
) {
28+
const config = readConfig(args);
29+
30+
if (!args.force && !isInteractive()) {
31+
throw new FatalError(
32+
"The --force flag is required to purge a Queue in non-interactive mode"
33+
);
34+
}
35+
36+
if (!args.force && isInteractive()) {
37+
const result = await prompt(
38+
`This operation will permanently delete all the messages in Queue ${args.name}. Type ${args.name} to proceed.`
39+
);
40+
41+
if (result !== args.name) {
42+
throw new FatalError(
43+
"Incorrect queue name provided. Skipping purge operation"
44+
);
45+
}
46+
}
47+
await purgeQueue(config, args.name);
48+
49+
logger.log(`Purged Queue '${args.name}'`);
50+
}

‎packages/wrangler/src/queues/client.ts

+23
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ export interface ConsumerSettings {
8585
retry_delay?: number;
8686
}
8787

88+
export interface PurgeQueueBody {
89+
delete_messages_permanently: boolean;
90+
}
91+
92+
export interface PurgeQueueResponse {
93+
started_at: string;
94+
complete: boolean;
95+
}
96+
8897
const queuesUrl = (accountId: string, queueId?: string): string => {
8998
let url = `/accounts/${accountId}/queues`;
9099
if (queueId) {
@@ -406,3 +415,17 @@ export async function deleteWorkerConsumer(
406415
);
407416
return deleteConsumerById(config, queue.queue_id, targetConsumer.consumer_id);
408417
}
418+
419+
export async function purgeQueue(
420+
config: Config,
421+
queueName: string
422+
): Promise<void> {
423+
const accountId = await requireAuth(config);
424+
const queue = await getQueue(config, queueName);
425+
const purgeURL = `${queuesUrl(accountId, queue.queue_id)}/purge`;
426+
const body: PurgeQueueBody = { delete_messages_permanently: true };
427+
return fetchResult(purgeURL, {
428+
method: "POST",
429+
body: JSON.stringify(body),
430+
});
431+
}

0 commit comments

Comments
 (0)
Please sign in to comment.