Skip to content

Commit f01e7db

Browse files
tim-smartTim Smart
and
Tim Smart
committedJul 10, 2024
add PubSub replay option (#3135)
Co-authored-by: Tim Smart <tim.smart@arisechurch.com>
1 parent ac71f37 commit f01e7db

File tree

4 files changed

+357
-63
lines changed

4 files changed

+357
-63
lines changed
 

‎.changeset/seven-ghosts-move.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add `replay` option to PubSub constructors
6+
7+
This option adds a replay buffer in front of the given PubSub. The buffer will
8+
replay the last `n` messages to any new subscriber.
9+
10+
```ts
11+
Effect.gen(function*() {
12+
const messages = [1, 2, 3, 4, 5]
13+
const pubsub = yield* PubSub.bounded<number>({ capacity: 16, replay: 3 })
14+
yield* PubSub.publishAll(pubsub, messages)
15+
const sub = yield* PubSub.subscribe(pubsub)
16+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [3, 4, 5])
17+
}))
18+
```

‎packages/effect/src/PubSub.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ export interface PubSub<in out A> extends Queue.Enqueue<A>, Pipeable {
4646
* @since 2.0.0
4747
* @category constructors
4848
*/
49-
export const bounded: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>> = internal.bounded
49+
export const bounded: <A>(
50+
capacity: number | { readonly capacity: number; readonly replay?: number | undefined }
51+
) => Effect.Effect<PubSub<A>> = internal.bounded
5052

5153
/**
5254
* Creates a bounded `PubSub` with the dropping strategy. The `PubSub` will drop new
@@ -57,7 +59,9 @@ export const bounded: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>>
5759
* @since 2.0.0
5860
* @category constructors
5961
*/
60-
export const dropping: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>> = internal.dropping
62+
export const dropping: <A>(
63+
capacity: number | { readonly capacity: number; readonly replay?: number | undefined }
64+
) => Effect.Effect<PubSub<A>> = internal.dropping
6165

6266
/**
6367
* Creates a bounded `PubSub` with the sliding strategy. The `PubSub` will add new
@@ -68,15 +72,18 @@ export const dropping: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>
6872
* @since 2.0.0
6973
* @category constructors
7074
*/
71-
export const sliding: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>> = internal.sliding
75+
export const sliding: <A>(
76+
capacity: number | { readonly capacity: number; readonly replay?: number | undefined }
77+
) => Effect.Effect<PubSub<A>> = internal.sliding
7278

7379
/**
7480
* Creates an unbounded `PubSub`.
7581
*
7682
* @since 2.0.0
7783
* @category constructors
7884
*/
79-
export const unbounded: <A>() => Effect.Effect<PubSub<A>> = internal.unbounded
85+
export const unbounded: <A>(options?: { readonly replay?: number | undefined }) => Effect.Effect<PubSub<A>> =
86+
internal.unbounded
8087

8188
/**
8289
* Returns the number of elements the queue can hold.

‎packages/effect/src/internal/pubsub.ts

Lines changed: 249 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export interface AtomicPubSub<in out A> {
2828
publishAll(elements: Iterable<A>): Chunk.Chunk<A>
2929
slide(): void
3030
subscribe(): Subscription<A>
31+
replayWindow(): ReplayWindow<A>
3132
}
3233

3334
/** @internal */
@@ -73,32 +74,49 @@ const removeSubscribers = <A>(
7374
}
7475

7576
/** @internal */
76-
export const bounded = <A>(requestedCapacity: number): Effect.Effect<PubSub.PubSub<A>> =>
77-
pipe(
78-
core.sync(() => makeBoundedPubSub<A>(requestedCapacity)),
79-
core.flatMap((atomicPubSub) => makePubSub(atomicPubSub, new BackPressureStrategy()))
80-
)
77+
export const bounded = <A>(
78+
capacity: number | {
79+
readonly capacity: number
80+
readonly replay?: number | undefined
81+
}
82+
): Effect.Effect<PubSub.PubSub<A>> =>
83+
core.suspend(() => {
84+
const pubsub = makeBoundedPubSub<A>(capacity)
85+
return makePubSub(pubsub, new BackPressureStrategy())
86+
})
8187

8288
/** @internal */
83-
export const dropping = <A>(requestedCapacity: number): Effect.Effect<PubSub.PubSub<A>> =>
84-
pipe(
85-
core.sync(() => makeBoundedPubSub<A>(requestedCapacity)),
86-
core.flatMap((atomicPubSub) => makePubSub(atomicPubSub, new DroppingStrategy()))
87-
)
89+
export const dropping = <A>(
90+
capacity: number | {
91+
readonly capacity: number
92+
readonly replay?: number | undefined
93+
}
94+
): Effect.Effect<PubSub.PubSub<A>> =>
95+
core.suspend(() => {
96+
const pubsub = makeBoundedPubSub<A>(capacity)
97+
return makePubSub(pubsub, new DroppingStrategy())
98+
})
8899

89100
/** @internal */
90-
export const sliding = <A>(requestedCapacity: number): Effect.Effect<PubSub.PubSub<A>> =>
91-
pipe(
92-
core.sync(() => makeBoundedPubSub<A>(requestedCapacity)),
93-
core.flatMap((atomicPubSub) => makePubSub(atomicPubSub, new SlidingStrategy()))
94-
)
101+
export const sliding = <A>(
102+
capacity: number | {
103+
readonly capacity: number
104+
readonly replay?: number | undefined
105+
}
106+
): Effect.Effect<PubSub.PubSub<A>> =>
107+
core.suspend(() => {
108+
const pubsub = makeBoundedPubSub<A>(capacity)
109+
return makePubSub(pubsub, new SlidingStrategy())
110+
})
95111

96112
/** @internal */
97-
export const unbounded = <A>(): Effect.Effect<PubSub.PubSub<A>> =>
98-
pipe(
99-
core.sync(() => makeUnboundedPubSub<A>()),
100-
core.flatMap((atomicPubSub) => makePubSub(atomicPubSub, new DroppingStrategy()))
101-
)
113+
export const unbounded = <A>(options?: {
114+
readonly replay?: number | undefined
115+
}): Effect.Effect<PubSub.PubSub<A>> =>
116+
core.suspend(() => {
117+
const pubsub = makeUnboundedPubSub<A>(options)
118+
return makePubSub(pubsub, new DroppingStrategy())
119+
})
102120

103121
/** @internal */
104122
export const capacity = <A>(self: PubSub.PubSub<A>): number => self.capacity()
@@ -138,21 +156,28 @@ export const subscribe = <A>(self: PubSub.PubSub<A>): Effect.Effect<Queue.Dequeu
138156
self.subscribe
139157

140158
/** @internal */
141-
const makeBoundedPubSub = <A>(requestedCapacity: number): AtomicPubSub<A> => {
142-
ensureCapacity(requestedCapacity)
143-
if (requestedCapacity === 1) {
144-
return new BoundedPubSubSingle()
145-
} else if (nextPow2(requestedCapacity) === requestedCapacity) {
146-
return new BoundedPubSubPow2(requestedCapacity)
159+
const makeBoundedPubSub = <A>(
160+
capacity: number | {
161+
readonly capacity: number
162+
readonly replay?: number | undefined
163+
}
164+
): AtomicPubSub<A> => {
165+
const options = typeof capacity === "number" ? { capacity } : capacity
166+
ensureCapacity(options.capacity)
167+
const replayBuffer = options.replay && options.replay > 0 ? new ReplayBuffer<A>(Math.ceil(options.replay)) : undefined
168+
if (options.capacity === 1) {
169+
return new BoundedPubSubSingle(replayBuffer)
170+
} else if (nextPow2(options.capacity) === options.capacity) {
171+
return new BoundedPubSubPow2(options.capacity, replayBuffer)
147172
} else {
148-
return new BoundedPubSubArb(requestedCapacity)
173+
return new BoundedPubSubArb(options.capacity, replayBuffer)
149174
}
150175
}
151176

152177
/** @internal */
153-
const makeUnboundedPubSub = <A>(): AtomicPubSub<A> => {
154-
return new UnboundedPubSub()
155-
}
178+
const makeUnboundedPubSub = <A>(options?: {
179+
readonly replay?: number | undefined
180+
}): AtomicPubSub<A> => new UnboundedPubSub(options?.replay ? new ReplayBuffer(options.replay) : undefined)
156181

157182
/** @internal */
158183
const makeSubscription = <A>(
@@ -180,17 +205,17 @@ export const unsafeMakeSubscription = <A>(
180205
shutdownHook: Deferred.Deferred<void>,
181206
shutdownFlag: MutableRef.MutableRef<boolean>,
182207
strategy: PubSubStrategy<A>
183-
): Queue.Dequeue<A> => {
184-
return new SubscriptionImpl(
208+
): Queue.Dequeue<A> =>
209+
new SubscriptionImpl(
185210
pubsub,
186211
subscribers,
187212
subscription,
188213
pollers,
189214
shutdownHook,
190215
shutdownFlag,
191-
strategy
216+
strategy,
217+
pubsub.replayWindow()
192218
)
193-
}
194219

195220
/** @internal */
196221
class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
@@ -200,12 +225,13 @@ class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
200225
subscriberCount = 0
201226
subscribersIndex = 0
202227

203-
readonly capacity: number
228+
constructor(readonly capacity: number, readonly replayBuffer: ReplayBuffer<A> | undefined) {
229+
this.array = Array.from({ length: capacity })
230+
this.subscribers = Array.from({ length: capacity })
231+
}
204232

205-
constructor(requestedCapacity: number) {
206-
this.array = Array.from({ length: requestedCapacity })
207-
this.subscribers = Array.from({ length: requestedCapacity })
208-
this.capacity = requestedCapacity
233+
replayWindow(): ReplayWindow<A> {
234+
return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow
209235
}
210236

211237
isEmpty(): boolean {
@@ -230,11 +256,17 @@ class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
230256
this.subscribers[index] = this.subscriberCount
231257
this.publisherIndex += 1
232258
}
259+
if (this.replayBuffer) {
260+
this.replayBuffer.offer(value)
261+
}
233262
return true
234263
}
235264

236265
publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
237266
if (this.subscriberCount === 0) {
267+
if (this.replayBuffer) {
268+
this.replayBuffer.offerAll(elements)
269+
}
238270
return Chunk.empty()
239271
}
240272
const chunk = Chunk.fromIterable(elements)
@@ -253,6 +285,9 @@ class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
253285
this.array[index] = a
254286
this.subscribers[index] = this.subscriberCount
255287
this.publisherIndex += 1
288+
if (this.replayBuffer) {
289+
this.replayBuffer.offer(a)
290+
}
256291
}
257292
return Chunk.drop(chunk, iteratorIndex)
258293
}
@@ -264,6 +299,9 @@ class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
264299
this.subscribers[index] = 0
265300
this.subscribersIndex += 1
266301
}
302+
if (this.replayBuffer) {
303+
this.replayBuffer.slide()
304+
}
267305
}
268306

269307
subscribe(): Subscription<A> {
@@ -368,13 +406,14 @@ class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> {
368406
subscriberCount = 0
369407
subscribersIndex = 0
370408

371-
readonly capacity: number
409+
constructor(readonly capacity: number, readonly replayBuffer: ReplayBuffer<A> | undefined) {
410+
this.array = Array.from({ length: capacity })
411+
this.mask = capacity - 1
412+
this.subscribers = Array.from({ length: capacity })
413+
}
372414

373-
constructor(requestedCapacity: number) {
374-
this.array = Array.from({ length: requestedCapacity })
375-
this.mask = requestedCapacity - 1
376-
this.subscribers = Array.from({ length: requestedCapacity })
377-
this.capacity = requestedCapacity
415+
replayWindow(): ReplayWindow<A> {
416+
return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow
378417
}
379418

380419
isEmpty(): boolean {
@@ -399,11 +438,17 @@ class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> {
399438
this.subscribers[index] = this.subscriberCount
400439
this.publisherIndex += 1
401440
}
441+
if (this.replayBuffer) {
442+
this.replayBuffer.offer(value)
443+
}
402444
return true
403445
}
404446

405447
publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
406448
if (this.subscriberCount === 0) {
449+
if (this.replayBuffer) {
450+
this.replayBuffer.offerAll(elements)
451+
}
407452
return Chunk.empty()
408453
}
409454
const chunk = Chunk.fromIterable(elements)
@@ -422,6 +467,9 @@ class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> {
422467
this.array[index] = elem
423468
this.subscribers[index] = this.subscriberCount
424469
this.publisherIndex += 1
470+
if (this.replayBuffer) {
471+
this.replayBuffer.offer(elem)
472+
}
425473
}
426474
return Chunk.drop(chunk, iteratorIndex)
427475
}
@@ -433,6 +481,9 @@ class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> {
433481
this.subscribers[index] = 0
434482
this.subscribersIndex += 1
435483
}
484+
if (this.replayBuffer) {
485+
this.replayBuffer.slide()
486+
}
436487
}
437488

438489
subscribe(): Subscription<A> {
@@ -536,6 +587,11 @@ class BoundedPubSubSingle<in out A> implements AtomicPubSub<A> {
536587
value: A = AbsentValue as unknown as A
537588

538589
readonly capacity = 1
590+
constructor(readonly replayBuffer: ReplayBuffer<A> | undefined) {}
591+
592+
replayWindow(): ReplayWindow<A> {
593+
return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow
594+
}
539595

540596
pipe() {
541597
return pipeArguments(this, arguments)
@@ -562,11 +618,17 @@ class BoundedPubSubSingle<in out A> implements AtomicPubSub<A> {
562618
this.subscribers = this.subscriberCount
563619
this.publisherIndex += 1
564620
}
621+
if (this.replayBuffer) {
622+
this.replayBuffer.offer(value)
623+
}
565624
return true
566625
}
567626

568627
publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
569628
if (this.subscriberCount === 0) {
629+
if (this.replayBuffer) {
630+
this.replayBuffer.offerAll(elements)
631+
}
570632
return Chunk.empty()
571633
}
572634
const chunk = Chunk.fromIterable(elements)
@@ -585,6 +647,9 @@ class BoundedPubSubSingle<in out A> implements AtomicPubSub<A> {
585647
this.subscribers = 0
586648
this.value = AbsentValue as unknown as A
587649
}
650+
if (this.replayBuffer) {
651+
this.replayBuffer.slide()
652+
}
588653
}
589654

590655
subscribe(): Subscription<A> {
@@ -673,6 +738,11 @@ class UnboundedPubSub<in out A> implements AtomicPubSub<A> {
673738
subscribersIndex = 0
674739

675740
readonly capacity = Number.MAX_SAFE_INTEGER
741+
constructor(readonly replayBuffer: ReplayBuffer<A> | undefined) {}
742+
743+
replayWindow(): ReplayWindow<A> {
744+
return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow
745+
}
676746

677747
isEmpty(): boolean {
678748
return this.publisherHead === this.publisherTail
@@ -697,6 +767,9 @@ class UnboundedPubSub<in out A> implements AtomicPubSub<A> {
697767
this.publisherTail = this.publisherTail.next
698768
this.publisherIndex += 1
699769
}
770+
if (this.replayBuffer) {
771+
this.replayBuffer.offer(value)
772+
}
700773
return true
701774
}
702775

@@ -705,6 +778,8 @@ class UnboundedPubSub<in out A> implements AtomicPubSub<A> {
705778
for (const a of elements) {
706779
this.publish(a)
707780
}
781+
} else if (this.replayBuffer) {
782+
this.replayBuffer.offerAll(elements)
708783
}
709784
return Chunk.empty()
710785
}
@@ -715,6 +790,9 @@ class UnboundedPubSub<in out A> implements AtomicPubSub<A> {
715790
this.publisherHead.value = AbsentValue
716791
this.subscribersIndex += 1
717792
}
793+
if (this.replayBuffer) {
794+
this.replayBuffer.slide()
795+
}
718796
}
719797

720798
subscribe(): Subscription<A> {
@@ -841,9 +919,9 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
841919
readonly pollers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
842920
readonly shutdownHook: Deferred.Deferred<void>,
843921
readonly shutdownFlag: MutableRef.MutableRef<boolean>,
844-
readonly strategy: PubSubStrategy<A>
845-
) {
846-
}
922+
readonly strategy: PubSubStrategy<A>,
923+
readonly replayWindow: ReplayWindow<A>
924+
) {}
847925

848926
pipe() {
849927
return pipeArguments(this, arguments)
@@ -861,19 +939,23 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
861939
return core.suspend(() =>
862940
MutableRef.get(this.shutdownFlag)
863941
? core.interrupt
864-
: core.succeed(this.subscription.size())
942+
: core.succeed(this.subscription.size() + this.replayWindow.remaining)
865943
)
866944
}
867945

868946
unsafeSize(): Option.Option<number> {
869947
if (MutableRef.get(this.shutdownFlag)) {
870948
return Option.none()
871949
}
872-
return Option.some(this.subscription.size())
950+
return Option.some(this.subscription.size() + this.replayWindow.remaining)
873951
}
874952

875953
get isFull(): Effect.Effect<boolean> {
876-
return core.map(this.size, (size) => size === this.capacity())
954+
return core.suspend(() =>
955+
MutableRef.get(this.shutdownFlag)
956+
? core.interrupt
957+
: core.succeed(this.subscription.size() === this.capacity())
958+
)
877959
}
878960

879961
get isEmpty(): Effect.Effect<boolean> {
@@ -915,6 +997,10 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
915997
if (MutableRef.get(this.shutdownFlag)) {
916998
return core.interrupt
917999
}
1000+
if (this.replayWindow.remaining > 0) {
1001+
const message = this.replayWindow.take()!
1002+
return core.succeed(message)
1003+
}
9181004
const message = MutableQueue.isEmpty(this.pollers)
9191005
? this.subscription.poll(MutableQueue.EmptyMutableQueue)
9201006
: MutableQueue.EmptyMutableQueue
@@ -950,6 +1036,9 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
9501036
? unsafePollAllSubscription(this.subscription)
9511037
: Chunk.empty()
9521038
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers)
1039+
if (this.replayWindow.remaining > 0) {
1040+
return core.succeed(Chunk.appendAll(this.replayWindow.takeAll(), as))
1041+
}
9531042
return core.succeed(as)
9541043
})
9551044
}
@@ -959,11 +1048,19 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
9591048
if (MutableRef.get(this.shutdownFlag)) {
9601049
return core.interrupt
9611050
}
1051+
let replay: Chunk.Chunk<A> | undefined = undefined
1052+
if (this.replayWindow.remaining >= max) {
1053+
const as = this.replayWindow.takeN(max)
1054+
return core.succeed(as)
1055+
} else if (this.replayWindow.remaining > 0) {
1056+
replay = this.replayWindow.takeAll()
1057+
max = max - replay.length
1058+
}
9621059
const as = MutableQueue.isEmpty(this.pollers)
9631060
? unsafePollN(this.subscription, max)
9641061
: Chunk.empty()
9651062
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers)
966-
return core.succeed(as)
1063+
return replay ? core.succeed(Chunk.appendAll(replay, as)) : core.succeed(as)
9671064
})
9681065
}
9691066

@@ -1019,8 +1116,7 @@ class PubSubImpl<in out A> implements PubSub.PubSub<A> {
10191116
readonly shutdownHook: Deferred.Deferred<void>,
10201117
readonly shutdownFlag: MutableRef.MutableRef<boolean>,
10211118
readonly strategy: PubSubStrategy<A>
1022-
) {
1023-
}
1119+
) {}
10241120

10251121
capacity(): number {
10261122
return this.pubsub.capacity
@@ -1075,7 +1171,7 @@ class PubSubImpl<in out A> implements PubSub.PubSub<A> {
10751171
return core.interrupt
10761172
}
10771173

1078-
if ((this.pubsub as AtomicPubSub<unknown>).publish(value)) {
1174+
if (this.pubsub.publish(value)) {
10791175
this.strategy.unsafeCompleteSubscribers(this.pubsub, this.subscribers)
10801176
return core.succeed(true)
10811177
}
@@ -1179,9 +1275,7 @@ export const unsafeMakePubSub = <A>(
11791275
shutdownHook: Deferred.Deferred<void>,
11801276
shutdownFlag: MutableRef.MutableRef<boolean>,
11811277
strategy: PubSubStrategy<A>
1182-
): PubSub.PubSub<A> => {
1183-
return new PubSubImpl(pubsub, subscribers, scope, shutdownHook, shutdownFlag, strategy)
1184-
}
1278+
): PubSub.PubSub<A> => new PubSubImpl(pubsub, subscribers, scope, shutdownHook, shutdownFlag, strategy)
11851279

11861280
/** @internal */
11871281
const ensureCapacity = (capacity: number): void => {
@@ -1562,3 +1656,100 @@ const unsafeStrategyCompleteSubscribers = <A>(
15621656
}
15631657
}
15641658
}
1659+
1660+
interface ReplayNode<A> {
1661+
value: A | AbsentValue
1662+
next: ReplayNode<A> | null
1663+
}
1664+
1665+
class ReplayBuffer<A> {
1666+
constructor(readonly capacity: number) {}
1667+
1668+
head: ReplayNode<A> = { value: AbsentValue, next: null }
1669+
tail: ReplayNode<A> = this.head
1670+
size = 0
1671+
index = 0
1672+
1673+
slide() {
1674+
this.index++
1675+
}
1676+
offer(a: A): void {
1677+
this.tail.value = a
1678+
this.tail.next = {
1679+
value: AbsentValue,
1680+
next: null
1681+
}
1682+
this.tail = this.tail.next
1683+
if (this.size === this.capacity) {
1684+
this.head = this.head.next!
1685+
} else {
1686+
this.size += 1
1687+
}
1688+
}
1689+
offerAll(as: Iterable<A>): void {
1690+
for (const a of as) {
1691+
this.offer(a)
1692+
}
1693+
}
1694+
}
1695+
1696+
interface ReplayWindow<A> {
1697+
take(): A | undefined
1698+
takeN(n: number): Chunk.Chunk<A>
1699+
takeAll(): Chunk.Chunk<A>
1700+
readonly remaining: number
1701+
}
1702+
1703+
class ReplayWindowImpl<A> implements ReplayWindow<A> {
1704+
head: ReplayNode<A>
1705+
index: number
1706+
remaining: number
1707+
constructor(readonly buffer: ReplayBuffer<A>) {
1708+
this.index = buffer.index
1709+
this.remaining = buffer.size
1710+
this.head = buffer.head
1711+
}
1712+
fastForward() {
1713+
while (this.index < this.buffer.index) {
1714+
this.head = this.head.next!
1715+
this.index++
1716+
}
1717+
}
1718+
take(): A | undefined {
1719+
if (this.remaining === 0) {
1720+
return undefined
1721+
} else if (this.index < this.buffer.index) {
1722+
this.fastForward()
1723+
}
1724+
this.remaining--
1725+
const value = this.head.value
1726+
this.head = this.head.next!
1727+
return value as A
1728+
}
1729+
takeN(n: number): Chunk.Chunk<A> {
1730+
if (this.remaining === 0) {
1731+
return Chunk.empty()
1732+
} else if (this.index < this.buffer.index) {
1733+
this.fastForward()
1734+
}
1735+
const len = Math.min(n, this.remaining)
1736+
const items = new Array(len)
1737+
for (let i = 0; i < len; i++) {
1738+
const value = this.head.value as A
1739+
this.head = this.head.next!
1740+
items[i] = value
1741+
}
1742+
this.remaining -= len
1743+
return Chunk.unsafeFromArray(items)
1744+
}
1745+
takeAll(): Chunk.Chunk<A> {
1746+
return this.takeN(this.remaining)
1747+
}
1748+
}
1749+
1750+
const emptyReplayWindow: ReplayWindow<never> = {
1751+
remaining: 0,
1752+
take: () => undefined,
1753+
takeN: () => Chunk.empty(),
1754+
takeAll: () => Chunk.empty()
1755+
}

‎packages/effect/test/PubSub.test.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Option } from "effect"
1+
import { Chunk, Option } from "effect"
22
import * as Array from "effect/Array"
33
import * as Deferred from "effect/Deferred"
44
import * as Effect from "effect/Effect"
@@ -661,4 +661,82 @@ describe("PubSub", () => {
661661
yield* PubSub.publishAll(pubsub, [1, 2])
662662
assert.deepStrictEqual(pubsub.unsafeSize(), Option.some(0))
663663
}))
664+
665+
describe("replay", () => {
666+
it.scoped("unbounded", () =>
667+
Effect.gen(function*() {
668+
const messages = [1, 2, 3, 4, 5]
669+
const pubsub = yield* PubSub.unbounded<number>({ replay: 3 })
670+
yield* PubSub.publishAll(pubsub, messages)
671+
const sub = yield* PubSub.subscribe(pubsub)
672+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [3, 4, 5])
673+
}))
674+
675+
it.effect("unbounded takeUpTo", () => {
676+
const messages = [1, 2, 3, 4, 5]
677+
return PubSub.unbounded<number>({ replay: 3 }).pipe(
678+
Effect.flatMap((pubsub) =>
679+
Effect.scoped(
680+
Effect.gen(function*() {
681+
yield* PubSub.publishAll(pubsub, messages)
682+
683+
const dequeue1 = yield* PubSub.subscribe(pubsub)
684+
yield* PubSub.publish(pubsub, 6)
685+
const dequeue2 = yield* PubSub.subscribe(pubsub)
686+
687+
assert.strictEqual(yield* Queue.size(dequeue1), 4)
688+
assert.strictEqual(yield* Queue.size(dequeue2), 3)
689+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeUpTo(dequeue1, 2)), [3, 4])
690+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeUpTo(dequeue1, 2)), [5, 6])
691+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeUpTo(dequeue2, 3)), [4, 5, 6])
692+
})
693+
)
694+
)
695+
)
696+
})
697+
698+
it.scoped("dropping", () =>
699+
Effect.gen(function*() {
700+
const messages = [1, 2, 3, 4, 5]
701+
const pubsub = yield* PubSub.dropping<number>({ capacity: 2, replay: 3 })
702+
703+
yield* PubSub.publishAll(pubsub, messages)
704+
const sub = yield* PubSub.subscribe(pubsub)
705+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [3, 4, 5])
706+
yield* PubSub.publishAll(pubsub, [6, 7])
707+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [6, 7])
708+
709+
const sub2 = yield* PubSub.subscribe(pubsub)
710+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub2)), [5, 6, 7])
711+
712+
yield* PubSub.publishAll(pubsub, [8, 9, 10, 11])
713+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [8, 9])
714+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub2)), [8, 9])
715+
716+
const sub3 = yield* PubSub.subscribe(pubsub)
717+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub3)), [7, 8, 9])
718+
}))
719+
720+
it.scoped("sliding", () =>
721+
Effect.gen(function*() {
722+
const messages = [1, 2, 3, 4, 5]
723+
const pubsub = yield* PubSub.sliding<number>({ capacity: 4, replay: 3 })
724+
725+
yield* PubSub.publishAll(pubsub, messages)
726+
const sub = yield* PubSub.subscribe(pubsub)
727+
assert.deepStrictEqual(yield* Queue.take(sub), 3)
728+
yield* PubSub.publishAll(pubsub, [6, 7, 8, 9, 10])
729+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [5, 6, 7, 8, 9, 10])
730+
731+
const sub2 = yield* PubSub.subscribe(pubsub)
732+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub2)), [8, 9, 10])
733+
734+
yield* PubSub.publishAll(pubsub, [11, 12, 13, 14, 15, 16])
735+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [13, 14, 15, 16])
736+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub2)), [13, 14, 15, 16])
737+
738+
const sub3 = yield* PubSub.subscribe(pubsub)
739+
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub3)), [14, 15, 16])
740+
}))
741+
})
664742
})

0 commit comments

Comments
 (0)
Please sign in to comment.