Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise happy path in ZChannel.mapOutZIOPar* #9123

Merged
merged 15 commits into from
Nov 2, 2024

Conversation

regiskuckaertz
Copy link
Member

@regiskuckaertz regiskuckaertz commented Aug 16, 2024

Changes a few things:

  • in mapOutZIOPar, the queue now harbours the fibers themselves instead of promises
  • optimises for the common path by changing the effect from IO[Unit, Either[OutDone, OutElem]] to IO[Either[OutDone, Unit], OutElem]
  • uses a version of toPullIn that does the same thing: in toPullInAlt, I'm using Exit instead of ZIO but I'm not sure that's correct, let me know what you think
  • move error/end of source handling out of repetition
  • move race out of worker fiber
  • fork worker fibers in the scope of the consumer: the error signal will interrupt the consumer fiber, which will interrupt all its children

This is not going as far as the previous optimised implementation.

@regiskuckaertz
Copy link
Member Author

erh, looks like some history got mixed up

Changes a few things:

- in `mapOutZIOPar`, the queue now harbours the fibers themselves instead of promises
- optimises for the common path by changing the effect from `IO[Unit, Either[OutDone, OutElem]]`
  to `IO[Either[OutDone, Unit], OutElem]`
- uses a version of `toPullIn` that does the same thing: in `toPullInAlt`, I'm using `Exit` instead
  of `ZIO` but I'm not sure that's correct, let me know what you think

This is not going as far as the previous optimised implementation.

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
fmt
Copy link
Collaborator

@hearnadam hearnadam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems good to me, though I am not very familiar with the internals of ZChannel.

core/shared/src/main/scala/zio/ZIO.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
case f: Exit.Failure[Either[Unit, OutDone]] =>
f.cause.failureOrCause match {
case Left(_: Left[Unit, OutDone]) => ZChannel.unwrap(failure.get.map(ZChannel.refailCause(_)))
case Left(x: Right[Unit, OutDone]) => ZChannel.succeedNow(x.value)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this become case Left(Right(value)) => .. for readability? I guess it is nice to see alignment with 699.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave it like that, if we cache exit values even for errors then this seems like going in the same direction

streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
@regiskuckaertz
Copy link
Member Author

@kyri-petrou take a look when you have some time please

@kyri-petrou
Copy link
Contributor

Sorry I've been fairly busy the past few weeks so I didn't get time to go through it. I'll try to review it over the weekend

Copy link
Contributor

@kyri-petrou kyri-petrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall, mostly minor comments and some questions. I'll probably take another look at it once the comments are resolved just to make sure I didn't miss anything in the first pass 😅

Comment on lines +1913 to +1922
n <- ZIO.succeed(n)
bufferSize <- ZIO.succeed(bufferSize)
mergeStrategy <- ZIO.succeed(mergeStrategy)
outgoing <- Queue.bounded[ZIO[Env, Either[OutErr, OutDone], OutElem]](bufferSize)
_ <- scope.addFinalizer(outgoing.shutdown)
cancelers <- Queue.unbounded[Promise[Nothing, Unit]]
_ <- scope.addFinalizer(cancelers.shutdown)
lastDone <- Ref.make[Option[OutDone]](None)
errorSignal <- Promise.make[Nothing, Unit]
permits <- Semaphore.make(n.toLong)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe outside the scope of this PR, but this can be made a lot more lightweight by removing the redundant ZIO.succeed and using the unsafe API of Promise / Ref / Queue / Semaphore

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't pay much attention to this to be honest as it's executed only at the creation of the channel but yeah it might have a cost for something like zio-grpc.

}
_ <- permits
.withPermit(latch.succeed(()) *> raceIOs)
.fork
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the rationale behind the change from forkIn to fork here? I did notice that fork is faster, but what worries me about it is that fork is somewhat leaky for long-running fibers.

Since fork adds any child fibers to it's internal fiber set, the set might grow too large for long-running streams

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, but these worker fibers are short-lived, aren't they removed from the set after they're done?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is where it gets a bit tricky; they're removed from the set once we no longer hold any reference to them (since they're stored in a WeakHashMap). If for some the FiberRuntime keeps being referenced after completion, it will remain in the children set which might cause it to grow indefinitely.

Having said that (and thinking about it a bit more), if some part of the application is causing fibers to be indefinitely referenced, then memory leak is going to occur regardless. So I think using fork probably isn't as big of an issue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if that were to happen that would be flagged as a bug either in the runtime or in the layer above that uses the stream API. IMHO there is very little reason to create a scope within a scope, I only ever use them to escape the implicit scope of the parent fiber.

streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
@regiskuckaertz
Copy link
Member Author

#9123 (comment)

It is used quite liberally b/c iirc some users pass imperative code in these parameters, like in ZIO.succeed. Maybe in this case it's too much, I agree, but I think let's think about this as a separate issue as it applies to many operators. Is there not a bincompat issue?

@regiskuckaertz
Copy link
Member Author

@kyri-petrou 🙏

@regiskuckaertz
Copy link
Member Author

@kyri-petrou 😄

Copy link
Contributor

@kyri-petrou kyri-petrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just 1 main comment that needs to be addressed, otherwise looks good to me.

Having said that, I think we should test all the methods / branches within uninterruptible regions to ensure that we didn't cause any regressions on that front as it seems that behaviour is not currently being tested

streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
streams/shared/src/main/scala/zio/stream/ZChannel.scala Outdated Show resolved Hide resolved
@regiskuckaertz
Copy link
Member Author

@kyri-petrou i've restored them also around the forever just to be safe, it can't hurt anyway.

Copy link
Contributor

@kyri-petrou kyri-petrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for keep adding comments / asking for changes.

I'm a bit paranoid with altering ZStream behaviour since our test coverage is evidently not too great especially when it comes to interruption, and I keep spotting things that seem they might cause an issue when I re-review changes.

Copy link
Contributor

@kyri-petrou kyri-petrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, and sorry again for taking this long to review this. By the way do you know what the perf improvement is now after the most recent changes? I'm suspecting altering the interruption will add a bit of overhead

@regiskuckaertz
Copy link
Member Author

@kyri-petrou not at all, your comments helped a lot and were challenging, thank you 🙇 I'll run the benchmarks, to be honest I think the cost is still dominated by creating fibers: there must be a way to avoid the eager copy of all FiberRefs.

@regiskuckaertz
Copy link
Member Author

[info] StreamParBenchmark.zioMerge                     10000         5000              50  thrpt   15  4292.100 ± 27.658  ops/s
[info] StreamParBenchmark.zioMerge2                    10000         5000              50  thrpt   15  3699.083 ± 32.063  ops/s
[info] StreamParBenchmark.zioMergeWithIdentity         10000         5000              50  thrpt   15  3637.756 ± 55.500  ops/s
[info] StreamParBenchmark.zioMapPar                  10000         5000              50  thrpt   15  0.115 ± 0.008  ops/s
[info] StreamParBenchmark.zioMapParUnordered         10000         5000              50  thrpt   15  0.123 ± 0.004  ops/s

then

[info] StreamParBenchmark.zioMerge                     10000         5000              50  thrpt   15  4611.105 ± 55.894  ops/s
[info] StreamParBenchmark.zioMerge2                    10000         5000              50  thrpt   15  3738.064 ± 81.570  ops/s
[info] StreamParBenchmark.zioMergeWithIdentity         10000         5000              50  thrpt   15  3364.208 ± 61.184  ops/s
[info] StreamParBenchmark.zioMapPar                  10000         5000              50  thrpt   15  0.158 ± 0.013  ops/s
[info] StreamParBenchmark.zioMapParUnordered         10000         5000              50  thrpt   15  0.118 ± 0.006  ops/s

just a marginal difference for these benchmarks.

@regiskuckaertz regiskuckaertz merged commit 97d99b2 into zio:series/2.x Nov 2, 2024
18 checks passed
@regiskuckaertz regiskuckaertz deleted the rk-streampar-faster branch November 2, 2024 11:45
@kyri-petrou
Copy link
Contributor

@regiskuckaertz there seems to be a notable regression in the zioMergeWithIdentity benchmark. Is this due to some inconsistency between benchmark iterations or is it something we should look into?

@regiskuckaertz
Copy link
Member Author

@kyri-petrou we should look :-) I'll spend some time this weekend, in my mind I wanted a baseline implementation that was easy to reason about, though I was secretly hopeful that simpler would be faster 😅

ZIO.environmentWithZIO[Env] { environment =>
scope.addFinalizerExit { exit =>
val finalizer = exec.close(exit)
if (finalizer ne null) finalizer.provideEnvironment(environment)
Copy link
Member

@guizmaii guizmaii Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@regiskuckaertz @kyri-petrou @hearnadam

Why are we testing if the finalizer is not null here? It doesn't seem like exec.close(exit) can return null
Shouldn't it be environment ne null? 🤔

Copy link
Contributor

@kyri-petrou kyri-petrou Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good question. Although I don't think it's possible for neither the finalizer nor the environment to be null 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does ZIO.environmentWithZIO[Env] return when Env is Any? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be the current environment (this is always the case, the environment might contain more services than just R) without any requirement that it will contain any services. So at the very "minimum" it will be ZEnvironment.empty

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants