Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf: add allocation free path for close notifications #1198

Merged
merged 1 commit into from Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2550,7 +2550,10 @@ private Task<int> InternalExecuteNonQueryAsync(CancellationToken cancellationTok
SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlCommand.InternalExecuteNonQueryAsync | API | Correlation | Object Id {0}, Activity Id {1}, Client Connection Id {2}, Command Text '{3}'", ObjectID, ActivityCorrelator.Current, Connection?.ClientConnectionId, CommandText);
Guid operationId = s_diagnosticListener.WriteCommandBefore(this, _transaction);

TaskCompletionSource<int> source = new TaskCompletionSource<int>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<int> source = new TaskCompletionSource<int>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand Down Expand Up @@ -2674,7 +2677,10 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
operationId = s_diagnosticListener.WriteCommandBefore(this, _transaction);
}

TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand Down Expand Up @@ -2846,7 +2852,10 @@ private Task<XmlReader> InternalExecuteXmlReaderAsync(CancellationToken cancella
SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlCommand.InternalExecuteXmlReaderAsync | API | Correlation | Object Id {0}, Activity Id {1}, Client Connection Id {2}, Command Text '{3}'", ObjectID, ActivityCorrelator.Current, Connection?.ClientConnectionId, CommandText);
Guid operationId = s_diagnosticListener.WriteCommandBefore(this, _transaction);

TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand Down
Expand Up @@ -2231,17 +2231,44 @@ private static void ChangePassword(string connectionString, SqlConnectionString
internal Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outerTask, object value, int tag)
{
// Connection exists, schedule removal, will be added to ref collection after calling ValidateAndReconnect

object state = null;
if (outerTask.AsyncState == this)
{
// if the caller created the TaskCompletionSource for outerTask with this connection
// as the state parameter (which is immutable) we can use task.AsyncState and state
// to carry the two pieces of state that we need into the continuation avoiding the
// allocation of a new state object to carry them
state = value;
}
else
{
// otherwise we need to create a Tuple to carry the two pieces of state
state = Tuple.Create(this, value);
}

return outerTask.ContinueWith(
continuationFunction: static (task, state) =>
{
Tuple<SqlConnection, object> parameters = (Tuple<SqlConnection, object>)state;
SqlConnection connection = parameters.Item1;
object obj = parameters.Item2;
SqlConnection connection = null;
object obj = null;
if (state is Tuple<SqlConnection, object> tuple)
{
// special state tuple, unpack it
connection = tuple.Item1;
obj = tuple.Item2;
}
else
{
// use state on task and state object
connection = (SqlConnection)task.AsyncState;
obj = state;
}

connection.RemoveWeakReference(obj);
return task;
},
state: Tuple.Create(this, value),
state: state,
scheduler: TaskScheduler.Default
).Unwrap();
}
Expand Down
Expand Up @@ -2310,7 +2310,7 @@ private Task ReadWriteColumnValueAsync(int col)
return writeTask;
}

private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
private Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outterTask)
{
SqlConnection connection = _connection;
if (connection == null)
Expand All @@ -2319,7 +2319,7 @@ private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
throw ADP.ClosedConnectionError();
}

connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.BulkCopyTag);
return connection.RegisterForConnectionCloseNotification(outterTask, this, SqlReferenceCollection.BulkCopyTag);
}

// Runs a loop to copy all columns of a single row.
Expand Down Expand Up @@ -3139,9 +3139,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
if (_isAsyncBulkCopy)
{
source = new TaskCompletionSource<object>(); // Creating the completion source/Task that we pass to application
resultTask = source.Task;

RegisterForConnectionCloseNotification(ref resultTask);
resultTask = RegisterForConnectionCloseNotification(source.Task);
}

if (_destinationTableName == null)
Expand Down
Expand Up @@ -2964,7 +2964,10 @@ private Task<int> InternalExecuteNonQueryAsync(CancellationToken cancellationTok
SqlClientEventSource.Log.TryCorrelationTraceEvent("<sc.SqlCommand.ExecuteNonQueryAsync|API|Correlation> ObjectID {0}, ActivityID {1}", ObjectID, ActivityCorrelator.Current);
SqlConnection.ExecutePermission.Demand();

TaskCompletionSource<int> source = new TaskCompletionSource<int>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<int> source = new TaskCompletionSource<int>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand All @@ -2980,7 +2983,7 @@ private Task<int> InternalExecuteNonQueryAsync(CancellationToken cancellationTok
Task<int> returnedTask = source.Task;
try
{
RegisterForConnectionCloseNotification(ref returnedTask);
returnedTask = RegisterForConnectionCloseNotification(returnedTask);

Task<int>.Factory.FromAsync(BeginExecuteNonQueryAsync, EndExecuteNonQueryAsync, null).ContinueWith((t) =>
{
Expand Down Expand Up @@ -3061,7 +3064,10 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
SqlClientEventSource.Log.TryCorrelationTraceEvent("<sc.SqlCommand.ExecuteReaderAsync|API|Correlation> ObjectID {0}, behavior={1}, ActivityID {2}", ObjectID, (int)behavior, ActivityCorrelator.Current);
SqlConnection.ExecutePermission.Demand();

TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand All @@ -3077,7 +3083,7 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
Task<SqlDataReader> returnedTask = source.Task;
try
{
RegisterForConnectionCloseNotification(ref returnedTask);
returnedTask = RegisterForConnectionCloseNotification(returnedTask);

Task<SqlDataReader>.Factory.FromAsync(BeginExecuteReaderAsync, EndExecuteReaderAsync, behavior, null).ContinueWith((t) =>
{
Expand Down Expand Up @@ -3207,7 +3213,10 @@ private Task<XmlReader> InternalExecuteXmlReaderAsync(CancellationToken cancella
SqlClientEventSource.Log.TryCorrelationTraceEvent("<sc.SqlCommand.ExecuteXmlReaderAsync|API|Correlation> ObjectID {0}, ActivityID {1}", ObjectID, ActivityCorrelator.Current);
SqlConnection.ExecutePermission.Demand();

TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand All @@ -3223,7 +3232,7 @@ private Task<XmlReader> InternalExecuteXmlReaderAsync(CancellationToken cancella
Task<XmlReader> returnedTask = source.Task;
try
{
RegisterForConnectionCloseNotification(ref returnedTask);
returnedTask = RegisterForConnectionCloseNotification(returnedTask);

Task<XmlReader>.Factory.FromAsync(BeginExecuteXmlReaderAsync, EndExecuteXmlReaderAsync, null).ContinueWith((t) =>
{
Expand Down Expand Up @@ -5814,7 +5823,7 @@ object ICloneable.Clone()
return Clone();
}

private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
private Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outterTask)
{
SqlConnection connection = _activeConnection;
if (connection == null)
Expand All @@ -5823,7 +5832,7 @@ private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
throw ADP.ClosedConnectionError();
}

connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.CommandTag);
return connection.RegisterForConnectionCloseNotification(outterTask, this, SqlReferenceCollection.CommandTag);
}

// validates that a command has commandText and a non-busy open connection
Expand Down
Expand Up @@ -2805,16 +2805,52 @@ private static void ChangePassword(string connectionString, SqlConnectionString
SqlConnectionFactory.SingletonInstance.ClearPool(key);
}

internal void RegisterForConnectionCloseNotification<T>(ref Task<T> outerTask, object value, int tag)
internal Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outerTask, object value, int tag)
{
// Connection exists, schedule removal, will be added to ref collection after calling ValidateAndReconnect
outerTask = outerTask.ContinueWith(task =>

object state = null;
if (outerTask.AsyncState == this)
{
// if the caller created the TaskCompletionSource for outerTask with this connection
// as the state parameter (which is immutable) we can use task.AsyncState and state
// to carry the two pieces of state that we need into the continuation avoiding the
// allocation of a new state object to carry them
state = value;
}
else
{
RemoveWeakReference(value);
return task;
}, TaskScheduler.Default).Unwrap();
// otherwise we need to create a Tuple to carry the two pieces of state
state = Tuple.Create(this, value);
}

return outerTask.ContinueWith(
continuationFunction: static (task, state) =>
{
SqlConnection connection = null;
object obj = null;
if (state is Tuple<SqlConnection, object> tuple)
{
// special state tuple, unpack it
connection = tuple.Item1;
obj = tuple.Item2;
}
else
{
// use state on task and state object
connection = (SqlConnection)task.AsyncState;
obj = state;
}

connection.RemoveWeakReference(obj);
return task;
},
state: state,
scheduler: TaskScheduler.Default
).Unwrap();
}


// updates our context with any changes made to the memory-mapped data by an external process
static private void RefreshMemoryMappedData(SqlDebugContext sdc)
{
Expand Down