Skip to content

Commit 7d9c3bf

Browse files
IMax153mikearnaldi
authored andcommittedFeb 21, 2024
Consolidate Effect and Stream async methods (#2087)
1 parent b9cb3a9 commit 7d9c3bf

14 files changed

+320
-432
lines changed
 

‎.changeset/cuddly-parrots-know.md

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
Consolidate `Effect.asyncOption`, `Effect.asyncEither`, `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt`
6+
7+
This PR removes `Effect.asyncOption` and `Effect.asyncEither` as their behavior can be entirely implemented with the new signature of `Effect.async`, which optionally returns a cleanup `Effect` from the registration callback.
8+
9+
```ts
10+
declare const async: <A, E = never, R = never>(
11+
register: (callback: (_: Effect<A, E, R>) => void, signal: AbortSignal) => void | Effect<void, never, R>,
12+
blockingOn?: FiberId
13+
) => Effect<A, E, R>
14+
```
15+
16+
Additionally, this PR removes `Stream.asyncOption`, `Stream.asyncEither`, and `Stream.asyncInterrupt` as their behavior can be entirely implemented with the new signature of `Stream.async`, which can optionally return a cleanup `Effect` from the registration callback.
17+
18+
```ts
19+
declare const async: <A, E = never, R = never>(
20+
register: (emit: Emit<R, E, A, void>) => Effect<void, never, R> | void,
21+
outputBuffer?: number
22+
) => Stream<A, E, R>
23+
```

‎package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"private": true,
33
"type": "module",
4-
"packageManager": "pnpm@8.12.1",
4+
"packageManager": "pnpm@8.15.1",
55
"workspaces": [
66
"packages/*"
77
],

‎packages/effect/src/Effect.ts

+12-51
Original file line numberDiff line numberDiff line change
@@ -976,17 +976,20 @@ export const validateFirst: {
976976
// -------------------------------------------------------------------------------------
977977

978978
/**
979-
* Imports an asynchronous side-effect into a pure `Effect` value.
980-
* The callback function `Effect<A, E, R> => void` must be called at most once.
979+
* Imports an asynchronous side-effect into a pure `Effect` value. The callback
980+
* function `Effect<A, E, R> => void` **MUST** be called at most once.
981981
*
982-
* If an Effect is returned by the registration function, it will be executed
983-
* if the fiber executing the effect is interrupted.
982+
* The registration function can optionally return an Effect, which will be
983+
* executed if the `Fiber` executing this Effect is interrupted.
984984
*
985985
* The registration function can also receive an `AbortSignal` if required for
986986
* interruption.
987987
*
988-
* The `FiberId` of the fiber that may complete the async callback may be
989-
* provided to allow for better diagnostics.
988+
* The `FiberId` of the fiber that may complete the async callback may also be
989+
* specified. This is called the "blocking fiber" because it suspends the fiber
990+
* executing the `async` Effect (i.e. semantically blocks the fiber from making
991+
* progress). Specifying this fiber id in cases where it is known will improve
992+
* diagnostics, but not affect the behavior of the returned effect.
990993
*
991994
* @since 2.0.0
992995
* @category constructors
@@ -1005,51 +1008,9 @@ export const async: <A, E = never, R = never>(
10051008
* @since 2.0.0
10061009
* @category constructors
10071010
*/
1008-
export const asyncEffect: <A, E, R, X, E2, R2>(
1009-
register: (callback: (_: Effect<A, E, R>) => void) => Effect<X, E2, R2>
1010-
) => Effect<A, E | E2, R | R2> = _runtime.asyncEffect
1011-
1012-
/**
1013-
* Imports an asynchronous effect into a pure `Effect` value, possibly returning
1014-
* the value synchronously.
1015-
*
1016-
* If the register function returns a value synchronously, then the callback
1017-
* function `Effect<A, E, R> => void` must not be called. Otherwise the callback
1018-
* function must be called at most once.
1019-
*
1020-
* The `FiberId` of the fiber that may complete the async callback may be
1021-
* provided to allow for better diagnostics.
1022-
*
1023-
* @since 2.0.0
1024-
* @category constructors
1025-
*/
1026-
export const asyncOption: <A, E = never, R = never>(
1027-
register: (callback: (_: Effect<A, E, R>) => void) => Option.Option<Effect<A, E, R>>,
1028-
blockingOn?: FiberId.FiberId
1029-
) => Effect<A, E, R> = effect.asyncOption
1030-
1031-
/**
1032-
* Imports an asynchronous side-effect into an effect. It has the option of
1033-
* returning the value synchronously, which is useful in cases where it cannot
1034-
* be determined if the effect is synchronous or asynchronous until the register
1035-
* is actually executed. It also has the option of returning a canceler,
1036-
* which will be used by the runtime to cancel the asynchronous effect if the fiber
1037-
* executing the effect is interrupted.
1038-
*
1039-
* If the register function returns a value synchronously, then the callback
1040-
* function `Effect<A, E, R> => void` must not be called. Otherwise the callback
1041-
* function must be called at most once.
1042-
*
1043-
* The `FiberId` of the fiber that may complete the async callback may be
1044-
* provided to allow for better diagnostics.
1045-
*
1046-
* @since 2.0.0
1047-
* @category constructors
1048-
*/
1049-
export const asyncEither: <A, E = never, R = never>(
1050-
register: (callback: (effect: Effect<A, E, R>) => void) => Either.Either<Effect<void, never, R>, Effect<A, E, R>>,
1051-
blockingOn?: FiberId.FiberId
1052-
) => Effect<A, E, R> = core.asyncEither
1011+
export const asyncEffect: <A, E, R, R3, E2, R2>(
1012+
register: (callback: (_: Effect<A, E, R>) => void) => Effect<Effect<void, never, R3> | void, E2, R2>
1013+
) => Effect<A, E | E2, R | R2 | R3> = _runtime.asyncEffect
10531014

10541015
/**
10551016
* @since 2.0.0

‎packages/effect/src/Stream.ts

+6-31
Original file line numberDiff line numberDiff line change
@@ -256,15 +256,18 @@ export const as: {
256256
} = internal.as
257257

258258
const _async: <A, E = never, R = never>(
259-
register: (emit: Emit.Emit<R, E, A, void>) => void,
259+
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
260260
outputBuffer?: number
261261
) => Stream<A, E, R> = internal._async
262262

263263
export {
264264
/**
265265
* Creates a stream from an asynchronous callback that can be called multiple
266-
* times. The optionality of the error type `E` can be used to signal the end
267-
* of the stream, by setting it to `None`.
266+
* times. The optionality of the error type `E` in `Emit` can be used to
267+
* signal the end of the stream by setting it to `None`.
268+
*
269+
* The registration function can optionally return an `Effect`, which will be
270+
* executed if the `Fiber` executing this Effect is interrupted.
268271
*
269272
* @since 2.0.0
270273
* @category constructors
@@ -286,34 +289,6 @@ export const asyncEffect: <A, E = never, R = never>(
286289
outputBuffer?: number
287290
) => Stream<A, E, R> = internal.asyncEffect
288291

289-
/**
290-
* Creates a stream from an asynchronous callback that can be called multiple
291-
* times. The registration of the callback returns either a canceler or
292-
* synchronously returns a stream. The optionality of the error type `E` can
293-
* be used to signal the end of the stream, by setting it to `None`.
294-
*
295-
* @since 2.0.0
296-
* @category constructors
297-
*/
298-
export const asyncInterrupt: <A, E = never, R = never>(
299-
register: (emit: Emit.Emit<R, E, A, void>) => Either.Either<Effect.Effect<unknown, never, R>, Stream<A, E, R>>,
300-
outputBuffer?: number
301-
) => Stream<A, E, R> = internal.asyncInterrupt
302-
303-
/**
304-
* Creates a stream from an asynchronous callback that can be called multiple
305-
* times. The registration of the callback can possibly return the stream
306-
* synchronously. The optionality of the error type `E` can be used to signal
307-
* the end of the stream, by setting it to `None`.
308-
*
309-
* @since 2.0.0
310-
* @category constructors
311-
*/
312-
export const asyncOption: <A, E = never, R = never>(
313-
register: (emit: Emit.Emit<R, E, A, void>) => Option.Option<Stream<A, E, R>>,
314-
outputBuffer?: number
315-
) => Stream<A, E, R> = internal.asyncOption
316-
317292
/**
318293
* Creates a stream from an asynchronous callback that can be called multiple
319294
* times. The registration of the callback itself returns an a scoped

‎packages/effect/src/internal/clock.ts

+3-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import type * as Clock from "../Clock.js"
22
import * as Context from "../Context.js"
33
import * as Duration from "../Duration.js"
44
import type * as Effect from "../Effect.js"
5-
import * as Either from "../Either.js"
65
import { constFalse } from "../Function.js"
76
import * as core from "./core.js"
87

@@ -85,9 +84,9 @@ class ClockImpl implements Clock.Clock {
8584
}
8685

8786
sleep(duration: Duration.Duration): Effect.Effect<void> {
88-
return core.asyncEither<void>((cb) => {
89-
const canceler = globalClockScheduler.unsafeSchedule(() => cb(core.unit), duration)
90-
return Either.left(core.asUnit(core.sync(canceler)))
87+
return core.async<void>((resume) => {
88+
const canceler = globalClockScheduler.unsafeSchedule(() => resume(core.unit), duration)
89+
return core.asUnit(core.sync(canceler))
9190
})
9291
}
9392
}

‎packages/effect/src/internal/core-effect.ts

+1-22
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@ import * as Clock from "../Clock.js"
44
import * as Context from "../Context.js"
55
import * as Duration from "../Duration.js"
66
import type * as Effect from "../Effect.js"
7-
import * as Either from "../Either.js"
87
import type * as Fiber from "../Fiber.js"
9-
import * as FiberId from "../FiberId.js"
8+
import type * as FiberId from "../FiberId.js"
109
import type * as FiberRef from "../FiberRef.js"
1110
import * as FiberRefs from "../FiberRefs.js"
1211
import type * as FiberRefsPatch from "../FiberRefsPatch.js"
@@ -75,26 +74,6 @@ export const asSome = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<Opt
7574
export const asSomeError = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, Option.Option<E>, R> =>
7675
core.mapError(self, Option.some)
7776

78-
/* @internal */
79-
export const asyncOption = <A, E = never, R = never>(
80-
register: (callback: (_: Effect.Effect<A, E, R>) => void) => Option.Option<Effect.Effect<A, E, R>>,
81-
blockingOn: FiberId.FiberId = FiberId.none
82-
): Effect.Effect<A, E, R> =>
83-
core.asyncEither(
84-
(cb) => {
85-
const option = register(cb)
86-
switch (option._tag) {
87-
case "None": {
88-
return Either.left(core.unit)
89-
}
90-
case "Some": {
91-
return Either.right(option.value)
92-
}
93-
}
94-
},
95-
blockingOn
96-
)
97-
9877
/* @internal */
9978
export const try_: {
10079
<A, E>(options: {

‎packages/effect/src/internal/core.ts

+6-22
Original file line numberDiff line numberDiff line change
@@ -480,22 +480,6 @@ export const async = <A, E = never, R = never>(
480480
effect
481481
})
482482

483-
/* @internal */
484-
export const asyncEither = <A, E = never, R = never>(
485-
register: (
486-
resume: (effect: Effect.Effect<A, E, R>) => void
487-
) => Either.Either<Effect.Effect<void, never, R>, Effect.Effect<A, E, R>>,
488-
blockingOn: FiberId.FiberId = FiberId.none
489-
): Effect.Effect<A, E, R> =>
490-
async<A, E, R>((resume) => {
491-
const result = register(resume)
492-
if (Either.isRight(result)) {
493-
resume(result.right)
494-
} else {
495-
return result.left
496-
}
497-
}, blockingOn)
498-
499483
/* @internal */
500484
export const catchAllCause = dual<
501485
<E, A2, E2, R2>(
@@ -1462,11 +1446,11 @@ export const zipWith: {
14621446
): Effect.Effect<B, E | E2, R | R2> => flatMap(self, (a) => map(that, (b) => f(a, b))))
14631447

14641448
/* @internal */
1465-
export const never: Effect.Effect<never> = asyncEither<never>(() => {
1449+
export const never: Effect.Effect<never> = async<never>(() => {
14661450
const interval = setInterval(() => {
14671451
//
14681452
}, 2 ** 31 - 1)
1469-
return Either.left(sync(() => clearInterval(interval)))
1453+
return sync(() => clearInterval(interval))
14701454
})
14711455

14721456
// -----------------------------------------------------------------------------
@@ -2844,18 +2828,18 @@ export const deferredMakeAs = <A, E = never>(fiberId: FiberId.FiberId): Effect.E
28442828

28452829
/* @internal */
28462830
export const deferredAwait = <A, E>(self: Deferred.Deferred<A, E>): Effect.Effect<A, E> =>
2847-
asyncEither<A, E>((k) => {
2831+
async<A, E>((resume) => {
28482832
const state = MutableRef.get(self.state)
28492833
switch (state._tag) {
28502834
case DeferredOpCodes.OP_STATE_DONE: {
2851-
return Either.right(state.effect)
2835+
return resume(state.effect)
28522836
}
28532837
case DeferredOpCodes.OP_STATE_PENDING: {
28542838
pipe(
28552839
self.state,
2856-
MutableRef.set(deferred.pending([k, ...state.joiners]))
2840+
MutableRef.set(deferred.pending([resume, ...state.joiners]))
28572841
)
2858-
return Either.left(deferredInterruptJoiner(self, k))
2842+
return deferredInterruptJoiner(self, resume)
28592843
}
28602844
}
28612845
}, self.blockingOn)

‎packages/effect/src/internal/effect/circular.ts

+4-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import type * as Cause from "../../Cause.js"
22
import type * as Deferred from "../../Deferred.js"
33
import * as Duration from "../../Duration.js"
44
import type * as Effect from "../../Effect.js"
5-
import * as Either from "../../Either.js"
65
import * as Equal from "../../Equal.js"
76
import type { Equivalence } from "../../Equivalence.js"
87
import * as Exit from "../../Exit.js"
@@ -45,7 +44,7 @@ class Semaphore {
4544
}
4645

4746
readonly take = (n: number): Effect.Effect<number> =>
48-
core.asyncEither<number>((resume) => {
47+
core.async<number>((resume) => {
4948
if (this.free < n) {
5049
const observer = () => {
5150
if (this.free < n) {
@@ -56,12 +55,12 @@ class Semaphore {
5655
resume(core.succeed(n))
5756
}
5857
this.waiters.add(observer)
59-
return Either.left(core.sync(() => {
58+
return core.sync(() => {
6059
this.waiters.delete(observer)
61-
}))
60+
})
6261
}
6362
this.taken += n
64-
return Either.right(core.succeed(n))
63+
return resume(core.succeed(n))
6564
})
6665

6766
readonly updateTaken = (f: (n: number) => number): Effect.Effect<number> =>

‎packages/effect/src/internal/runtime.ts

+32-19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import * as core from "./core.js"
2222
import * as executionStrategy from "./executionStrategy.js"
2323
import * as FiberRuntime from "./fiberRuntime.js"
2424
import * as fiberScope from "./fiberScope.js"
25+
import { internalize } from "./internalize.js"
2526
import * as OpCodes from "./opCodes/effect.js"
2627
import * as runtimeFlags from "./runtimeFlags.js"
2728
import * as _supervisor from "./supervisor.js"
@@ -450,22 +451,34 @@ export const unsafeRunSyncExitEffect = unsafeRunSyncExit(defaultRuntime)
450451
// circular with Effect
451452

452453
/** @internal */
453-
export const asyncEffect = <A, E, R, X, E2, R2>(
454-
register: (callback: (_: Effect.Effect<A, E, R>) => void) => Effect.Effect<X, E2, R2>
455-
): Effect.Effect<A, E | E2, R | R2> =>
456-
core.flatMap(
457-
core.deferredMake<A, E | E2>(),
458-
(deferred) =>
459-
core.flatMap(runtime<R | R2>(), (runtime) =>
460-
core.uninterruptibleMask((restore) =>
461-
core.zipRight(
462-
FiberRuntime.fork(restore(
463-
core.catchAllCause(
464-
register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
465-
(cause) => core.deferredFailCause(deferred, cause)
466-
)
467-
)),
468-
restore(core.deferredAwait(deferred))
469-
)
470-
))
471-
)
454+
export const asyncEffect = <A, E, R, R3, E2, R2>(
455+
register: (
456+
callback: (_: Effect.Effect<A, E, R>) => void
457+
) => Effect.Effect<Effect.Effect<void, never, R3> | void, E2, R2>
458+
): Effect.Effect<A, E | E2, R | R2 | R3> =>
459+
core.suspend(() => {
460+
internalize(register)
461+
let cleanup: Effect.Effect<void, never, R3> | void = undefined
462+
return core.flatMap(
463+
core.deferredMake<A, E | E2>(),
464+
(deferred) =>
465+
core.flatMap(runtime<R | R2 | R3>(), (runtime) =>
466+
core.uninterruptibleMask((restore) =>
467+
core.zipRight(
468+
FiberRuntime.fork(restore(
469+
core.matchCauseEffect(
470+
register((cb) => unsafeRunCallback(runtime)(core.intoDeferred(cb, deferred))),
471+
{
472+
onFailure: (cause) => core.deferredFailCause(deferred, cause),
473+
onSuccess: (cleanup_) => {
474+
cleanup = cleanup_
475+
return core.unit
476+
}
477+
}
478+
)
479+
)),
480+
restore(core.onInterrupt(core.deferredAwait(deferred), () => cleanup ?? core.unit))
481+
)
482+
))
483+
)
484+
})

‎packages/effect/src/internal/stream.ts

+77-111
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import * as pull from "./stream/pull.js"
4848
import * as SinkEndReason from "./stream/sinkEndReason.js"
4949
import * as ZipAllState from "./stream/zipAllState.js"
5050
import * as ZipChunksState from "./stream/zipChunksState.js"
51-
import * as _take from "./take.js"
51+
import * as InternalTake from "./take.js"
5252

5353
/** @internal */
5454
const StreamSymbolKey = "effect/Stream"
@@ -461,13 +461,57 @@ export const as = dual<
461461

462462
/** @internal */
463463
export const _async = <A, E = never, R = never>(
464-
register: (emit: Emit.Emit<R, E, A, void>) => void,
464+
register: (
465+
emit: Emit.Emit<R, E, A, void>
466+
) => Effect.Effect<void, never, R> | void,
465467
outputBuffer = 16
466468
): Stream.Stream<A, E, R> =>
467-
asyncOption((cb) => {
468-
register(cb)
469-
return Option.none()
470-
}, outputBuffer)
469+
Effect.acquireRelease(
470+
Queue.bounded<Take.Take<A, E>>(outputBuffer),
471+
(queue) => Queue.shutdown(queue)
472+
).pipe(
473+
Effect.flatMap((output) =>
474+
Effect.runtime<R>().pipe(
475+
Effect.flatMap((runtime) =>
476+
Effect.sync(() => {
477+
const runPromiseExit = Runtime.runPromiseExit(runtime)
478+
const canceler = register(emit.make<R, E, A, void>((resume) =>
479+
InternalTake.fromPull(resume).pipe(
480+
Effect.flatMap((take) => Queue.offer(output, take)),
481+
Effect.asUnit,
482+
runPromiseExit
483+
).then((exit) => {
484+
if (Exit.isFailure(exit)) {
485+
if (!Cause.isInterrupted(exit.cause)) {
486+
throw Cause.squash(exit.cause)
487+
}
488+
}
489+
})
490+
))
491+
return canceler
492+
})
493+
),
494+
Effect.map((value) => {
495+
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = Queue.take(output).pipe(
496+
Effect.flatMap((take) => InternalTake.done(take)),
497+
Effect.match({
498+
onFailure: (maybeError) =>
499+
core.fromEffect(Queue.shutdown(output)).pipe(
500+
channel.zipRight(Option.match(maybeError, {
501+
onNone: () => core.unit,
502+
onSome: (error) => core.fail(error)
503+
}))
504+
),
505+
onSuccess: (chunk) => core.write(chunk).pipe(core.flatMap(() => loop))
506+
}),
507+
channel.unwrap
508+
)
509+
return fromChannel(loop).pipe(ensuring(value ?? Effect.unit))
510+
})
511+
)
512+
),
513+
unwrapScoped
514+
)
471515

472516
/** @internal */
473517
export const asyncEffect = <A, E = never, R = never>(
@@ -487,7 +531,7 @@ export const asyncEffect = <A, E = never, R = never>(
487531
register(
488532
emit.make((k) =>
489533
pipe(
490-
_take.fromPull(k),
534+
InternalTake.fromPull(k),
491535
Effect.flatMap((take) => Queue.offer(output, take)),
492536
Effect.asUnit,
493537
Runtime.runPromiseExit(runtime)
@@ -503,7 +547,7 @@ export const asyncEffect = <A, E = never, R = never>(
503547
Effect.map(() => {
504548
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
505549
Queue.take(output),
506-
Effect.flatMap(_take.done),
550+
Effect.flatMap(InternalTake.done),
507551
Effect.match({
508552
onFailure: (maybeError) =>
509553
pipe(
@@ -524,84 +568,6 @@ export const asyncEffect = <A, E = never, R = never>(
524568
fromChannel
525569
)
526570

527-
/** @internal */
528-
export const asyncInterrupt = <A, E = never, R = never>(
529-
register: (
530-
emit: Emit.Emit<R, E, A, void>
531-
) => Either.Either<Effect.Effect<unknown, never, R>, Stream.Stream<A, E, R>>,
532-
outputBuffer = 16
533-
): Stream.Stream<A, E, R> =>
534-
pipe(
535-
Effect.acquireRelease(
536-
Queue.bounded<Take.Take<A, E>>(outputBuffer),
537-
(queue) => Queue.shutdown(queue)
538-
),
539-
Effect.flatMap((output) =>
540-
pipe(
541-
Effect.runtime<R>(),
542-
Effect.flatMap((runtime) =>
543-
pipe(
544-
Effect.sync(() =>
545-
register(
546-
emit.make((k) =>
547-
pipe(
548-
_take.fromPull(k),
549-
Effect.flatMap((take) => Queue.offer(output, take)),
550-
Effect.asUnit,
551-
Runtime.runPromiseExit(runtime)
552-
).then((exit) => {
553-
if (Exit.isFailure(exit)) {
554-
if (!Cause.isInterrupted(exit.cause)) {
555-
throw Cause.squash(exit.cause)
556-
}
557-
}
558-
})
559-
)
560-
)
561-
),
562-
Effect.map(Either.match({
563-
onLeft: (canceler) => {
564-
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
565-
Queue.take(output),
566-
Effect.flatMap(_take.done),
567-
Effect.match({
568-
onFailure: (maybeError) =>
569-
channel.zipRight(
570-
core.fromEffect(Queue.shutdown(output)),
571-
Option.match(maybeError, {
572-
onNone: () => core.unit,
573-
onSome: core.fail
574-
})
575-
),
576-
onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => loop))
577-
}),
578-
channel.unwrap
579-
)
580-
return pipe(fromChannel(loop), ensuring(canceler))
581-
},
582-
onRight: (stream) => unwrap(pipe(Queue.shutdown(output), Effect.as(stream)))
583-
}))
584-
)
585-
)
586-
)
587-
),
588-
unwrapScoped
589-
)
590-
591-
/** @internal */
592-
export const asyncOption = <A, E = never, R = never>(
593-
register: (emit: Emit.Emit<R, E, A, void>) => Option.Option<Stream.Stream<A, E, R>>,
594-
outputBuffer = 16
595-
): Stream.Stream<A, E, R> =>
596-
asyncInterrupt(
597-
(emit) =>
598-
Option.match(register(emit), {
599-
onNone: () => Either.left(Effect.unit),
600-
onSome: Either.right
601-
}),
602-
outputBuffer
603-
)
604-
605571
/** @internal */
606572
export const asyncScoped = <A, E = never, R = never>(
607573
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
@@ -620,7 +586,7 @@ export const asyncScoped = <A, E = never, R = never>(
620586
register(
621587
emit.make((k) =>
622588
pipe(
623-
_take.fromPull(k),
589+
InternalTake.fromPull(k),
624590
Effect.flatMap((take) => Queue.offer(output, take)),
625591
Effect.asUnit,
626592
Runtime.runPromiseExit(runtime)
@@ -642,7 +608,7 @@ export const asyncScoped = <A, E = never, R = never>(
642608
pull.end() :
643609
pipe(
644610
Queue.take(output),
645-
Effect.flatMap(_take.done),
611+
Effect.flatMap(InternalTake.done),
646612
Effect.onError(() =>
647613
pipe(
648614
Ref.set(ref, true),
@@ -884,7 +850,7 @@ export const bufferChunks = dual<
884850
Effect.map(queue, (queue) => {
885851
const process: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
886852
core.fromEffect(Queue.take(queue)),
887-
core.flatMap(_take.match({
853+
core.flatMap(InternalTake.match({
888854
onEnd: () => core.unit,
889855
onFailure: core.failCause,
890856
onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process))
@@ -947,7 +913,7 @@ const bufferUnbounded = <A, E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A
947913
Effect.map(queue, (queue) => {
948914
const process: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
949915
core.fromEffect(Queue.take(queue)),
950-
core.flatMap(_take.match({
916+
core.flatMap(InternalTake.match({
951917
onEnd: () => core.unit,
952918
onFailure: core.failCause,
953919
onSuccess: (value) => core.flatMap(core.write(value), () => process)
@@ -989,16 +955,16 @@ const bufferSignal = <A, E, R>(
989955
Effect.flatMap(
990956
(deferred) =>
991957
pipe(
992-
Queue.offer(queue, [_take.chunk(input), deferred] as const),
958+
Queue.offer(queue, [InternalTake.chunk(input), deferred] as const),
993959
Effect.flatMap((added) => pipe(Ref.set(ref, deferred), Effect.when(() => added)))
994960
)
995961
),
996962
Effect.asUnit,
997963
core.fromEffect,
998964
core.flatMap(() => producer(queue, ref))
999965
),
1000-
onFailure: (error) => terminate(_take.failCause(error)),
1001-
onDone: () => terminate(_take.end)
966+
onFailure: (error) => terminate(InternalTake.failCause(error)),
967+
onDone: () => terminate(InternalTake.end)
1002968
})
1003969
}
1004970
const consumer = (
@@ -1009,7 +975,7 @@ const bufferSignal = <A, E, R>(
1009975
core.flatMap(([take, deferred]) =>
1010976
channel.zipRight(
1011977
core.fromEffect(Deferred.succeed(deferred, void 0)),
1012-
_take.match(take, {
978+
InternalTake.match(take, {
1013979
onEnd: () => core.unit,
1014980
onFailure: core.failCause,
1015981
onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process))
@@ -1469,19 +1435,19 @@ export const combineChunks = dual<
14691435
core.flatMap(
14701436
core.fromEffect(pipe(
14711437
handoff,
1472-
Handoff.offer<Take.Take<Elem, Err>>(_take.chunk(input))
1438+
Handoff.offer<Take.Take<Elem, Err>>(InternalTake.chunk(input))
14731439
)),
14741440
() => producer(handoff, latch)
14751441
),
14761442
onFailure: (cause) =>
14771443
core.fromEffect(
14781444
Handoff.offer<Take.Take<Elem, Err>>(
14791445
handoff,
1480-
_take.failCause(cause)
1446+
InternalTake.failCause(cause)
14811447
)
14821448
),
14831449
onDone: (): Channel.Channel<never, Chunk.Chunk<Elem>, never, Err, unknown, unknown, R> =>
1484-
core.fromEffect(Handoff.offer<Take.Take<Elem, Err>>(handoff, _take.end))
1450+
core.fromEffect(Handoff.offer<Take.Take<Elem, Err>>(handoff, InternalTake.end))
14851451
})
14861452
)
14871453
return new StreamImpl(
@@ -1515,7 +1481,7 @@ export const combineChunks = dual<
15151481
Effect.zipRight(
15161482
pipe(
15171483
Handoff.take(left),
1518-
Effect.flatMap(_take.done)
1484+
Effect.flatMap(InternalTake.done)
15191485
)
15201486
)
15211487
)
@@ -1525,7 +1491,7 @@ export const combineChunks = dual<
15251491
Effect.zipRight(
15261492
pipe(
15271493
Handoff.take(right),
1528-
Effect.flatMap(_take.done)
1494+
Effect.flatMap(InternalTake.done)
15291495
)
15301496
)
15311497
)
@@ -3388,20 +3354,20 @@ export const interleaveWith = dual<
33883354
onInput: (value: A | A2) =>
33893355
core.flatMap(
33903356
core.fromEffect(
3391-
Handoff.offer<Take.Take<A | A2, E | E2 | E3>>(handoff, _take.of(value))
3357+
Handoff.offer<Take.Take<A | A2, E | E2 | E3>>(handoff, InternalTake.of(value))
33923358
),
33933359
() => producer(handoff)
33943360
),
33953361
onFailure: (cause) =>
33963362
core.fromEffect(
33973363
Handoff.offer<Take.Take<A | A2, E | E2 | E3>>(
33983364
handoff,
3399-
_take.failCause(cause)
3365+
InternalTake.failCause(cause)
34003366
)
34013367
),
34023368
onDone: () =>
34033369
core.fromEffect(
3404-
Handoff.offer<Take.Take<A | A2, E | E2 | E3>>(handoff, _take.end)
3370+
Handoff.offer<Take.Take<A | A2, E | E2 | E3>>(handoff, InternalTake.end)
34053371
)
34063372
})
34073373
return new StreamImpl(
@@ -3437,7 +3403,7 @@ export const interleaveWith = dual<
34373403
if (bool && !leftDone) {
34383404
return pipe(
34393405
core.fromEffect(Handoff.take(left)),
3440-
core.flatMap(_take.match({
3406+
core.flatMap(InternalTake.match({
34413407
onEnd: () => rightDone ? core.unit : process(true, rightDone),
34423408
onFailure: core.failCause,
34433409
onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => process(leftDone, rightDone)))
@@ -3447,7 +3413,7 @@ export const interleaveWith = dual<
34473413
if (!bool && !rightDone) {
34483414
return pipe(
34493415
core.fromEffect(Handoff.take(right)),
3450-
core.flatMap(_take.match({
3416+
core.flatMap(InternalTake.match({
34513417
onEnd: () => leftDone ? core.unit : process(leftDone, true),
34523418
onFailure: core.failCause,
34533419
onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => process(leftDone, rightDone)))
@@ -5450,9 +5416,9 @@ export const runIntoQueueScoped = dual<
54505416
): Effect.Effect<void, never, Scope.Scope | R> => {
54515417
const writer: Channel.Channel<Take.Take<A, E>, Chunk.Chunk<A>, never, E, unknown, unknown, R> = core
54525418
.readWithCause({
5453-
onInput: (input: Chunk.Chunk<A>) => core.flatMap(core.write(_take.chunk(input)), () => writer),
5454-
onFailure: (cause) => core.write(_take.failCause(cause)),
5455-
onDone: () => core.write(_take.end)
5419+
onInput: (input: Chunk.Chunk<A>) => core.flatMap(core.write(InternalTake.chunk(input)), () => writer),
5420+
onFailure: (cause) => core.write(InternalTake.failCause(cause)),
5421+
onDone: () => core.write(InternalTake.end)
54565422
})
54575423
return pipe(
54585424
core.pipeTo(toChannel(self), writer),
@@ -6229,23 +6195,23 @@ export const tapSink = dual<
62296195
.readWithCause({
62306196
onInput: (chunk: Chunk.Chunk<A>) =>
62316197
pipe(
6232-
core.fromEffect(Queue.offer(queue, _take.chunk(chunk))),
6198+
core.fromEffect(Queue.offer(queue, InternalTake.chunk(chunk))),
62336199
core.foldCauseChannel({
62346200
onFailure: () => core.flatMap(core.write(chunk), () => channel.identityChannel()),
62356201
onSuccess: () => core.flatMap(core.write(chunk), () => loop)
62366202
})
62376203
) as Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E | E2, E, unknown, unknown, R2>,
62386204
onFailure: (cause: Cause.Cause<E | E2>) =>
62396205
pipe(
6240-
core.fromEffect(Queue.offer(queue, _take.failCause(cause))),
6206+
core.fromEffect(Queue.offer(queue, InternalTake.failCause(cause))),
62416207
core.foldCauseChannel({
62426208
onFailure: () => core.failCause(cause),
62436209
onSuccess: () => core.failCause(cause)
62446210
})
62456211
),
62466212
onDone: () =>
62476213
pipe(
6248-
core.fromEffect(Queue.offer(queue, _take.end)),
6214+
core.fromEffect(Queue.offer(queue, InternalTake.end)),
62496215
core.foldCauseChannel({
62506216
onFailure: () => core.unit,
62516217
onSuccess: () => core.unit
@@ -6256,7 +6222,7 @@ export const tapSink = dual<
62566222
new StreamImpl(pipe(
62576223
core.pipeTo(toChannel(self), loop),
62586224
channel.ensuring(Effect.zipRight(
6259-
Effect.forkDaemon(Queue.offer(queue, _take.end)),
6225+
Effect.forkDaemon(Queue.offer(queue, InternalTake.end)),
62606226
Deferred.await(deferred)
62616227
))
62626228
)),

‎packages/effect/test/Effect/async.test.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ describe("Effect", () => {
2222
}))
2323
it.effect("simple asyncEffect must return", () =>
2424
Effect.gen(function*($) {
25-
const result = yield* $(Effect.asyncEffect<unknown, unknown, never, void, never, never>((cb) => {
26-
return Effect.succeed(cb(Effect.succeed(42)))
25+
const result = yield* $(Effect.asyncEffect<number, never, never, never, never, never>((resume) => {
26+
return Effect.succeed(resume(Effect.succeed(42)))
2727
}))
2828
assert.strictEqual(result, 42)
2929
}))
@@ -95,18 +95,18 @@ describe("Effect", () => {
9595
assert.deepStrictEqual(unexpected, Chunk.empty())
9696
assert.deepStrictEqual(result, Option.none()) // the timeout should happen
9797
}))
98-
it.live("asyncMaybe should not resume fiber twice after synchronous result", () =>
98+
it.live("async should not resume fiber twice after synchronous result", () =>
9999
Effect.gen(function*($) {
100100
const step = yield* $(Deferred.make<void>())
101101
const unexpectedPlace = yield* $(Ref.make(Chunk.empty<number>()))
102102
const runtime = yield* $(Effect.runtime<never>())
103103
const fiber = yield* $(
104-
Effect.asyncOption<void, never, never>((cb) => {
104+
Effect.async<void, never, never>((resume) => {
105105
Runtime.runCallback(runtime)(pipe(
106106
Deferred.await(step),
107-
Effect.zipRight(Effect.sync(() => cb(Ref.update(unexpectedPlace, Chunk.prepend(1)))))
107+
Effect.zipRight(Effect.sync(() => resume(Ref.update(unexpectedPlace, Chunk.prepend(1)))))
108108
))
109-
return Option.some(Effect.unit)
109+
return Effect.unit
110110
}),
111111
Effect.flatMap(() =>
112112
Effect.async<void, never, never>(() => {

‎packages/effect/test/Effect/concurrency.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ describe("Effect", () => {
4949
const release = yield* $(Deferred.make<number>())
5050
const acquire = yield* $(Deferred.make<void>())
5151
const fiber = yield* $(
52-
Effect.asyncEffect<unknown, unknown, never, unknown, unknown, never>((_) =>
52+
Effect.asyncEffect<void, never, never, never, never, never>((_) =>
5353
// This will never complete because the callback is never invoked
5454
Effect.acquireUseRelease(
5555
Deferred.succeed(acquire, void 0),

‎packages/effect/test/Effect/interruption.test.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -546,11 +546,11 @@ describe("Effect", () => {
546546
it.effect("async cancelation", () =>
547547
Effect.gen(function*($) {
548548
const ref = MutableRef.make(0)
549-
const effect = Effect.asyncEither(() => {
549+
const effect = Effect.async(() => {
550550
pipe(ref, MutableRef.set(MutableRef.get(ref) + 1))
551-
return Either.left(Effect.sync(() => {
551+
return Effect.sync(() => {
552552
pipe(ref, MutableRef.set(MutableRef.get(ref) - 1))
553-
}))
553+
})
554554
})
555555
yield* $(Effect.unit, Effect.race(effect))
556556
const result = MutableRef.get(ref)

‎packages/effect/test/Stream/async.test.ts

+145-156
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.