Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Source/Flow Setup operator #6788

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ Combine the elements of multiple streams into a stream of sequences using a comb

**completes** when any upstream completes

### Setup

Defer the creation of a `Source` until materialization and access `ActorMaterializer` and `Attributes`.

Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.

## Sink Stages

These built-in sinks are available from ``Akka.Stream.DSL.Sink``:
Expand Down Expand Up @@ -615,6 +622,13 @@ Just like `Scan` but receiving a function that results in a `Task` to the next v

**completes** when upstream completes and the last `Task` is resolved

### Setup

Defer the creation of a `Flow` until materialization and access `ActorMaterializer` and `Attributes`.

Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.

### Aggregate

Start with current value ``zero`` and then apply the current and next value to the given function, when upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Identity<T>() { }
public static Akka.Streams.Dsl.Flow<T, T, TMat> Identity<T, TMat>() { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TOut, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>>> flowFactory) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<TMat>> Setup<TIn, TOut, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
}
public class static FlowOperations
{
Expand Down Expand Up @@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
public static Akka.Streams.Dsl.Source<T, Akka.Streams.ISourceQueueWithComplete<T>> Queue<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Repeat<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<TMat>> Setup<T, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<T, TMat>> factory) { }
public static Akka.Streams.SourceShape<T> Shape<T>(string name) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Single<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, Akka.Actor.ICancelable> Tick<T>(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { }
Expand Down Expand Up @@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupFlowStage<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupFlowStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
public Akka.Streams.Inlet<TIn> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.FlowShape<TIn, TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupSourceStage<TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupSourceStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<TOut, TMat>> factory) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.SourceShape<TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation
{
public SignalThrewException(string message, System.Exception cause) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<T, T, Akka.NotUsed> Identity<T>() { }
public static Akka.Streams.Dsl.Flow<T, T, TMat> Identity<T, TMat>() { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<Akka.Util.Option<TMat>>> LazyInitAsync<TIn, TOut, TMat>(System.Func<System.Threading.Tasks.Task<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>>> flowFactory) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, System.Threading.Tasks.Task<TMat>> Setup<TIn, TOut, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
}
public class static FlowOperations
{
Expand Down Expand Up @@ -2033,6 +2034,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
public static Akka.Streams.Dsl.Source<T, Akka.Streams.ISourceQueueWithComplete<T>> Queue<T>(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Repeat<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<TMat>> Setup<T, TMat>(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<T, TMat>> factory) { }
public static Akka.Streams.SourceShape<T> Shape<T>(string name) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Single<T>(T element) { }
public static Akka.Streams.Dsl.Source<T, Akka.Actor.ICancelable> Tick<T>(System.TimeSpan initialDelay, System.TimeSpan interval, T tick) { }
Expand Down Expand Up @@ -3582,6 +3584,25 @@ namespace Akka.Streams.Implementation
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<T>>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupFlowStage<TIn, TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.FlowShape<TIn, TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupFlowStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> factory) { }
public Akka.Streams.Inlet<TIn> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.FlowShape<TIn, TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class SetupSourceStage<TOut, TMat> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, System.Threading.Tasks.Task<TMat>>
{
public SetupSourceStage(System.Func<Akka.Streams.ActorMaterializer, Akka.Streams.Attributes, Akka.Streams.Dsl.Source<TOut, TMat>> factory) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public Akka.Streams.Outlet<TOut> Out { get; }
public override Akka.Streams.SourceShape<TOut> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<TMat>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
}
public class SignalThrewException : Akka.Pattern.IllegalStateException, Akka.Streams.Implementation.ISpecViolation
{
public SignalThrewException(string message, System.Exception cause) { }
Expand Down
172 changes: 172 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/SetupSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//-----------------------------------------------------------------------
// <copyright file="SeqSinkSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Akka.Streams.Dsl;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
public class SetupSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public SetupSpec(ITestOutputHelper helper)
: base(helper) => Materializer = ActorMaterializer.Create(Sys);

[Fact]
public void SourceSetup_should_expose_materializer()
{
var source = Source.Setup((mat, _) => Source.Single(mat.IsShutdown));
source.RunWith(Sink.First<bool>(), Materializer).Result.Should().BeFalse();
}

[Fact]
public void SourceSetup_should_expose_attributes()
{
var source = Source.Setup((_, attr) => Source.Single(attr.AttributeList));
source.RunWith(Sink.First<IEnumerable<Attributes.IAttribute>>(), Materializer).Result.Should().NotBeEmpty();
}

[Fact]
public void SourceSetup_should_propagate_materialized_value()
{
var source = Source.Setup((_, _) => Source.Maybe<NotUsed>());

var (completion, element) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);
completion.Result.TrySetResult(NotUsed.Instance);
element.Result.ShouldBe(NotUsed.Instance);
}

[Fact]
public void SourceSetup_should_propagate_attributes()
{
var source = Source.Setup((_, attr) => Source.Single(attr.GetNameLifted)).Named("my-name");
source.RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name");
}

[Fact]
public void SourceSetup_should_propagate_attributes_when_nested()
{
var source = Source.Setup((_, _) => Source.Setup((_, attr) => Source.Single(attr.GetNameLifted))).Named("my-name");
source.RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup");
}

[Fact]
public void SourceSetup_should_handle_factory_failure()
{
var error = new ApplicationException("boom");
var source = Source.Setup<NotUsed, NotUsed>((_, _) => throw error);

var (materialized, completion) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}

[Fact]
public void SourceSetup_should_handle_materialization_failure()
{
var error = new ApplicationException("boom");
var source = Source.Setup((_, _) => Source.Empty<NotUsed>().MapMaterializedValue<NotUsed>(_ => throw error));

var (materialized, completion) = source.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}

[Fact]
public void FlowSetup_should_expose_materializer()
{
var flow = Flow.Setup((mat, _) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(mat.IsShutdown)));

Source.Empty<object>().Via(flow).RunWith(Sink.First<bool>(), Materializer).Result.Should().BeFalse();
}

[Fact]
public void FlowSetup_should_expose_attributes()
{
var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(attr.AttributeList)));

Source.Empty<object>().Via(flow).RunWith(Sink.First<IEnumerable<Attributes.IAttribute>>(), Materializer).Result.Should().NotBeEmpty();
}

[Fact]
public void FlowSetup_should_propagate_materialized_value()
{
var flow = Flow.Setup((_, _) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Maybe<NotUsed>(), Keep.Right));

var (completion, element) = Source.Empty<object>()
.ViaMaterialized(flow, Keep.Right)
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both).Run(Materializer);

completion.Result.TrySetResult(NotUsed.Instance);
element.Result.ShouldBe(NotUsed.Instance);
}

[Fact]
public void FlowSetup_should_propagate_attributes()
{
var flow = Flow.Setup((_, attr) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(attr.GetNameLifted))).Named("my-name");

Source.Empty<object>().Via(flow).RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name");
}

[Fact]
public void FlowSetup_should_propagate_attributes_when_nested()
{
var flow = Flow.Setup((_, _) => Flow.Setup((_, attr) => Flow.FromSinkAndSource(
Sink.Ignore<object>().MapMaterializedValue(_ => NotUsed.Instance),
Source.Single(attr.GetNameLifted)))).Named("my-name");

Source.Empty<object>().Via(flow).RunWith(Sink.First<Func<string>>(), Materializer).Result.Invoke().ShouldBe("setup-my-name-setup");
}

[Fact]
public void FlowSetup_should_handle_factory_failure()
{
var error = new ApplicationException("boom");
var flow = Flow.Setup<NotUsed, NotUsed, NotUsed>((_, _) => throw error);

var (materialized, completion) = Source.Empty<NotUsed>()
.ViaMaterialized(flow, Keep.Right)
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both)
.Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}

[Fact]
public void FlowSetup_should_handle_materialization_failure()
{
var error = new ApplicationException("boom");
var flow = Flow.Setup((_, _) => Flow.Create<NotUsed>().MapMaterializedValue<NotUsed>(_ => throw error));

var (materialized, completion) = Source.Empty<NotUsed>()
.ViaMaterialized(flow, Keep.Right)
.ToMaterialized(Sink.First<NotUsed>(), Keep.Both)
.Run(Materializer);

Assert.Throws<AggregateException>(() => materialized.Result).InnerException?.Should().BeOfType<ApplicationException>();
Assert.Throws<AggregateException>(() => completion.Result).InnerException?.Should().BeOfType<ApplicationException>();
}
}
}
13 changes: 13 additions & 0 deletions src/core/Akka.Streams/Dsl/Flow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,19 @@ public static class Flow
public static Flow<TIn, TOut, TMat> FromGraph<TIn, TOut, TMat>(IGraph<FlowShape<TIn, TOut>, TMat> graph)
=> graph as Flow<TIn, TOut, TMat> ?? new Flow<TIn, TOut, TMat>(graph.Module);

/// <summary>
/// Defers the creation of a <see cref="Flow"/> until materialization. The <paramref name="factory"/>
/// function exposes <see cref="ActorMaterializer"/> which is going to be used during materialization and
/// <see cref="Attributes"/> of the <see cref="Flow"/> returned by this method.
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="factory">TBD</param>
/// <returns>TBD</returns>
public static Flow<TIn, TOut, Task<TMat>> Setup<TIn, TOut, TMat>(Func<ActorMaterializer, Attributes, Flow<TIn, TOut, TMat>> factory)
=> FromGraph(new SetupFlowStage<TIn, TOut, TMat>(factory));

/// <summary>
/// Creates a <see cref="Flow{TIn,TOut,TMat}"/> from a <see cref="Sink{TIn,TMat}"/> and a <see cref="Source{TOut,TMat}"/> where the flow's input
/// will be sent to the sink and the flow's output will come from the source.
Expand Down
12 changes: 12 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,18 @@ public static class Source
public static Source<T, TMat> FromGraph<T, TMat>(IGraph<SourceShape<T>, TMat> source)
=> source as Source<T, TMat> ?? new Source<T, TMat>(source.Module);

/// <summary>
/// Defers the creation of a <see cref="Source"/> until materialization. The <paramref name="factory"/>
/// function exposes <see cref="ActorMaterializer"/> which is going to be used during materialization and
/// <see cref="Attributes"/> of the <see cref="Source"/> returned by this method.
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="factory">TBD</param>
/// <returns>TBD</returns>
public static Source<T, Task<TMat>> Setup<T, TMat>(Func<ActorMaterializer, Attributes, Source<T, TMat>> factory)
=> FromGraph(new SetupSourceStage<T, TMat>(factory));

/// <summary>
/// Start a new <see cref="Source{TOut,TMat}"/> from the given <see cref="Task{T}"/>. The stream will consist of
/// one element when the <see cref="Task{T}"/> is completed with a successful value, which
Expand Down