Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement query throttling #485

Merged
merged 5 commits into from
Oct 23, 2024

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Oct 8, 2024

Implement query throttling scheme, similar to akkadotnet/akka.net#6436

Changes

  • Add new akka.persistence.query.journal.sql.max-concurrent-queries HOCON setting
  • Port QueryThrottler class
  • Make sure that all query DB operations are guarded by throttle permit
  • Add Hosting implementation
  • Add unit test

Sorry, something went wrong.

@Arkatufus Arkatufus marked this pull request as draft October 8, 2024 07:03
Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design self-review

Comment on lines 57 to 65
await queryPermitter.Ask<QueryStartGranted>(RequestQueryStart.Instance);
try
{
return await factory.ExecuteWithTransactionAsync(level, token, handler);
}
finally
{
queryPermitter.Tell(ReturnQueryStart.Instance);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the query throttling mechanism, we always ask for a permission before we execute any DB operations

Comment on lines 104 to 112
await queryPermitter.Ask<QueryStartGranted>(RequestQueryStart.Instance);
try
{
return await factory.ConnectionFactory.ExecuteWithTransactionAsync(state, factory.IsolationLevel, factory.ShutdownToken, handler);
}
finally
{
queryPermitter.Tell(ReturnQueryStart.Instance);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the query throttling mechanism, we always ask for a permission before we execute any DB operations

static async input =>
{
return await input._dbStateHolder.ExecuteWithTransactionAsync(
return await input._dbStateHolder.ExecuteQueryWithTransactionAsync(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace all ExecuteWithTransactionAsync() method call inside the read journal with ExecuteQueryWithTransactionAsync() to make sure that all DB queries are throttled

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add query timeouts

@@ -142,7 +145,8 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
DatabaseMapping? databaseMapping = null,
TagMode? tagStorageMode = null,
bool? deleteCompatibilityMode = null,
bool? useWriterUuidColumn = null)
bool? useWriterUuidColumn = null,
int? maxConcurrentQueries = null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically a breaking change without a method overload, but I don't think there's any other meta-packages built on top of Akka.Persistence.Sql.Hosting so it's probably fine as it's source-compatible

CancellationToken token,
Func<AkkaDataConnection, CancellationToken, Task<T>> handler)
{
await queryPermitter.Ask<QueryStartGranted>(RequestQueryStart.Instance);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use a timeout or a CTS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

TState state,
Func<AkkaDataConnection, CancellationToken, TState, Task<T>> handler)
{
return factory.ConnectionFactory.ExecuteWithTransactionAsync(state, factory.IsolationLevel, factory.ShutdownToken, handler);
await queryPermitter.Ask<QueryStartGranted>(RequestQueryStart.Instance);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use a timeout or a CTS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Arkatufus Arkatufus marked this pull request as ready for review October 16, 2024 19:05
@Aaronontheweb Aaronontheweb merged commit 9fcd553 into akkadotnet:dev Oct 23, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants