-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed race conditions + unsafe struct assignment in SelectAsync
#7521
Fixed race conditions + unsafe struct assignment in SelectAsync
#7521
Conversation
close akkadotnet#7518
Co-authored-by: Michael Buck <mhbuck@gmail.com>
`null` might be a perfectly acceptable value inside a `SelectAsync` stage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Detailed my changes
@@ -2512,61 +2513,47 @@ public Expand(Func<TIn, IEnumerator<TOut>> extrapolate) | |||
/// </returns> | |||
public override string ToString() => "Expand"; | |||
} | |||
|
|||
#nullable enable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enable nullable
to take advantage of the changes made in #7520
[InternalApi] | ||
public sealed class SelectAsync<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>> | ||
{ | ||
#region internal classes | ||
|
||
private sealed class Logic : InAndOutGraphStageLogic | ||
{ | ||
private class Holder<T> | ||
private sealed class Holder<T>(object message, Result<T> element) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
used a primary CTOR because I felt like it 🤫
} | ||
|
||
private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception()); | ||
|
||
private readonly SelectAsync<TIn, TOut> _stage; | ||
private readonly Decider _decider; | ||
private IBuffer<Holder<TOut>> _buffer; | ||
private readonly Action<Holder<TOut>> _taskCallback; | ||
private readonly Action<(Holder<TOut>, Result<TOut>)> _taskCallback; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AsyncCallbacks in Akka.Streams can only accept a single input parameter, so we have to go with a ValueTuple
here
|
||
public Result<T> Element { get; private set; } | ||
public object Message { get; } | ||
public object? Message { get; private set; } = message; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null
might be a legal value for the message for all we know
task.ContinueWith(t => holder.Invoke(Result.FromTask(t)), | ||
TaskContinuationOptions.ExecuteSynchronously); | ||
{ | ||
async Task WaitForTask() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used the latest .NET meta for relying on a local function + detached Task
+ await
on parent task instead of using ContinueWith
- this is designed to tremendously simplify error-handling mostly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has the side effect of eliminating the AggregateException
s previously emitted from SelectAsync
, which is why I've had to update some of the unit tests.
@@ -2677,17 +2678,18 @@ private void PushOne() | |||
} | |||
} | |||
|
|||
private void HolderCompleted(Holder<TOut> holder) | |||
private void HolderCompleted(Holder<TOut> holder, Result<TOut> result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the real fix - we now pass in both the Holder<TOut>
and the Result<TOut>
together into the StageActor's context and perform the assignment there, where it's guaranteed to be thread-safe, rather than doing it half-in / half-out like before.
@@ -2663,12 +2664,12 @@ private void PushOne() | |||
break; | |||
|
|||
default: | |||
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception); | |||
throw new ArgumentOutOfRangeException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception); | |||
} | |||
continue; | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since SetElement()
always get invoked in the same thread now, does that mean that this bit of code isn't needed anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole error handling code inside PushOne()
I mean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we decided in Slack that we probably still need error-handling in both places since there can be a delay on FailStage
taking effect in the AsyncCallback
- we'd still want to check for errors whenever else we're calling PushOne
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…ntinueWith` Replicates some of the behaviors and fixes we made to `SelectAsync` in akkadotnet#7521
Changes
close #7518
struct
assignments are not guaranteed to be atomic in .NET because each field, individually, has to be assigned - as explained by the brilliant Eric Lippert: https://ericlippert.com/2011/05/31/atomicity-volatility-and-immutability-are-different-part-two/This PR fixes an unsafe
struct
assignment that caused #7518Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):
SelectAsync
stopped working even when async code block is guarded by a try...catch block #7518