Skip to content

Commit

Permalink
Added Source/Flow Setup operator (#6788)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
ismaelhamed and Aaronontheweb committed Jun 15, 2023
1 parent 74c59c5 commit 5fa8ef4
Show file tree
Hide file tree
Showing 8 changed files with 431 additions and 1 deletion.
14 changes: 14 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ Combine the elements of multiple streams into a stream of sequences using a comb

**completes** when any upstream completes

### Setup

Defer the creation of a `Source` until materialization and access `ActorMaterializer` and `Attributes`.

Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.

## Sink Stages

These built-in sinks are available from ``Akka.Stream.DSL.Sink``:
Expand Down Expand Up @@ -615,6 +622,13 @@ Just like `Scan` but receiving a function that results in a `Task` to the next v

**completes** when upstream completes and the last `Task` is resolved

### Setup

Defer the creation of a `Flow` until materialization and access `ActorMaterializer` and `Attributes`.

Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.

### Aggregate

Start with current value ``zero`` and then apply the current and next value to the given function, when upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Identity<T>() { }
public static Akka.Streams.Dsl.Flow<T, T, TMat> Identity<T, TMat>() { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TOut, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>>> flowFactory) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<TMat>> Setup<TIn, TOut, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
}
public class static FlowOperations
{
Expand Down Expand Up @@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
public static Akka.Streams.Dsl.Source<T, Akka.Streams.ISourceQueueWithComplete<T>> Queue<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Repeat<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<TMat>> Setup<T, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<T, TMat>> factory) { }
public static Akka.Streams.SourceShape<T> Shape<T>(string name) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Single<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, Akka.Actor.ICancelable> Tick<T>(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { }
Expand Down Expand Up @@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupFlowStage<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupFlowStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
public Akka.Streams.Inlet<TIn> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.FlowShape<TIn, TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupSourceStage<TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupSourceStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<TOut, TMat>> factory) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.SourceShape<TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation
{
public SignalThrewException(string message, System.Exception cause) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Identity<T>() { }
public static Akka.Streams.Dsl.Flow<T, T, TMat> Identity<T, TMat>() { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TOut, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>>> flowFactory) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<TMat>> Setup<TIn, TOut, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
}
public class static FlowOperations
{
Expand Down Expand Up @@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
public static Akka.Streams.Dsl.Source<T, Akka.Streams.ISourceQueueWithComplete<T>> Queue<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Repeat<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<TMat>> Setup<T, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<T, TMat>> factory) { }
public static Akka.Streams.SourceShape<T> Shape<T>(string name) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Single<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, Akka.Actor.ICancelable> Tick<T>(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { }
Expand Down Expand Up @@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupFlowStage<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupFlowStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
public Akka.Streams.Inlet<TIn> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.FlowShape<TIn, TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupSourceStage<TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupSourceStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<TOut, TMat>> factory) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.SourceShape<TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation
{
public SignalThrewException(string message, System.Exception cause) { }
Expand Down
172 changes: 172 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/SetupSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//-----------------------------------------------------------------------
// <copyright file="SeqSinkSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Akka.Streams.Dsl;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
public class SetupSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public SetupSpec(ITestOutputHelper helper)
: base(helper) => Materializer = ActorMaterializer.Create(Sys);

[Fact]
public void SourceSetup_should_expose_materializer()
{
var source = Source.Setup((mat, _) => Source.Single(mat.IsShutdown));
source.RunWith(Sink.First<bool>(), Materializer).Result.Should().BeFalse();
}

[Fact]
public void SourceSetup_should_expose_attributes()
{
var source = Source.Setup((_, attr) => Source.Single(attr.AttributeList));
source.RunWith(Sink.First<IEnumerable<Attributes.IAttribute>>(), Materializer).Result.Should().NotBeEmpty();
}

[Fact]
public void SourceSetup_should_propagate_materialized_value()
{
var source = Source.Setup((_, _) => Source.Maybe<NotUsed>());

var (completion, element) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);
completion.Result.TrySetResult(NotUsed.Instance);
element.Result.ShouldBe(NotUsed.Instance);
}

[Fact]
public void SourceSetup_should_propagate_attributes()
{
var source = Source.Setup((_, attr) => Source.Single(attr.GetNameLifted)).Named("my-name");
source.RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name");
}

[Fact]
public void SourceSetup_should_propagate_attributes_when_nested()
{
var source = Source.Setup((_, _) => Source.Setup((_, attr) => Source.Single(attr.GetNameLifted))).Named("my-name");
source.RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup");
}

[Fact]
public void SourceSetup_should_handle_factory_failure()
{
var error = new ApplicationException("boom");
var source = Source.Setup<NotUsed, NotUsed>((_, _) => throw error);

var (materialized, completion) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}

[Fact]
public void SourceSetup_should_handle_materialization_failure()
{
var error = new ApplicationException("boom");
var source = Source.Setup((_, _) => Source.Empty<NotUsed>().MapMaterializedValue<NotUsed>(_ => throw error));

var (materialized, completion) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}

[Fact]
public void FlowSetup_should_expose_materializer()
{
var flow = Flow.Setup((mat, _) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(mat.IsShutdown)));

Source.Empty<object>().Via(flow).RunWith(Sink.First<bool>(), Materializer).Result.Should().BeFalse();
}

[Fact]
public void FlowSetup_should_expose_attributes()
{
var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(attr.AttributeList)));

Source.Empty<object>().Via(flow).RunWith(Sink.First<IEnumerable<Attributes.IAttribute>>(), Materializer).Result.Should().NotBeEmpty();
}

[Fact]
public void FlowSetup_should_propagate_materialized_value()
{
var flow = Flow.Setup((_, _) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Maybe<NotUsed>(), Keep.Right));

var (completion, element) = Source.Empty<object>()
.ViaMaterialized(flow, Keep.Right)
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);

completion.Result.TrySetResult(NotUsed.Instance);
element.Result.ShouldBe(NotUsed.Instance);
}

[Fact]
public void FlowSetup_should_propagate_attributes()
{
var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(attr.GetNameLifted))).Named("my-name");

Source.Empty<object>().Via(flow).RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name");
}

[Fact]
public void FlowSetup_should_propagate_attributes_when_nested()
{
var flow = Flow.Setup((_, _) => Flow.Setup((_, attr) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(attr.GetNameLifted)))).Named("my-name");

Source.Empty<object>().Via(flow).RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup");
}

[Fact]
public void FlowSetup_should_handle_factory_failure()
{
var error = new ApplicationException("boom");
var flow = Flow.Setup<NotUsed, NotUsed, NotUsed>((_, _) => throw error);

var (materialized, completion) = Source.Empty<NotUsed>()
.ViaMaterialized(flow, Keep.Right)
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both)
.Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}

[Fact]
public void FlowSetup_should_handle_materialization_failure()
{
var error = new ApplicationException("boom");
var flow = Flow.Setup((_, _) => Flow.Create<NotUsed>().MapMaterializedValue<NotUsed>(_ => throw error));

var (materialized, completion) = Source.Empty<NotUsed>()
.ViaMaterialized(flow, Keep.Right)
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both)
.Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}
}
}
13 changes: 13 additions & 0 deletions src/core/Akka.Streams/Dsl/Flow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,19 @@ public static class Flow
public static Flow<TIn, TOut, TMat> FromGraph<TIn, TOut, TMat>(IGraph<FlowShape<TIn, TOut>, TMat> graph)
=> graph as Flow<TIn, TOut, TMat> ?? new Flow<TIn, TOut, TMat>(graph.Module);

/// <summary>
/// Defers the creation of a <see cref="Flow"/> until materialization. The <paramref name="factory"/>
/// function exposes <see cref="ActorMaterializer"/> which is going to be used during materialization and
/// <see cref="Attributes"/> of the <see cref="Flow"/> returned by this method.
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="factory">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, Task<TMat>> Setup<TIn, TOut, TMat>(Func<ActorMaterializer, Attributes, Flow<TIn, TOut, TMat>> factory)
=> FromGraph(new SetupFlowStage<TIn, TOut, TMat>(factory));

/// <summary>
/// Creates a <see cref="Flow{TIn,TOut,TMat}"/> from a <see cref="Sink{TIn,TMat}"/> and a <see cref="Source{TOut,TMat}"/> where the flow's input
/// will be sent to the sink and the flow's output will come from the source.
Expand Down
12 changes: 12 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,18 @@ public static class Source
public static Source<T, TMat> FromGraph<T, TMat>(IGraph<SourceShape<T>, TMat> source)
=> source as Source<T, TMat> ?? new Source<T, TMat>(source.Module);

/// <summary>
/// Defers the creation of a <see cref="Source"/> until materialization. The <paramref name="factory"/>
/// function exposes <see cref="ActorMaterializer"/> which is going to be used during materialization and
/// <see cref="Attributes"/> of the <see cref="Source"/> returned by this method.
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="factory">TBD</param>
/// <returns>TBD</returns>
public static Source<T, Task<TMat>> Setup<T, TMat>(Func<ActorMaterializer, Attributes, Source<T, TMat>> factory)
=> FromGraph(new SetupSourceStage<T, TMat>(factory));

/// <summary>
/// Start a new <see cref="Source{TOut,TMat}"/> from the given <see cref="Task{T}"/>. The stream will consist of
/// one element when the <see cref="Task{T}"/> is completed with a successful value, which
Expand Down

0 comments on commit 5fa8ef4

Please sign in to comment.