Skip to content

Commit 8709856

Browse files
authoredApr 8, 2024··
add Readable & Subscribable modules (#2472)
1 parent 43c5d93 commit 8709856

File tree

13 files changed

+275
-23
lines changed

13 files changed

+275
-23
lines changed
 

‎.changeset/shy-bees-train.md

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
"@effect/experimental": patch
3+
"effect": patch
4+
---
5+
6+
add Subscribable trait / module
7+
8+
Subscribable represents a resource that has a current value and can be subscribed to for updates.
9+
10+
The following data types are subscribable:
11+
12+
- A `SubscriptionRef`
13+
- An `Actor` from the experimental `Machine` module

‎.changeset/thin-experts-check.md

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
---
2+
"@effect/experimental": patch
3+
"effect": patch
4+
---
5+
6+
add Readable module / trait
7+
8+
`Readable` is a common interface for objects that can be read from using a `get`
9+
Effect.
10+
11+
For example, `Ref`'s implement `Readable`:
12+
13+
```ts
14+
import { Effect, Readable, Ref } from "effect";
15+
import assert from "assert";
16+
17+
Effect.gen(function* (_) {
18+
const ref = yield* _(Ref.make(123));
19+
assert(Readable.isReadable(ref));
20+
21+
const result = yield* _(ref.get);
22+
assert(result === 123);
23+
});
24+
```

‎packages/effect/src/Readable.ts

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* @since 2.0.0
3+
*/
4+
import type { Effect } from "./Effect.js"
5+
import { dual } from "./Function.js"
6+
import * as core from "./internal/core.js"
7+
import { type Pipeable, pipeArguments } from "./Pipeable.js"
8+
import { hasProperty } from "./Predicate.js"
9+
10+
/**
11+
* @since 2.0.0
12+
* @category type ids
13+
*/
14+
export const TypeId = Symbol.for("effect/Readable")
15+
16+
/**
17+
* @since 2.0.0
18+
* @category type ids
19+
*/
20+
export type TypeId = typeof TypeId
21+
22+
/**
23+
* @since 2.0.0
24+
* @category models
25+
*/
26+
export interface Readable<A, E = never, R = never> extends Pipeable {
27+
readonly [TypeId]: TypeId
28+
readonly get: Effect<A, E, R>
29+
}
30+
31+
/**
32+
* @since 2.0.0
33+
* @category refinements
34+
*/
35+
export const isReadable = (u: unknown): u is Readable<unknown, unknown, unknown> => hasProperty(u, TypeId)
36+
37+
const Proto: Omit<Readable<any>, "get"> = {
38+
[TypeId]: TypeId,
39+
pipe() {
40+
return pipeArguments(this, arguments)
41+
}
42+
}
43+
44+
/**
45+
* @since 2.0.0
46+
* @category constructors
47+
*/
48+
export const make = <A, E, R>(get: Effect<A, E, R>): Readable<A, E, R> => {
49+
const self = Object.create(Proto)
50+
self.get = get
51+
return self
52+
}
53+
54+
/**
55+
* @since 2.0.0
56+
* @category combinators
57+
*/
58+
export const map: {
59+
<A, B>(f: (a: NoInfer<A>) => B): <E, R>(fa: Readable<A, E, R>) => Readable<B, E, R>
60+
<A, E, R, B>(self: Readable<A, E, R>, f: (a: NoInfer<A>) => B): Readable<B, E, R>
61+
} = dual(
62+
2,
63+
<A, E, R, B>(self: Readable<A, E, R>, f: (a: NoInfer<A>) => B): Readable<B, E, R> => make(core.map(self.get, f))
64+
)
65+
66+
/**
67+
* @since 2.0.0
68+
* @category combinators
69+
*/
70+
export const mapEffect: {
71+
<A, B, E2, R2>(
72+
f: (a: NoInfer<A>) => Effect<B, E2, R2>
73+
): <E, R>(fa: Readable<A, E, R>) => Readable<B, E | E2, R | R2>
74+
<A, E, R, B, E2, R2>(
75+
self: Readable<A, E, R>,
76+
f: (a: NoInfer<A>) => Effect<B, E2, R2>
77+
): Readable<B, E | E2, R | R2>
78+
} = dual(2, <A, E, R, B, E2, R2>(
79+
self: Readable<A, E, R>,
80+
f: (a: NoInfer<A>) => Effect<B, E2, R2>
81+
): Readable<B, E | E2, R | R2> => make(core.flatMap(self.get, f)))

‎packages/effect/src/Ref.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import type * as Effect from "./Effect.js"
55
import * as internal from "./internal/ref.js"
66
import type * as Option from "./Option.js"
7-
import type { Pipeable } from "./Pipeable.js"
7+
import type { Readable } from "./Readable.js"
88
import type * as Types from "./Types.js"
99

1010
/**
@@ -23,7 +23,7 @@ export type RefTypeId = typeof RefTypeId
2323
* @since 2.0.0
2424
* @category models
2525
*/
26-
export interface Ref<in out A> extends Ref.Variance<A>, Pipeable {
26+
export interface Ref<in out A> extends Ref.Variance<A>, Readable<A> {
2727
modify<B>(f: (a: A) => readonly [B, A]): Effect.Effect<B>
2828
}
2929

‎packages/effect/src/Subscribable.ts

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* @since 2.0.0
3+
*/
4+
import * as Effect from "./Effect.js"
5+
import { dual } from "./Function.js"
6+
import { pipeArguments } from "./Pipeable.js"
7+
import { hasProperty } from "./Predicate.js"
8+
import * as Readable from "./Readable.js"
9+
import * as Stream from "./Stream.js"
10+
11+
/**
12+
* @since 2.0.0
13+
* @category type ids
14+
*/
15+
export const TypeId = Symbol.for("effect/Subscribable")
16+
17+
/**
18+
* @since 2.0.0
19+
* @category type ids
20+
*/
21+
export type TypeId = typeof TypeId
22+
23+
/**
24+
* @since 2.0.0
25+
* @category models
26+
*/
27+
export interface Subscribable<A, E = never, R = never> extends Readable.Readable<A, E, R> {
28+
readonly [TypeId]: TypeId
29+
readonly changes: Stream.Stream<A, E, R>
30+
}
31+
32+
/**
33+
* @since 2.0.0
34+
* @category refinements
35+
*/
36+
export const isSubscribable = (u: unknown): u is Subscribable<unknown, unknown, unknown> => hasProperty(u, TypeId)
37+
38+
const Proto: Omit<Subscribable<any>, "get" | "changes"> = {
39+
[Readable.TypeId]: Readable.TypeId,
40+
[TypeId]: TypeId,
41+
pipe() {
42+
return pipeArguments(this, arguments)
43+
}
44+
}
45+
46+
/**
47+
* @since 2.0.0
48+
* @category constructors
49+
*/
50+
export const make = <A, E, R>(options: {
51+
readonly get: Effect.Effect<A, E, R>
52+
readonly changes: Stream.Stream<A, E, R>
53+
}): Subscribable<A, E, R> => Object.assign(Object.create(Proto), options)
54+
55+
/**
56+
* @since 2.0.0
57+
* @category combinators
58+
*/
59+
export const map: {
60+
<A, B>(f: (a: NoInfer<A>) => B): <E, R>(fa: Subscribable<A, E, R>) => Subscribable<B, E, R>
61+
<A, E, R, B>(self: Subscribable<A, E, R>, f: (a: NoInfer<A>) => B): Subscribable<B, E, R>
62+
} = dual(2, <A, E, R, B>(self: Subscribable<A, E, R>, f: (a: NoInfer<A>) => B): Subscribable<B, E, R> =>
63+
make({
64+
get: Effect.map(self.get, f),
65+
changes: Stream.map(self.changes, f)
66+
}))
67+
68+
/**
69+
* @since 2.0.0
70+
* @category combinators
71+
*/
72+
export const mapEffect: {
73+
<A, B, E2, R2>(
74+
f: (a: NoInfer<A>) => Effect.Effect<B, E2, R2>
75+
): <E, R>(fa: Subscribable<A, E, R>) => Subscribable<B, E | E2, R | R2>
76+
<A, E, R, B, E2, R2>(
77+
self: Subscribable<A, E, R>,
78+
f: (a: NoInfer<A>) => Effect.Effect<B, E2, R2>
79+
): Subscribable<B, E | E2, R | R2>
80+
} = dual(2, <A, E, R, B, E2, R2>(
81+
self: Subscribable<A, E, R>,
82+
f: (a: NoInfer<A>) => Effect.Effect<B, E2, R2>
83+
): Subscribable<B, E | E2, R | R2> =>
84+
make({
85+
get: Effect.flatMap(self.get, f),
86+
changes: Stream.mapEffect(self.changes, f)
87+
}))

‎packages/effect/src/SubscriptionRef.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
import type * as Effect from "./Effect.js"
55
import * as internal from "./internal/subscriptionRef.js"
66
import type * as Option from "./Option.js"
7-
import type { Pipeable } from "./Pipeable.js"
87
import type * as PubSub from "./PubSub.js"
98
import * as Ref from "./Ref.js"
109
import type * as Stream from "./Stream.js"
10+
import type { Subscribable } from "./Subscribable.js"
1111
import * as Synchronized from "./SynchronizedRef.js"
1212
import type * as Types from "./Types.js"
1313

@@ -31,7 +31,7 @@ export type SubscriptionRefTypeId = typeof SubscriptionRefTypeId
3131
* @category models
3232
*/
3333
export interface SubscriptionRef<in out A>
34-
extends SubscriptionRef.Variance<A>, Synchronized.SynchronizedRef<A>, Pipeable
34+
extends SubscriptionRef.Variance<A>, Synchronized.SynchronizedRef<A>, Subscribable<A>
3535
{
3636
/** @internal */
3737
readonly ref: Ref.Ref<A>

‎packages/effect/src/index.ts

+10
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,11 @@ export * as Random from "./Random.js"
570570
*/
571571
export * as RateLimiter from "./RateLimiter.js"
572572

573+
/**
574+
* @since 2.0.0
575+
*/
576+
export * as Readable from "./Readable.js"
577+
573578
/**
574579
* This module provides utility functions for working with arrays in TypeScript.
575580
*
@@ -740,6 +745,11 @@ export * as String from "./String.js"
740745
*/
741746
export * as Struct from "./Struct.js"
742747

748+
/**
749+
* @since 2.0.0
750+
*/
751+
export * as Subscribable from "./Subscribable.js"
752+
743753
/**
744754
* @since 2.0.0
745755
*/

‎packages/effect/src/internal/effect/circular.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import * as MutableHashMap from "../../MutableHashMap.js"
1515
import * as Option from "../../Option.js"
1616
import { pipeArguments } from "../../Pipeable.js"
1717
import * as Predicate from "../../Predicate.js"
18+
import * as Readable from "../../Readable.js"
1819
import type * as Ref from "../../Ref.js"
1920
import type * as Schedule from "../../Schedule.js"
2021
import { currentScheduler } from "../../Scheduler.js"
@@ -556,10 +557,15 @@ export const synchronizedVariance = {
556557
class SynchronizedImpl<in out A> implements Synchronized.SynchronizedRef<A> {
557558
readonly [SynchronizedTypeId] = synchronizedVariance
558559
readonly [internalRef.RefTypeId] = internalRef.refVariance
560+
readonly [Readable.TypeId]: Readable.TypeId
559561
constructor(
560562
readonly ref: Ref.Ref<A>,
561563
readonly withLock: <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
562-
) {}
564+
) {
565+
this[Readable.TypeId] = Readable.TypeId
566+
this.get = internalRef.get(this.ref)
567+
}
568+
readonly get: Effect.Effect<A>
563569
modify<B>(f: (a: A) => readonly [B, A]): Effect.Effect<B> {
564570
return this.modifyEffect((a) => core.succeed(f(a)))
565571
}

‎packages/effect/src/internal/ref.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { dual } from "../Function.js"
33
import * as MutableRef from "../MutableRef.js"
44
import * as Option from "../Option.js"
55
import { pipeArguments } from "../Pipeable.js"
6+
import * as Readable from "../Readable.js"
67
import type * as Ref from "../Ref.js"
78
import * as core from "./core.js"
89

@@ -17,7 +18,12 @@ export const refVariance = {
1718

1819
class RefImpl<in out A> implements Ref.Ref<A> {
1920
readonly [RefTypeId] = refVariance
20-
constructor(readonly ref: MutableRef.MutableRef<A>) {}
21+
readonly [Readable.TypeId]: Readable.TypeId
22+
constructor(readonly ref: MutableRef.MutableRef<A>) {
23+
this[Readable.TypeId] = Readable.TypeId
24+
this.get = core.sync(() => MutableRef.get(this.ref))
25+
}
26+
readonly get: Effect.Effect<A>
2127
modify<B>(f: (a: A) => readonly [B, A]): Effect.Effect<B> {
2228
return core.sync(() => {
2329
const current = MutableRef.get(this.ref)
@@ -40,7 +46,7 @@ export const unsafeMake = <A>(value: A): Ref.Ref<A> => new RefImpl(MutableRef.ma
4046
export const make = <A>(value: A): Effect.Effect<Ref.Ref<A>> => core.sync(() => unsafeMake(value))
4147

4248
/** @internal */
43-
export const get = <A>(self: Ref.Ref<A>) => self.modify((a) => [a, a])
49+
export const get = <A>(self: Ref.Ref<A>) => self.get
4450

4551
/** @internal */
4652
export const set = dual<

‎packages/effect/src/internal/subscriptionRef.ts

+8
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ import * as Effect from "../Effect.js"
22
import { dual, pipe } from "../Function.js"
33
import { pipeArguments } from "../Pipeable.js"
44
import * as PubSub from "../PubSub.js"
5+
import * as Readable from "../Readable.js"
56
import * as Ref from "../Ref.js"
67
import type { Stream } from "../Stream.js"
8+
import * as Subscribable from "../Subscribable.js"
79
import type * as SubscriptionRef from "../SubscriptionRef.js"
810
import * as Synchronized from "../SynchronizedRef.js"
911
import * as _circular from "./effect/circular.js"
@@ -25,6 +27,8 @@ const subscriptionRefVariance = {
2527

2628
/** @internal */
2729
class SubscriptionRefImpl<in out A> implements SubscriptionRef.SubscriptionRef<A> {
30+
readonly [Readable.TypeId]: Readable.TypeId
31+
readonly [Subscribable.TypeId]: Subscribable.TypeId
2832
readonly [Ref.RefTypeId] = _ref.refVariance
2933
readonly [Synchronized.SynchronizedRefTypeId] = _circular.synchronizedVariance
3034
readonly [SubscriptionRefTypeId] = subscriptionRefVariance
@@ -33,10 +37,14 @@ class SubscriptionRefImpl<in out A> implements SubscriptionRef.SubscriptionRef<A
3337
readonly pubsub: PubSub.PubSub<A>,
3438
readonly semaphore: Effect.Semaphore
3539
) {
40+
this[Readable.TypeId] = Readable.TypeId
41+
this[Subscribable.TypeId] = Subscribable.TypeId
42+
this.get = Ref.get(this.ref)
3643
}
3744
pipe() {
3845
return pipeArguments(this, arguments)
3946
}
47+
readonly get: Effect.Effect<A>
4048
get changes(): Stream<A> {
4149
return pipe(
4250
Ref.get(this.ref),

‎packages/effect/test/Ref.test.ts

+8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Readable } from "effect"
12
import * as it from "effect-test/utils/extend"
23
import * as Effect from "effect/Effect"
34
import * as Option from "effect/Option"
@@ -30,6 +31,13 @@ const isChanged = (self: State): boolean => self._tag === "Changed"
3031
const isClosed = (self: State): boolean => self._tag === "Closed"
3132

3233
describe("Ref", () => {
34+
it.effect("implements Readable", () =>
35+
Effect.gen(function*(_) {
36+
const ref = yield* _(Ref.make(123))
37+
assert.isTrue(Readable.isReadable(ref))
38+
assert.strictEqual(yield* _(ref.get), 123)
39+
}))
40+
3341
it.effect("get", () =>
3442
Effect.gen(function*($) {
3543
const result = yield* $(Ref.make(current), Effect.flatMap(Ref.get))

‎packages/experimental/src/Machine.ts

+18-9
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@ import { dual, identity, pipe } from "effect/Function"
1717
import { globalValue } from "effect/GlobalValue"
1818
import * as MutableHashMap from "effect/MutableHashMap"
1919
import * as Option from "effect/Option"
20-
import { type Pipeable, pipeArguments } from "effect/Pipeable"
20+
import type { Pipeable } from "effect/Pipeable"
21+
import { pipeArguments } from "effect/Pipeable"
2122
import * as PubSub from "effect/PubSub"
2223
import * as Queue from "effect/Queue"
24+
import * as Readable from "effect/Readable"
2325
import * as ReadonlyArray from "effect/ReadonlyArray"
2426
import type { Request } from "effect/Request"
2527
import type * as Schedule from "effect/Schedule"
2628
import type * as Scope from "effect/Scope"
2729
import * as Stream from "effect/Stream"
30+
import * as Subscribable from "effect/Subscribable"
2831
import * as Tracer from "effect/Tracer"
2932
import * as Procedure from "./Machine/Procedure.js"
3033
import type { ProcedureList } from "./Machine/ProcedureList.js"
@@ -287,19 +290,26 @@ export declare namespace Machine {
287290
* @since 1.0.0
288291
* @category models
289292
*/
290-
export interface Actor<M extends Machine.Any> {
293+
export interface Actor<M extends Machine.Any> extends Subscribable.Subscribable<Machine.State<M>> {
291294
readonly [ActorTypeId]: ActorTypeId
292295
readonly machine: M
293296
readonly input: Machine.Input<M>
294297
readonly send: <Req extends Machine.Public<M>>(request: Req) => Effect.Effect<
295298
Request.Success<Req>,
296299
Request.Error<Req>
297300
>
298-
readonly state: Effect.Effect<Machine.State<M>>
299-
readonly changes: Stream.Stream<Machine.State<M>>
300301
readonly join: Effect.Effect<never, Machine.InitError<M> | MachineDefect>
301302
}
302303

304+
const ActorProto = {
305+
[ActorTypeId]: ActorTypeId,
306+
[Readable.TypeId]: Readable.TypeId,
307+
[Subscribable.TypeId]: Subscribable.TypeId,
308+
pipe() {
309+
return pipeArguments(this, arguments)
310+
}
311+
}
312+
303313
/**
304314
* @since 1.0.0
305315
* @category models
@@ -810,19 +820,18 @@ export const boot = <
810820

811821
yield* _(Deferred.await(latch))
812822

813-
return identity<SerializableActor<M>>({
814-
[ActorTypeId]: ActorTypeId,
823+
return identity<SerializableActor<M>>(Object.assign(Object.create(ActorProto), {
815824
machine: self,
816825
input: input!,
817-
state: Effect.sync(() => currentState),
826+
get: Effect.sync(() => currentState),
818827
changes: Stream.concat(
819828
Stream.sync(() => currentState),
820829
Stream.fromPubSub(pubsub)
821830
),
822831
send: sendExternal,
823832
sendUnknown,
824833
join: Fiber.join(fiber)
825-
}) as any
834+
})) as any
826835
})
827836

828837
/**
@@ -852,7 +861,7 @@ export const snapshot = <
852861
): Effect.Effect<[input: unknown, state: unknown], ParseResult.ParseError, SR> =>
853862
Effect.zip(
854863
Schema.encode(self.machine.schemaInput)(self.input),
855-
Effect.flatMap(self.state, Schema.encode(self.machine.schemaState))
864+
Effect.flatMap(self.get, Schema.encode(self.machine.schemaState))
856865
)
857866

858867
/**

‎packages/experimental/test/Machine.test.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ describe("Machine", () => {
157157

158158
const booted = yield* _(Machine.boot(counter, 0))
159159
yield* _(Effect.sleep(10))
160-
assert.strictEqual(yield* _(booted.state), 0)
160+
assert.strictEqual(yield* _(booted.get), 0)
161161
assert.strictEqual(yield* _(booted.send(new Increment())), 1)
162162
assert.strictEqual(yield* _(booted.send(new Increment())), 2)
163163
assert.strictEqual(yield* _(booted.send(new IncrementBy({ number: 2 }))), 4)
@@ -171,7 +171,7 @@ describe("Machine", () => {
171171
test("init context", () =>
172172
Effect.gen(function*(_) {
173173
const booted = yield* _(Machine.boot(withContext, 20))
174-
assert.strictEqual(yield* _(booted.state), 20)
174+
assert.strictEqual(yield* _(booted.get), 20)
175175
assert.strictEqual(yield* _(booted.send(new Multiply())), 40)
176176
}).pipe(
177177
Effect.scoped,
@@ -182,16 +182,16 @@ describe("Machine", () => {
182182
test("forkWithState", () =>
183183
Effect.gen(function*(_) {
184184
const booted = yield* _(Machine.boot(delayedCounter, 2))
185-
assert.strictEqual(yield* _(booted.state), 2)
185+
assert.strictEqual(yield* _(booted.get), 2)
186186
assert.deepStrictEqual(
187187
// @ts-expect-error
188188
yield* _(booted.send(new IncrementBy({ number: 2 })), Effect.exit),
189189
Exit.die("Request IncrementBy marked as internal")
190190
)
191191
assert.strictEqual(yield* _(booted.send(new DelayedIncrementBy({ number: 2, delay: 10 }))), undefined)
192-
assert.strictEqual(yield* _(booted.state), 2)
192+
assert.strictEqual(yield* _(booted.get), 2)
193193
yield* _(Effect.sleep(10))
194-
assert.strictEqual(yield* _(booted.state), 4)
194+
assert.strictEqual(yield* _(booted.get), 4)
195195
}).pipe(Effect.scoped, Effect.runPromise))
196196

197197
test("changes", () =>
@@ -238,7 +238,7 @@ describe("SerializableMachine", () => {
238238
Effect.gen(function*(_) {
239239
const actor = yield* _(Machine.boot(counterSerializable, 10))
240240

241-
assert.strictEqual(yield* _(actor.state), 10)
241+
assert.strictEqual(yield* _(actor.get), 10)
242242
assert.strictEqual(yield* _(actor.send(new Increment())), 11)
243243
assert.strictEqual(yield* _(actor.send(new Increment())), 12)
244244
assert.deepStrictEqual(yield* _(actor.sendUnknown({ _tag: "Decrement" })), {
@@ -249,6 +249,6 @@ describe("SerializableMachine", () => {
249249
assert.deepStrictEqual(snapshot, [10, "11"])
250250

251251
const restored = yield* _(Machine.restore(counterSerializable, snapshot))
252-
assert.strictEqual(yield* _(restored.state), 11)
252+
assert.strictEqual(yield* _(restored.get), 11)
253253
}).pipe(Effect.scoped, Effect.runPromise))
254254
})

0 commit comments

Comments
 (0)
Please sign in to comment.