Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove some state machines #6787

Merged
merged 1 commit into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,12 @@ public abstract class AbstractQueryExecutor : IJournalQueryExecutor
/// <param name="cancellationToken">TBD</param>
/// <param name="offset">TBD</param>
/// <returns>TBD</returns>
public virtual async Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(
DbConnection connection,
public virtual Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(
DbConnection connection,
CancellationToken cancellationToken,
long offset)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, AllPersistenceIdsSql);
command.Transaction = tx;
Expand Down Expand Up @@ -685,15 +685,15 @@ public abstract class AbstractQueryExecutor : IJournalQueryExecutor
});
}

public virtual async Task<long> SelectAllEventsAsync(
public virtual Task<long> SelectAllEventsAsync(
DbConnection connection,
CancellationToken cancellationToken,
CancellationToken cancellationToken,
long fromOffset,
long toOffset,
long max,
long max,
Action<ReplayedEvent> callback)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
long maxOrdering;
using (var command = GetCommand(connection, HighestOrderingSql))
Expand Down Expand Up @@ -736,9 +736,9 @@ public abstract class AbstractQueryExecutor : IJournalQueryExecutor
/// <param name="cancellationToken">TBD</param>
/// <param name="persistenceId">TBD</param>
/// <returns>TBD</returns>
public virtual async Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId)
public virtual Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, HighestSequenceNrSql);
command.Transaction = tx;
Expand All @@ -749,9 +749,9 @@ public virtual async Task<long> SelectHighestSequenceNrAsync(DbConnection connec
});
}

public virtual async Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken)
public virtual Task<long> SelectHighestSequenceNrAsync(DbConnection connection, CancellationToken cancellationToken)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, HighestOrderingSql);
command.Transaction = tx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,18 +548,18 @@ protected virtual void SetManifestParameters(object snapshot, DbCommand command)
/// <param name="maxSequenceNr">TBD</param>
/// <param name="maxTimestamp">TBD</param>
/// <returns>TBD</returns>
public virtual async Task<SelectedSnapshot> SelectSnapshotAsync(
public virtual Task<SelectedSnapshot> SelectSnapshotAsync(
DbConnection connection,
CancellationToken cancellationToken,
string persistenceId,
long maxSequenceNr,
DateTime maxTimestamp)
{
return await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
return connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
{
using var command = GetCommand(connection, SelectSnapshotSql);
command.Transaction = tx;

SetPersistenceIdParameter(persistenceId, command);
SetSequenceNrParameter(maxSequenceNr, command);
SetTimestampParameter(maxTimestamp, command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,23 @@ private async Task<IActorRef> Here()
return identity.Subject;
}

private async Task<bool> Throttle(ThrottleTransportAdapter.Direction direction, ThrottleMode mode)
private Task<bool> Throttle(ThrottleTransportAdapter.Direction direction, ThrottleMode mode)
{
var rootBAddress = new Address("akka", "systemB", "localhost", RootB.Address.Port.Value);
var transport =
Sys.AsInstanceOf<ExtendedActorSystem>().Provider.AsInstanceOf<RemoteActorRefProvider>().Transport;
return await transport.ManagementCommand(new SetThrottle(rootBAddress, direction, mode))

return transport.ManagementCommand(new SetThrottle(rootBAddress, direction, mode))
.ShouldCompleteWithin(DefaultTimeout);
}

private async Task<bool> Disassociate()
private Task<bool> Disassociate()
{
var rootBAddress = new Address("akka", "systemB", "localhost", RootB.Address.Port.Value);
var transport =
Sys.AsInstanceOf<ExtendedActorSystem>().Provider.AsInstanceOf<RemoteActorRefProvider>().Transport;
return await transport.ManagementCommand(new ForceDisassociate(rootBAddress))

return transport.ManagementCommand(new ForceDisassociate(rootBAddress))
.ShouldCompleteWithin(DefaultTimeout);
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ public override async Task<(Address, TaskCompletionSource<IAssociationEventListe
}
}

public override async Task<AssociationHandle> Associate(Address remoteAddress)
public override Task<AssociationHandle> Associate(Address remoteAddress)
{
if (!ServerChannel.Open)
throw new ChannelException("Transport is not open");

return await AssociateInternal(remoteAddress).ConfigureAwait(false);
return AssociateInternal(remoteAddress);
}

protected abstract Task<AssociationHandle> AssociateInternal(Address remoteAddress);
Expand Down
35 changes: 15 additions & 20 deletions src/core/Akka.TestKit/EventFilter/Internal/EventFilterApplier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,16 @@ public T ExpectOne<T>(Func<T> func, CancellationToken cancellationToken = defaul
/// <summary>
/// Async version of ExpectOne
/// </summary>
public async Task<T> ExpectOneAsync<T>(
public Task<T> ExpectOneAsync<T>(
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: null,
expectedOccurrences: 1,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -262,19 +261,18 @@ public T ExpectOne<T>(Func<T> func, CancellationToken cancellationToken = defaul
/// <summary>
/// Async version of ExpectOne
/// </summary>
public async Task<T> ExpectOneAsync<T>(
public Task<T> ExpectOneAsync<T>(
TimeSpan timeout,
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: timeout,
expectedOccurrences: 1,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -297,19 +295,18 @@ public T ExpectOne<T>(Func<T> func, CancellationToken cancellationToken = defaul
/// <summary>
/// Async version of Expect
/// </summary>
public async Task<T> ExpectAsync<T>(
public Task<T> ExpectAsync<T>(
int expectedCount,
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: null,
expectedOccurrences: expectedCount,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -335,20 +332,19 @@ public T ExpectOne<T>(Func<T> func, CancellationToken cancellationToken = defaul
/// Async version of Expect
/// Note: <paramref name="func"/> might not get awaited.
/// </summary>
public async Task<T> ExpectAsync<T>(
public Task<T> ExpectAsync<T>(
int expectedCount,
TimeSpan timeout,
Func<Task<T>> func,
CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: timeout,
expectedOccurrences: expectedCount,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand All @@ -367,16 +363,15 @@ public T Mute<T>(Func<T> func, CancellationToken cancellationToken = default)
/// <summary>
/// Async version of Mute
/// </summary>
public async Task<T> MuteAsync<T>(Func<Task<T>> func, CancellationToken cancellationToken = default)
public Task<T> MuteAsync<T>(Func<Task<T>> func, CancellationToken cancellationToken = default)
{
return await InterceptAsync(
return InterceptAsync(
func: func,
system: _actorSystem,
timeout: null,
expectedOccurrences: null,
matchedEventHandler: null,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
cancellationToken: cancellationToken);
}

/// <summary>
Expand Down