Skip to content

Commit 7c8ae1c

Browse files
cmackenzie1emily-shen
andauthoredJan 9, 2025··
feat: Use OAuth flow to generate R2 tokens for Pipelines (#7534)
* feat: Use OAuth flow to generate R2 tokens for Pipelines This commit changes the generateR2Tokens flow which will direct the user to the web browser to perform a OAuth flow to grant the Workers Pipelines client the ability to generate R2 tokens on behalf of the user. This will only run if the user does not provide the credentials as CLI parameters. Due to requiring user interactivity, and reliance on the callbacks, there is no easy way to support a "headless" mode for `wrangler pipelines create` (or `update`) unless the user provides the tokens as arguments. The same applies for testing this flow, which can only be done manually at this time. * fix: add forced delayed to allow r2 tokens time to sync Create odd-ducks-attack.md * Add docs around bespoke OAuth solution * fix: Wait for R2 token to sync After creating an R2 token, there is a slight delay before if can be used. Previously we would sleep for some amount of time, but this method is really sensitive to latency. Instead, use the S3 SDK and try using the token until we exhaust all attempts, or we finally succeed in using it. Each failure results in a constant backoff of 1 second. This commit does add the dependency `@aws-sdk/client-s3`. * fix pnpm-lock.yaml * fix: clear timeout if token retrieved successfully This uses the promise based version of `setTimeout` from NodeJS and registers the AbortController to handle cancellation signal. The http server `.close()` method is also registered to the abort controller for cleanup as `controller.abort()` is always called before returning the result. --------- Co-authored-by: emily-shen <69125074+emily-shen@users.noreply.github.com>
1 parent b8e5f63 commit 7c8ae1c

File tree

6 files changed

+1378
-140
lines changed

6 files changed

+1378
-140
lines changed
 

‎.changeset/odd-ducks-attack.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"wrangler": patch
3+
---
4+
5+
feat: Use OAuth flow to generate R2 tokens for Pipelines

‎packages/wrangler/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"type:tests": "tsc -p ./src/__tests__/tsconfig.json && tsc -p ./e2e/tsconfig.json"
7070
},
7171
"dependencies": {
72+
"@aws-sdk/client-s3": "^3.721.0",
7273
"@cloudflare/kv-asset-handler": "workspace:*",
7374
"@esbuild-plugins/node-globals-polyfill": "^0.2.3",
7475
"@esbuild-plugins/node-modules-polyfill": "^0.2.2",

‎packages/wrangler/src/__tests__/pipelines.test.ts

+7-85
Original file line numberDiff line numberDiff line change
@@ -46,76 +46,7 @@ describe("pipelines", () => {
4646
endpoint: "https://0001.pipelines.cloudflarestorage.com",
4747
} satisfies Pipeline;
4848

49-
function mockCreateR2Token(bucket: string) {
50-
const requests = { count: 0 };
51-
msw.use(
52-
http.get(
53-
"*/accounts/:accountId/r2/buckets/:bucket",
54-
async ({ params }) => {
55-
expect(params.accountId).toEqual("some-account-id");
56-
expect(params.bucket).toEqual(bucket);
57-
requests.count++;
58-
return HttpResponse.json(
59-
{
60-
success: true,
61-
errors: [],
62-
messages: [],
63-
result: null,
64-
},
65-
{ status: 200 }
66-
);
67-
},
68-
{ once: true }
69-
),
70-
http.get(
71-
"*/user/tokens/permission_groups",
72-
async () => {
73-
requests.count++;
74-
return HttpResponse.json(
75-
{
76-
success: true,
77-
errors: [],
78-
messages: [],
79-
result: [
80-
{
81-
id: "2efd5506f9c8494dacb1fa10a3e7d5b6",
82-
name: "Workers R2 Storage Bucket Item Write",
83-
description:
84-
"Grants write access to Cloudflare R2 Bucket Scoped Storage",
85-
scopes: ["com.cloudflare.edge.r2.bucket"],
86-
},
87-
],
88-
},
89-
{ status: 200 }
90-
);
91-
},
92-
{ once: true }
93-
),
94-
http.post(
95-
"*/user/tokens",
96-
async () => {
97-
requests.count++;
98-
return HttpResponse.json(
99-
{
100-
success: true,
101-
errors: [],
102-
messages: [],
103-
result: {
104-
id: "service-token-id",
105-
name: "my-service-token",
106-
value: "my-secret-value",
107-
},
108-
},
109-
{ status: 200 }
110-
);
111-
},
112-
{ once: true }
113-
)
114-
);
115-
return requests;
116-
}
117-
118-
function mockCreeatR2TokenFailure(bucket: string) {
49+
function mockCreateR2TokenFailure(bucket: string) {
11950
const requests = { count: 0 };
12051
msw.use(
12152
http.get(
@@ -310,6 +241,7 @@ describe("pipelines", () => {
310241
);
311242
return requests;
312243
}
244+
313245
beforeAll(() => {
314246
__testSkipDelays();
315247
});
@@ -380,15 +312,6 @@ describe("pipelines", () => {
380312
`);
381313
});
382314

383-
it("should create a pipeline", async () => {
384-
const tokenReq = mockCreateR2Token("test-bucket");
385-
const requests = mockCreateRequest("my-pipeline");
386-
await runWrangler("pipelines create my-pipeline --r2 test-bucket");
387-
388-
expect(tokenReq.count).toEqual(3);
389-
expect(requests.count).toEqual(1);
390-
});
391-
392315
it("should create a pipeline with explicit credentials", async () => {
393316
const requests = mockCreateRequest("my-pipeline");
394317
await runWrangler(
@@ -398,7 +321,7 @@ describe("pipelines", () => {
398321
});
399322

400323
it("should fail a missing bucket", async () => {
401-
const requests = mockCreeatR2TokenFailure("bad-bucket");
324+
const requests = mockCreateR2TokenFailure("bad-bucket");
402325
await expect(
403326
runWrangler("pipelines create bad-pipeline --r2 bad-bucket")
404327
).rejects.toThrowError();
@@ -540,22 +463,21 @@ describe("pipelines", () => {
540463

541464
it("should update a pipeline with new bucket", async () => {
542465
const pipeline: Pipeline = samplePipeline;
543-
const tokenReq = mockCreateR2Token("new-bucket");
544466
mockShowRequest(pipeline.name, pipeline);
545467

546468
const update = JSON.parse(JSON.stringify(pipeline));
547469
update.destination.path.bucket = "new_bucket";
548470
update.destination.credentials = {
549471
endpoint: "https://some-account-id.r2.cloudflarestorage.com",
550472
access_key_id: "service-token-id",
551-
secret_access_key:
552-
"be22cbae9c1585c7b61a92fdb75afd10babd535fb9b317f90ac9a9ca896d02d7",
473+
secret_access_key: "my-secret-access-key",
553474
};
554475
const updateReq = mockUpdateRequest(update.name, update);
555476

556-
await runWrangler("pipelines update my-pipeline --r2 new-bucket");
477+
await runWrangler(
478+
"pipelines update my-pipeline --r2 new-bucket --access-key-id service-token-id --secret-access-key my-secret-access-key"
479+
);
557480

558-
expect(tokenReq.count).toEqual(3);
559481
expect(updateReq.count).toEqual(1);
560482
});
561483

‎packages/wrangler/src/pipelines/client.ts

+98-33
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
import assert from "node:assert";
12
import { createHash } from "node:crypto";
3+
import http from "node:http";
4+
import { setTimeout as setTimeoutPromise } from "node:timers/promises";
25
import { fetchResult } from "../cfetch";
6+
import { getCloudflareApiEnvironmentFromEnv } from "../environment-variables/misc-variables";
7+
import { UserError } from "../errors";
8+
import { logger } from "../logger";
9+
import openInBrowser from "../open-in-browser";
310
import type { R2BucketInfo } from "../r2/helpers";
411

512
// ensure this is in sync with:
@@ -96,44 +103,102 @@ export type PermissionGroup = {
96103
scopes: string[];
97104
};
98105

99-
// Generate a Service Token to write to R2 for a pipeline
106+
interface S3AccessKey {
107+
accessKeyId: string;
108+
secretAccessKey: string;
109+
}
110+
111+
/**
112+
* Generate an R2 service token for the given account ID, bucket name, and pipeline name.
113+
*
114+
* This function kicks off its own OAuth process using the Workers Pipelines OAuth client requesting the scope
115+
* `pipelines:setup`. Once the user confirms, our OAuth callback endpoint will validate the request, exchange the
116+
* authorization code and return a bucket-scoped R2 token.
117+
*
118+
* This OAuth flow is distinct from the one used in `wrangler login` to ensure these tokens are generated server-side
119+
* and that only the tokens of concern are returned to the user.
120+
* @param accountId
121+
* @param bucketName
122+
* @param pipelineName
123+
*/
100124
export async function generateR2ServiceToken(
101-
label: string,
102125
accountId: string,
103-
bucket: string
104-
): Promise<ServiceToken> {
105-
const res = await fetchResult<PermissionGroup[]>(
106-
`/user/tokens/permission_groups`,
107-
{
108-
method: "GET",
109-
}
110-
);
111-
const perm = res.find(
112-
(g) => g.name == "Workers R2 Storage Bucket Item Write"
113-
);
114-
if (!perm) {
115-
throw new Error("Missing R2 Permissions");
116-
}
126+
bucketName: string,
127+
pipelineName: string
128+
): Promise<S3AccessKey> {
129+
// TODO: Refactor into startHttpServerWithTimeout function and update `getOauthToken`
130+
const controller = new AbortController();
131+
const signal = controller.signal;
117132

118-
// generate specific bucket write token for pipeline
119-
const body = JSON.stringify({
120-
policies: [
121-
{
122-
effect: "allow",
123-
permission_groups: [{ id: perm.id }],
124-
resources: {
125-
[`com.cloudflare.edge.r2.bucket.${accountId}_default_${bucket}`]: "*",
126-
},
127-
},
128-
],
129-
name: label,
130-
});
133+
// Create timeout promise to prevent hanging forever
134+
const timeoutPromise = setTimeoutPromise(120000, "timeout", { signal });
131135

132-
return await fetchResult<ServiceToken>(`/user/tokens`, {
133-
method: "POST",
134-
headers: API_HEADERS,
135-
body,
136+
// Create server promise to handle the callback and register the cleanup handler on the controller
137+
const serverPromise = new Promise<S3AccessKey>((resolve, reject) => {
138+
const server = http.createServer(async (request, response) => {
139+
assert(request.url, "This request doesn't have a URL"); // This should never happen
140+
141+
if (request.method !== "GET") {
142+
response.writeHead(405);
143+
response.end("Method not allowed.");
144+
return;
145+
}
146+
147+
const { pathname, searchParams } = new URL(
148+
request.url,
149+
`http://${request.headers.host}`
150+
);
151+
152+
if (pathname !== "/") {
153+
response.writeHead(404);
154+
response.end("Not found.");
155+
return;
156+
}
157+
158+
// Retrieve values from the URL parameters
159+
const accessKeyId = searchParams.get("access-key-id");
160+
const secretAccessKey = searchParams.get("secret-access-key");
161+
162+
if (!accessKeyId || !secretAccessKey) {
163+
reject(new UserError("Missing required URL parameters"));
164+
return;
165+
}
166+
167+
resolve({ accessKeyId, secretAccessKey } as S3AccessKey);
168+
// Do a final redirect to "clear" the URL of the sensitive URL parameters that were returned.
169+
response.writeHead(307, {
170+
Location:
171+
"https://welcome.developers.workers.dev/wrangler-oauth-consent-granted",
172+
});
173+
response.end();
174+
});
175+
176+
// Register cleanup handler
177+
signal.addEventListener("abort", () => {
178+
server.close();
179+
});
180+
server.listen(8976, "localhost");
136181
});
182+
183+
const env = getCloudflareApiEnvironmentFromEnv();
184+
const oauthDomain =
185+
env === "staging"
186+
? "oauth.pipelines-staging.cloudflare.com"
187+
: "oauth.pipelines.cloudflare.com";
188+
189+
const urlToOpen = `https://${oauthDomain}/oauth/login?accountId=${accountId}&bucketName=${bucketName}&pipelineName=${pipelineName}`;
190+
logger.log(`Opening a link in your default browser: ${urlToOpen}`);
191+
await openInBrowser(urlToOpen);
192+
193+
const result = await Promise.race([timeoutPromise, serverPromise]);
194+
controller.abort();
195+
if (result === "timeout") {
196+
throw new UserError(
197+
"Timed out waiting for authorization code, please try again."
198+
);
199+
}
200+
201+
return result as S3AccessKey;
137202
}
138203

139204
// Get R2 bucket information from v4 API

‎packages/wrangler/src/pipelines/index.ts

+37-22
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1+
import { HeadBucketCommand, S3Client } from "@aws-sdk/client-s3";
12
import { readConfig } from "../config";
2-
import { sleep } from "../deploy/deploy";
33
import { FatalError, UserError } from "../errors";
44
import { printWranglerBanner } from "../index";
55
import { logger } from "../logger";
66
import * as metrics from "../metrics";
77
import { APIError } from "../parse";
88
import { requireAuth } from "../user";
9+
import { retryOnError } from "../utils/retry";
910
import {
1011
createPipeline,
1112
deletePipeline,
1213
generateR2ServiceToken,
1314
getPipeline,
1415
getR2Bucket,
1516
listPipelines,
16-
sha256,
1717
updatePipeline,
1818
} from "./client";
1919
import type { CommonYargsArgv, CommonYargsOptions } from "../yargs-types";
@@ -29,38 +29,53 @@ import type { Argv } from "yargs";
2929
let __testSkipDelaysFlag = false;
3030

3131
async function authorizeR2Bucket(
32-
name: string,
32+
pipelineName: string,
3333
accountId: string,
34-
bucket: string
34+
bucketName: string
3535
) {
3636
try {
37-
await getR2Bucket(accountId, bucket);
37+
await getR2Bucket(accountId, bucketName);
3838
} catch (err) {
3939
if (err instanceof APIError) {
4040
if (err.code == 10006) {
41-
throw new FatalError(`The R2 bucket [${bucket}] doesn't exist`);
41+
throw new FatalError(`The R2 bucket [${bucketName}] doesn't exist`);
4242
}
4343
}
4444
throw err;
4545
}
4646

47-
logger.log(`🌀 Authorizing R2 bucket "${bucket}"`);
47+
logger.log(`🌀 Authorizing R2 bucket "${bucketName}"`);
4848

4949
const serviceToken = await generateR2ServiceToken(
50-
`Service token for Pipeline ${name}`,
5150
accountId,
52-
bucket
51+
bucketName,
52+
pipelineName
5353
);
54-
const access_key_id = serviceToken.id;
55-
const secret_access_key = sha256(serviceToken.value);
5654

57-
// wait for token to settle/propagate
58-
!__testSkipDelaysFlag && (await sleep(3000));
55+
const r2 = new S3Client({
56+
region: "auto",
57+
credentials: {
58+
accessKeyId: serviceToken.accessKeyId,
59+
secretAccessKey: serviceToken.secretAccessKey,
60+
},
61+
endpoint: getAccountR2Endpoint(accountId),
62+
});
63+
64+
// Wait for token to settle/propagate, retry up to 10 times, with 1s waits in-between errors
65+
!__testSkipDelaysFlag &&
66+
(await retryOnError(
67+
async () => {
68+
await r2.send(
69+
new HeadBucketCommand({
70+
Bucket: bucketName,
71+
})
72+
);
73+
},
74+
1000,
75+
10
76+
));
5977

60-
return {
61-
secret_access_key,
62-
access_key_id,
63-
};
78+
return serviceToken;
6479
}
6580

6681
function getAccountR2Endpoint(accountId: string) {
@@ -242,8 +257,8 @@ export function pipelines(pipelineYargs: CommonYargsArgv) {
242257
accountId,
243258
pipelineConfig.destination.path.bucket
244259
);
245-
destination.credentials.access_key_id = auth.access_key_id;
246-
destination.credentials.secret_access_key = auth.secret_access_key;
260+
destination.credentials.access_key_id = auth.accessKeyId;
261+
destination.credentials.secret_access_key = auth.secretAccessKey;
247262
}
248263

249264
if (!destination.credentials.access_key_id) {
@@ -418,8 +433,8 @@ export function pipelines(pipelineYargs: CommonYargsArgv) {
418433
accountId,
419434
destination.path.bucket
420435
);
421-
destination.credentials.access_key_id = auth.access_key_id;
422-
destination.credentials.secret_access_key = auth.secret_access_key;
436+
destination.credentials.access_key_id = auth.accessKeyId;
437+
destination.credentials.secret_access_key = auth.secretAccessKey;
423438
}
424439
if (!destination.credentials.access_key_id) {
425440
throw new FatalError("Requires a r2 access key id");
@@ -466,7 +481,7 @@ export function pipelines(pipelineYargs: CommonYargsArgv) {
466481
args.authentication !== undefined
467482
? // if auth specified, use it
468483
args.authentication
469-
: // if auth not specified, use previos value or default(false)
484+
: // if auth not specified, use previous value or default(false)
470485
source?.authentication,
471486
} satisfies HttpSource);
472487
}

‎pnpm-lock.yaml

+1,230
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)
Please sign in to comment.