Skip to content

Commit adf7d7a

Browse files
committedSep 15, 2024
add Mailbox module, a queue which can have done or failure signals (#3580)
1 parent 273565e commit adf7d7a

File tree

12 files changed

+985
-148
lines changed

12 files changed

+985
-148
lines changed
 

‎.changeset/new-dancers-ring.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@effect/platform": patch
3+
"@effect/rpc": patch
4+
---
5+
6+
use Mailbox for Workers, Socket & Rpc

‎.changeset/thick-dingos-melt.md

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add Mailbox module, a queue which can have done or failure signals
6+
7+
```ts
8+
import { Effect, Mailbox } from "effect"
9+
import * as assert from "node:assert"
10+
11+
Effect.gen(function* () {
12+
const mailbox = yield* Mailbox.make<number, string>()
13+
14+
// add messages to the mailbox
15+
yield* mailbox.offer(1)
16+
yield* mailbox.offer(2)
17+
yield* mailbox.offerAll([3, 4, 5])
18+
19+
// take messages from the mailbox
20+
const [messages, done] = yield* mailbox.takeAll
21+
assert.deepStrictEqual(messages, [1, 2, 3, 4, 5])
22+
assert.strictEqual(done, false)
23+
24+
// signal that the mailbox is done
25+
yield* mailbox.end
26+
const [messages2, done2] = yield* mailbox.takeAll
27+
assert.deepStrictEqual(messages2, [])
28+
assert.strictEqual(done2, true)
29+
30+
// signal that the mailbox is failed
31+
yield* mailbox.fail("boom")
32+
})
33+
```

‎packages/effect/src/Mailbox.ts

+236
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/**
2+
* @since 3.8.0
3+
* @experimental
4+
*/
5+
import type { Cause, NoSuchElementException } from "./Cause.js"
6+
import type { Channel } from "./Channel.js"
7+
import type { Chunk } from "./Chunk.js"
8+
import type { Effect } from "./Effect.js"
9+
import type { Exit } from "./Exit.js"
10+
import type { Inspectable } from "./Inspectable.js"
11+
import * as internal from "./internal/mailbox.js"
12+
import type { Option } from "./Option.js"
13+
import { hasProperty } from "./Predicate.js"
14+
import type { Stream } from "./Stream.js"
15+
16+
/**
17+
* @since 3.8.0
18+
* @experimental
19+
* @category type ids
20+
*/
21+
export const TypeId: unique symbol = internal.TypeId
22+
23+
/**
24+
* @since 3.8.0
25+
* @experimental
26+
* @category type ids
27+
*/
28+
export type TypeId = typeof TypeId
29+
30+
/**
31+
* @since 3.8.0
32+
* @experimental
33+
* @category type ids
34+
*/
35+
export const ReadonlyTypeId: unique symbol = internal.ReadonlyTypeId
36+
37+
/**
38+
* @since 3.8.0
39+
* @experimental
40+
* @category type ids
41+
*/
42+
export type ReadonlyTypeId = typeof ReadonlyTypeId
43+
44+
/**
45+
* @since 3.8.0
46+
* @experimental
47+
* @category guards
48+
*/
49+
export const isMailbox = <A = unknown, E = unknown>(u: unknown): u is Mailbox<A, E> => hasProperty(u, TypeId)
50+
51+
/**
52+
* @since 3.8.0
53+
* @experimental
54+
* @category guards
55+
*/
56+
export const isReadonlyMailbox = <A = unknown, E = unknown>(u: unknown): u is ReadonlyMailbox<A, E> =>
57+
hasProperty(u, ReadonlyTypeId)
58+
59+
/**
60+
* A `Mailbox` is a queue that can be signaled to be done or failed.
61+
*
62+
* @since 3.8.0
63+
* @experimental
64+
* @category models
65+
*/
66+
export interface Mailbox<in out A, in out E = never> extends ReadonlyMailbox<A, E> {
67+
readonly [TypeId]: TypeId
68+
/**
69+
* Add a message to the mailbox. Returns `false` if the mailbox is done.
70+
*/
71+
readonly offer: (message: A) => Effect<boolean>
72+
/**
73+
* Add a message to the mailbox. Returns `false` if the mailbox is done.
74+
*/
75+
readonly unsafeOffer: (message: A) => boolean
76+
/**
77+
* Add multiple messages to the mailbox. Returns the remaining messages that
78+
* were not added.
79+
*/
80+
readonly offerAll: (messages: Iterable<A>) => Effect<Chunk<A>>
81+
/**
82+
* Add multiple messages to the mailbox. Returns the remaining messages that
83+
* were not added.
84+
*/
85+
readonly unsafeOfferAll: (messages: Iterable<A>) => Chunk<A>
86+
/**
87+
* Fail the mailbox with an error. If the mailbox is already done, `false` is
88+
* returned.
89+
*/
90+
readonly fail: (error: E) => Effect<boolean>
91+
/**
92+
* Fail the mailbox with a cause. If the mailbox is already done, `false` is
93+
* returned.
94+
*/
95+
readonly failCause: (cause: Cause<E>) => Effect<boolean>
96+
/**
97+
* Signal that the mailbox is complete. If the mailbox is already done, `false` is
98+
* returned.
99+
*/
100+
readonly end: Effect<boolean>
101+
/**
102+
* Signal that the mailbox is done. If the mailbox is already done, `false` is
103+
* returned.
104+
*/
105+
readonly done: (exit: Exit<void, E>) => Effect<boolean>
106+
/**
107+
* Signal that the mailbox is done. If the mailbox is already done, `false` is
108+
* returned.
109+
*/
110+
readonly unsafeDone: (exit: Exit<void, E>) => boolean
111+
/**
112+
* Shutdown the mailbox, canceling any pending operations.
113+
* If the mailbox is already done, `false` is returned.
114+
*/
115+
readonly shutdown: Effect<boolean>
116+
}
117+
118+
/**
119+
* A `ReadonlyMailbox` represents a mailbox that can only be read from.
120+
*
121+
* @since 3.8.0
122+
* @experimental
123+
* @category models
124+
*/
125+
export interface ReadonlyMailbox<out A, out E = never>
126+
extends Effect<readonly [messages: Chunk<A>, done: boolean], E>, Inspectable
127+
{
128+
readonly [ReadonlyTypeId]: ReadonlyTypeId
129+
/**
130+
* Take all messages from the mailbox, returning an empty Chunk if the mailbox
131+
* is empty or done.
132+
*/
133+
readonly clear: Effect<Chunk<A>, E>
134+
/**
135+
* Take all messages from the mailbox, or wait for messages to be available.
136+
*
137+
* If the mailbox is done, the `done` flag will be `true`. If the mailbox
138+
* fails, the Effect will fail with the error.
139+
*/
140+
readonly takeAll: Effect<readonly [messages: Chunk<A>, done: boolean], E>
141+
/**
142+
* Take a specified number of messages from the mailbox. It will only take
143+
* up to the capacity of the mailbox.
144+
*
145+
* If the mailbox is done, the `done` flag will be `true`. If the mailbox
146+
* fails, the Effect will fail with the error.
147+
*/
148+
readonly takeN: (n: number) => Effect<readonly [messages: Chunk<A>, done: boolean], E>
149+
/**
150+
* Take a single message from the mailbox, or wait for a message to be
151+
* available.
152+
*
153+
* If the mailbox is done, it will fail with `NoSuchElementException`. If the
154+
* mailbox fails, the Effect will fail with the error.
155+
*/
156+
readonly take: Effect<A, E | NoSuchElementException>
157+
/** Wait for the mailbox to be done. */
158+
readonly await: Effect<void, E>
159+
/**
160+
* Check the size of the mailbox.
161+
*
162+
* If the mailbox is complete, it will return `None`.
163+
*/
164+
readonly size: Effect<Option<number>>
165+
/**
166+
* Check the size of the mailbox.
167+
*
168+
* If the mailbox is complete, it will return `None`.
169+
*/
170+
readonly unsafeSize: () => Option<number>
171+
}
172+
173+
/**
174+
* A `Mailbox` is a queue that can be signaled to be done or failed.
175+
*
176+
* @since 3.8.0
177+
* @experimental
178+
* @category constructors
179+
* @example
180+
* import { Effect, Mailbox } from "effect"
181+
*
182+
* Effect.gen(function*() {
183+
* const mailbox = yield* Mailbox.make<number, string>()
184+
*
185+
* // add messages to the mailbox
186+
* yield* mailbox.offer(1)
187+
* yield* mailbox.offer(2)
188+
* yield* mailbox.offerAll([3, 4, 5])
189+
*
190+
* // take messages from the mailbox
191+
* const [messages, done] = yield* mailbox.takeAll
192+
* assert.deepStrictEqual(messages, [1, 2, 3, 4, 5])
193+
* assert.strictEqual(done, false)
194+
*
195+
* // signal that the mailbox is done
196+
* yield* mailbox.end
197+
* const [messages2, done2] = yield* mailbox.takeAll
198+
* assert.deepStrictEqual(messages2, [])
199+
* assert.strictEqual(done2, true)
200+
*
201+
* // signal that the mailbox has failed
202+
* yield* mailbox.fail("boom")
203+
* })
204+
*/
205+
export const make: <A, E = never>(capacity?: number | undefined) => Effect<Mailbox<A, E>> = internal.make
206+
207+
/**
208+
* Run an `Effect` into a `Mailbox`, where success ends the mailbox and failure
209+
* fails the mailbox.
210+
*
211+
* @since 3.8.0
212+
* @experimental
213+
* @category combinators
214+
*/
215+
export const into: {
216+
<A, E>(self: Mailbox<A, E>): <AX, EX extends E, RX>(effect: Effect<AX, EX, RX>) => Effect<boolean, never, RX>
217+
<AX, E, EX extends E, RX, A>(effect: Effect<AX, EX, RX>, self: Mailbox<A, E>): Effect<boolean, never, RX>
218+
} = internal.into
219+
220+
/**
221+
* Create a `Channel` from a `Mailbox`.
222+
*
223+
* @since 3.8.0
224+
* @experimental
225+
* @category conversions
226+
*/
227+
export const toChannel: <A, E>(self: ReadonlyMailbox<A, E>) => Channel<Chunk<A>, unknown, E> = internal.toChannel
228+
229+
/**
230+
* Create a `Stream` from a `Mailbox`.
231+
*
232+
* @since 3.8.0
233+
* @experimental
234+
* @category conversions
235+
*/
236+
export const toStream: <A, E>(self: ReadonlyMailbox<A, E>) => Stream<A, E> = internal.toStream

‎packages/effect/src/index.ts

+6
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,12 @@ export * as LogSpan from "./LogSpan.js"
409409
*/
410410
export * as Logger from "./Logger.js"
411411

412+
/**
413+
* @since 3.8.0
414+
* @experimental
415+
*/
416+
export * as Mailbox from "./Mailbox.js"
417+
412418
/**
413419
* @since 2.0.0
414420
*/

‎packages/effect/src/internal/core.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,7 @@ export const unsafeAsync = <A, E = never, R = never>(
516516
cancelerRef = register(resume)
517517
}
518518
effect.effect_instruction_i1 = blockingOn
519-
return cancelerRef !== undefined ?
520-
onInterrupt(effect, (_) => cancelerRef!) :
521-
effect
519+
return onInterrupt(effect, (_) => isEffect(cancelerRef) ? cancelerRef : void_)
522520
}
523521

524522
/* @internal */

0 commit comments

Comments
 (0)
Please sign in to comment.