|
1 | 1 | import type { DurationInput } from "../Duration.js"
|
| 2 | +import * as Duration from "../Duration.js" |
2 | 3 | import * as Effect from "../Effect.js"
|
3 | 4 | import * as FiberRef from "../FiberRef.js"
|
4 | 5 | import { globalValue } from "../GlobalValue.js"
|
5 |
| -import type { RateLimiter } from "../RateLimiter.js" |
6 |
| -import type { Scope } from "../Scope.js" |
7 |
| -import * as SynchronizedRef from "../SynchronizedRef.js" |
| 6 | +import type * as RateLimiter from "../RateLimiter.js" |
| 7 | +import type * as Scope from "../Scope.js" |
8 | 8 |
|
9 | 9 | /** @internal */
|
10 |
| -const currentCost = globalValue( |
11 |
| - Symbol.for("effect/RateLimiter/currentCost"), |
12 |
| - () => FiberRef.unsafeMake(1) |
13 |
| -) |
| 10 | +export const make = ({ |
| 11 | + algorithm = "token-bucket", |
| 12 | + interval, |
| 13 | + limit |
| 14 | +}: RateLimiter.RateLimiter.Options): Effect.Effect< |
| 15 | + RateLimiter.RateLimiter, |
| 16 | + never, |
| 17 | + Scope.Scope |
| 18 | +> => { |
| 19 | + switch (algorithm) { |
| 20 | + case "fixed-window": { |
| 21 | + return fixedWindow(limit, interval) |
| 22 | + } |
| 23 | + case "token-bucket": { |
| 24 | + return tokenBucket(limit, interval) |
| 25 | + } |
| 26 | + } |
| 27 | +} |
14 | 28 |
|
15 |
| -/** @internal */ |
16 |
| -export const make = (limit: number, window: DurationInput): Effect.Effect< |
17 |
| - RateLimiter, |
| 29 | +const tokenBucket = (limit: number, window: DurationInput): Effect.Effect< |
| 30 | + RateLimiter.RateLimiter, |
18 | 31 | never,
|
19 |
| - Scope |
| 32 | + Scope.Scope |
20 | 33 | > =>
|
21 | 34 | Effect.gen(function*(_) {
|
22 |
| - const scope = yield* _(Effect.scope) |
| 35 | + const millisPerToken = Math.ceil(Duration.toMillis(window) / limit) |
23 | 36 | const semaphore = yield* _(Effect.makeSemaphore(limit))
|
24 |
| - const ref = yield* _(SynchronizedRef.make(false)) |
25 |
| - const reset = SynchronizedRef.updateEffect( |
26 |
| - ref, |
27 |
| - (running) => |
28 |
| - running ? Effect.succeed(true) : Effect.sleep(window).pipe( |
29 |
| - Effect.zipRight(SynchronizedRef.set(ref, false)), |
30 |
| - Effect.zipRight(semaphore.releaseAll), |
31 |
| - Effect.forkIn(scope), |
32 |
| - Effect.interruptible, |
33 |
| - Effect.as(true) |
34 |
| - ) |
| 37 | + const latch = yield* _(Effect.makeSemaphore(0)) |
| 38 | + const refill: Effect.Effect<void> = Effect.sleep(millisPerToken).pipe( |
| 39 | + Effect.zipRight(latch.releaseAll), |
| 40 | + Effect.zipRight(semaphore.release(1)), |
| 41 | + Effect.flatMap((free) => free === limit ? Effect.unit : refill) |
35 | 42 | )
|
| 43 | + yield* _( |
| 44 | + latch.take(1), |
| 45 | + Effect.zipRight(refill), |
| 46 | + Effect.forever, |
| 47 | + Effect.forkScoped, |
| 48 | + Effect.interruptible |
| 49 | + ) |
| 50 | + const take = Effect.uninterruptibleMask((restore) => |
| 51 | + Effect.flatMap( |
| 52 | + FiberRef.get(currentCost), |
| 53 | + (cost) => Effect.zipRight(restore(semaphore.take(cost)), latch.release(1)) |
| 54 | + ) |
| 55 | + ) |
| 56 | + return (effect) => Effect.zipRight(take, effect) |
| 57 | + }) |
36 | 58 |
|
37 |
| - const cost = FiberRef.get(currentCost).pipe(Effect.flatMap(semaphore.take)) |
38 |
| - const take = Effect.zipRight(cost, reset) |
39 |
| - |
| 59 | +const fixedWindow = (limit: number, window: DurationInput): Effect.Effect< |
| 60 | + RateLimiter.RateLimiter, |
| 61 | + never, |
| 62 | + Scope.Scope |
| 63 | +> => |
| 64 | + Effect.gen(function*(_) { |
| 65 | + const semaphore = yield* _(Effect.makeSemaphore(limit)) |
| 66 | + const latch = yield* _(Effect.makeSemaphore(0)) |
| 67 | + yield* _( |
| 68 | + latch.take(1), |
| 69 | + Effect.zipRight(Effect.sleep(window)), |
| 70 | + Effect.zipRight(latch.releaseAll), |
| 71 | + Effect.zipRight(semaphore.releaseAll), |
| 72 | + Effect.forever, |
| 73 | + Effect.forkScoped, |
| 74 | + Effect.interruptible |
| 75 | + ) |
| 76 | + const take = Effect.uninterruptibleMask((restore) => |
| 77 | + Effect.flatMap( |
| 78 | + FiberRef.get(currentCost), |
| 79 | + (cost) => Effect.zipRight(restore(semaphore.take(cost)), latch.release(1)) |
| 80 | + ) |
| 81 | + ) |
40 | 82 | return (effect) => Effect.zipRight(take, effect)
|
41 | 83 | })
|
42 | 84 |
|
| 85 | +/** @internal */ |
| 86 | +const currentCost = globalValue( |
| 87 | + Symbol.for("effect/RateLimiter/currentCost"), |
| 88 | + () => FiberRef.unsafeMake(1) |
| 89 | +) |
| 90 | + |
43 | 91 | /** @internal */
|
44 | 92 | export const withCost = (cost: number) => Effect.locally(currentCost, cost)
|
0 commit comments