Skip to content

Commit 569a801

Browse files
KhraksMamtsovmaksim.khramtsov
authored andcommittedSep 15, 2024
Dequeue<A> and Queue<A> is subtype of Effect<A> (#3591)
Co-authored-by: maksim.khramtsov <maksim.khramtsov@btsdigital.kz>
1 parent f1b5b3c commit 569a801

File tree

8 files changed

+114
-7
lines changed

8 files changed

+114
-7
lines changed
 

‎.changeset/proud-cats-shake.md

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
`Dequeue<A>` and `Queue<A>` is subtype of `Effect<A>`. This means that now it can be used as an `Effect`, and when called, it will automatically extract and return an item from the queue, without having to explicitly use the `Queue.take` function.
6+
7+
```ts
8+
Effect.gen(function* () {
9+
const queue = yield* Queue.unbounded<number>()
10+
yield* Queue.offer(queue, 1)
11+
yield* Queue.offer(queue, 2)
12+
const oldWay = yield* Queue.take(queue)
13+
const newWay = yield* queue
14+
})
15+
```

‎packages/effect/dtslint/Unify.ts

+17-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type * as Fiber from "effect/Fiber"
66
import type * as FiberRef from "effect/FiberRef"
77
import type * as Micro from "effect/Micro"
88
import type * as Option from "effect/Option"
9+
import type * as Queue from "effect/Queue"
910
import type * as RcRef from "effect/RcRef"
1011
import type * as Ref from "effect/Ref"
1112
import type * as Stream from "effect/Stream"
@@ -90,7 +91,18 @@ export type RuntimeFiberUnify = Unify.Unify<
9091
| Fiber.RuntimeFiber<"a", "b">
9192
>
9293

93-
// $ExpectType 0 | Option<string | number> | Ref<1> | SynchronizedRef<1> | SubscriptionRef<1> | Deferred<1, 2> | Deferred<"a", "b"> | Fiber<"a" | 1, "b" | 2> | RuntimeFiber<"a" | 1, "b" | 2> | Ref<"A"> | SynchronizedRef<"A"> | SubscriptionRef<"A"> | FiberRef<12> | FiberRef<"a2"> | Either<1 | "A", 0 | "E"> | Effect<1 | "A", 0 | "E", "R" | "R1"> | RcRef<1 | "A", 0 | "E">
94+
// $ExpectType Queue<1> | Queue<"a">
95+
export type QueueUnify = Unify.Unify<
96+
| Queue.Queue<1>
97+
| Queue.Queue<"a">
98+
>
99+
// $ExpectType Dequeue<"a" | 1>
100+
export type DequeueUnify = Unify.Unify<
101+
| Queue.Dequeue<1>
102+
| Queue.Dequeue<"a">
103+
>
104+
105+
// $ExpectType 0 | Option<string | number> | Ref<1> | SynchronizedRef<1> | SubscriptionRef<1> | Deferred<1, 2> | Deferred<"a", "b"> | Fiber<"a" | 1, "b" | 2> | RuntimeFiber<"a" | 1, "b" | 2> | Queue<1> | Queue<"a"> | Dequeue<"a" | 1> | Ref<"A"> | SynchronizedRef<"A"> | SubscriptionRef<"A"> | FiberRef<12> | FiberRef<"a2"> | Either<1 | "A", 0 | "E"> | Effect<1 | "A", 0 | "E", "R" | "R1"> | RcRef<1 | "A", 0 | "E">
94106
export type AllUnify = Unify.Unify<
95107
| Either.Either<1, 0>
96108
| Either.Either<"A", "E">
@@ -114,5 +126,9 @@ export type AllUnify = Unify.Unify<
114126
| Fiber.Fiber<"a", "b">
115127
| Fiber.RuntimeFiber<1, 2>
116128
| Fiber.RuntimeFiber<"a", "b">
129+
| Queue.Queue<1>
130+
| Queue.Queue<"a">
131+
| Queue.Dequeue<1>
132+
| Queue.Dequeue<"a">
117133
| 0
118134
>

‎packages/effect/src/Queue.ts

+42-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import type * as MutableRef from "./MutableRef.js"
1010
import type * as Option from "./Option.js"
1111
import type { Pipeable } from "./Pipeable.js"
1212
import type * as Types from "./Types.js"
13+
import type * as Unify from "./Unify.js"
1314

1415
/**
1516
* @since 2.0.0
@@ -74,6 +75,26 @@ export interface Queue<in out A> extends Enqueue<A>, Dequeue<A>, Pipeable {
7475
readonly shutdownFlag: MutableRef.MutableRef<boolean>
7576
/** @internal */
7677
readonly strategy: Strategy<A>
78+
79+
readonly [Unify.typeSymbol]?: unknown
80+
readonly [Unify.unifySymbol]?: QueueUnify<this>
81+
readonly [Unify.ignoreSymbol]?: QueueUnifyIgnore
82+
}
83+
84+
/**
85+
* @category models
86+
* @since 3.8.0
87+
*/
88+
export interface QueueUnify<A extends { [Unify.typeSymbol]?: any }> extends DequeueUnify<A> {
89+
Queue?: () => Extract<A[Unify.typeSymbol], Queue<any>>
90+
}
91+
92+
/**
93+
* @category models
94+
* @since 3.8.0
95+
*/
96+
export interface QueueUnifyIgnore extends DequeueUnifyIgnore {
97+
Dequeue?: true
7798
}
7899

79100
/**
@@ -113,7 +134,7 @@ export interface Enqueue<in A> extends Queue.EnqueueVariance<A>, BaseQueue, Pipe
113134
* @since 2.0.0
114135
* @category models
115136
*/
116-
export interface Dequeue<out A> extends Queue.DequeueVariance<A>, BaseQueue, Pipeable {
137+
export interface Dequeue<out A> extends Effect.Effect<A>, Queue.DequeueVariance<A>, BaseQueue, Pipeable {
117138
/**
118139
* Takes the oldest value in the queue. If the queue is empty, this will return
119140
* a computation that resumes when an item has been added to the queue.
@@ -137,6 +158,26 @@ export interface Dequeue<out A> extends Queue.DequeueVariance<A>, BaseQueue, Pip
137158
* suspends until at least the minimum number of elements have been collected.
138159
*/
139160
takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<A>>
161+
162+
readonly [Unify.typeSymbol]?: unknown
163+
readonly [Unify.unifySymbol]?: DequeueUnify<this>
164+
readonly [Unify.ignoreSymbol]?: DequeueUnifyIgnore
165+
}
166+
167+
/**
168+
* @category models
169+
* @since 3.8.0
170+
*/
171+
export interface DequeueUnify<A extends { [Unify.typeSymbol]?: any }> extends Effect.EffectUnify<A> {
172+
Dequeue?: () => A[Unify.typeSymbol] extends Dequeue<infer A0> | infer _ ? Dequeue<A0> : never
173+
}
174+
175+
/**
176+
* @category models
177+
* @since 3.8.0
178+
*/
179+
export interface DequeueUnifyIgnore extends Effect.EffectUnifyIgnore {
180+
Effect?: true
140181
}
141182

142183
/**

‎packages/effect/src/internal/groupBy.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type * as Channel from "../Channel.js"
33
import * as Chunk from "../Chunk.js"
44
import * as Deferred from "../Deferred.js"
55
import * as Effect from "../Effect.js"
6+
import * as Effectable from "../Effectable.js"
67
import * as Exit from "../Exit.js"
78
import { dual, pipe } from "../Function.js"
89
import type * as GroupBy from "../GroupBy.js"
@@ -316,7 +317,7 @@ export const bindEffect = dual<
316317

317318
const mapDequeue = <A, B>(dequeue: Queue.Dequeue<A>, f: (a: A) => B): Queue.Dequeue<B> => new MapDequeue(dequeue, f)
318319

319-
class MapDequeue<in out A, out B> implements Queue.Dequeue<B> {
320+
class MapDequeue<in out A, out B> extends Effectable.Class<B> implements Queue.Dequeue<B> {
320321
readonly [Queue.DequeueTypeId] = {
321322
_Out: (_: never) => _
322323
}
@@ -325,6 +326,7 @@ class MapDequeue<in out A, out B> implements Queue.Dequeue<B> {
325326
readonly dequeue: Queue.Dequeue<A>,
326327
readonly f: (a: A) => B
327328
) {
329+
super()
328330
}
329331

330332
capacity(): number {
@@ -390,6 +392,10 @@ class MapDequeue<in out A, out B> implements Queue.Dequeue<B> {
390392
pipe() {
391393
return pipeArguments(this, arguments)
392394
}
395+
396+
commit() {
397+
return this.take
398+
}
393399
}
394400

395401
/** @internal */

‎packages/effect/src/internal/pubsub.ts

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as Chunk from "../Chunk.js"
22
import type * as Deferred from "../Deferred.js"
33
import type * as Effect from "../Effect.js"
4+
import * as Effectable from "../Effectable.js"
45
import { dual, pipe } from "../Function.js"
56
import * as MutableQueue from "../MutableQueue.js"
67
import * as MutableRef from "../MutableRef.js"
@@ -909,7 +910,7 @@ class UnboundedPubSubSubscription<in out A> implements Subscription<A> {
909910
}
910911

911912
/** @internal */
912-
class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
913+
class SubscriptionImpl<in out A> extends Effectable.Class<A> implements Queue.Dequeue<A> {
913914
[queue.DequeueTypeId] = queue.dequeueVariance
914915

915916
constructor(
@@ -921,7 +922,13 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
921922
readonly shutdownFlag: MutableRef.MutableRef<boolean>,
922923
readonly strategy: PubSubStrategy<A>,
923924
readonly replayWindow: ReplayWindow<A>
924-
) {}
925+
) {
926+
super()
927+
}
928+
929+
commit() {
930+
return this.take
931+
}
925932

926933
pipe() {
927934
return pipeArguments(this, arguments)

‎packages/effect/src/internal/queue.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as Arr from "../Array.js"
22
import * as Chunk from "../Chunk.js"
33
import type * as Deferred from "../Deferred.js"
44
import type * as Effect from "../Effect.js"
5+
import * as Effectable from "../Effectable.js"
56
import { dual, pipe } from "../Function.js"
67
import * as MutableQueue from "../MutableQueue.js"
78
import * as MutableRef from "../MutableRef.js"
@@ -63,7 +64,7 @@ export const dequeueVariance = {
6364
}
6465

6566
/** @internal */
66-
class QueueImpl<in out A> implements Queue.Queue<A> {
67+
class QueueImpl<in out A> extends Effectable.Class<A> implements Queue.Queue<A> {
6768
readonly [EnqueueTypeId] = enqueueVariance
6869
readonly [DequeueTypeId] = dequeueVariance
6970

@@ -79,12 +80,17 @@ class QueueImpl<in out A> implements Queue.Queue<A> {
7980
/** @internal */
8081
readonly strategy: Queue.Strategy<A>
8182
) {
83+
super()
8284
}
8385

8486
pipe() {
8587
return pipeArguments(this, arguments)
8688
}
8789

90+
commit() {
91+
return this.take
92+
}
93+
8894
capacity(): number {
8995
return this.queue.capacity()
9096
}

‎packages/effect/test/Queue.test.ts

+10
Original file line numberDiff line numberDiff line change
@@ -775,4 +775,14 @@ describe("Queue", () => {
775775
expect(queue.pipe(identity)).toBe(queue)
776776
})
777777
)
778+
it.effect(
779+
"is subtype of Effect",
780+
() =>
781+
Effect.gen(function*() {
782+
const queue = yield* Queue.unbounded<number>()
783+
yield* Queue.offer(queue, 1)
784+
const result1 = yield* queue
785+
assert.strictEqual(result1, 1)
786+
})
787+
)
778788
})

‎packages/effect/test/Sink/constructors.test.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type * as Chunk from "effect/Chunk"
22
import * as Deferred from "effect/Deferred"
33
import * as Effect from "effect/Effect"
4+
import * as Effectable from "effect/Effectable"
45
import * as Exit from "effect/Exit"
56
import * as Fiber from "effect/Fiber"
67
import { pipe } from "effect/Function"
@@ -87,7 +88,7 @@ describe("Sink", () => {
8788

8889
const createQueueSpy = <A>(queue: Queue.Queue<A>): Queue.Queue<A> => new QueueSpy(queue)
8990

90-
class QueueSpy<A> implements Queue.Queue<A> {
91+
class QueueSpy<A> extends Effectable.Class<A> implements Queue.Queue<A> {
9192
readonly [Queue.DequeueTypeId] = internalQueue.dequeueVariance
9293
readonly [Queue.EnqueueTypeId] = internalQueue.enqueueVariance
9394
private isShutdownInternal = false
@@ -98,13 +99,18 @@ class QueueSpy<A> implements Queue.Queue<A> {
9899
readonly takers: MutableQueue.MutableQueue<Deferred.Deferred<A, never>>
99100

100101
constructor(readonly backingQueue: Queue.Queue<A>) {
102+
super()
101103
this.queue = backingQueue.queue
102104
this.shutdownFlag = backingQueue.shutdownFlag
103105
this.shutdownHook = backingQueue.shutdownHook
104106
this.strategy = backingQueue.strategy
105107
this.takers = backingQueue.takers
106108
}
107109

110+
commit() {
111+
return this.take
112+
}
113+
108114
pipe() {
109115
return pipeArguments(this, arguments)
110116
}

0 commit comments

Comments
 (0)
Please sign in to comment.