Skip to content

Commit

Permalink
Streams: Fix SelectAsync race condition bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Sep 12, 2024
1 parent 6f937b5 commit 3ce0931
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2529,13 +2529,15 @@ private class Holder<T>
{
private readonly Action<Holder<T>> _callback;

public Holder(Result<T> element, Action<Holder<T>> callback)
public Holder(object message, Result<T> element, Action<Holder<T>> callback)
{
_callback = callback;
Message = message;
Element = element;
}

public Result<T> Element { get; private set; }
public object Message { get; }

public void SetElement(Result<T> result)
{
Expand Down Expand Up @@ -2575,7 +2577,7 @@ public override void OnPush()
try
{
var task = _stage._mapFunc(message);
var holder = new Holder<TOut>(NotYetThere, _taskCallback);
var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
_buffer.Enqueue(holder);

// We dispatch the task if it's ready to optimize away
Expand Down Expand Up @@ -2642,9 +2644,29 @@ private void PushOne()
}
else
{
var result = _buffer.Dequeue().Element;
var holder = _buffer.Dequeue();
var result = holder.Element;
if (!result.IsSuccess)
{
// this could happen if we are looping in PushOne and end up on a failed Task before the
// HolderCompleted callback has run
var strategy = _decider(result.Exception);
Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
switch (strategy)
{
case Directive.Stop:
FailStage(result.Exception);
return;

case Directive.Resume:
case Directive.Restart:
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", result.Exception);
}
continue;
}

Push(_stage.Out, result.Value);
if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
Expand Down

0 comments on commit 3ce0931

Please sign in to comment.