Skip to content

Commit

Permalink
add allocation free path for close notifications when task state is s…
Browse files Browse the repository at this point in the history
…etup correctly
  • Loading branch information
Wraith2 committed Apr 9, 2023
1 parent a5ad838 commit 6286277
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 25 deletions.
Expand Up @@ -2550,7 +2550,10 @@ private Task<int> InternalExecuteNonQueryAsync(CancellationToken cancellationTok
SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlCommand.InternalExecuteNonQueryAsync | API | Correlation | Object Id {0}, Activity Id {1}, Client Connection Id {2}, Command Text '{3}'", ObjectID, ActivityCorrelator.Current, Connection?.ClientConnectionId, CommandText);
Guid operationId = s_diagnosticListener.WriteCommandBefore(this, _transaction);

TaskCompletionSource<int> source = new TaskCompletionSource<int>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<int> source = new TaskCompletionSource<int>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand Down Expand Up @@ -2674,7 +2677,10 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
operationId = s_diagnosticListener.WriteCommandBefore(this, _transaction);
}

TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand Down Expand Up @@ -2846,7 +2852,10 @@ private Task<XmlReader> InternalExecuteXmlReaderAsync(CancellationToken cancella
SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlCommand.InternalExecuteXmlReaderAsync | API | Correlation | Object Id {0}, Activity Id {1}, Client Connection Id {2}, Command Text '{3}'", ObjectID, ActivityCorrelator.Current, Connection?.ClientConnectionId, CommandText);
Guid operationId = s_diagnosticListener.WriteCommandBefore(this, _transaction);

TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand Down
Expand Up @@ -2231,17 +2231,44 @@ private static void ChangePassword(string connectionString, SqlConnectionString
internal Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outerTask, object value, int tag)
{
// Connection exists, schedule removal, will be added to ref collection after calling ValidateAndReconnect

object state = null;
if (outerTask.AsyncState == this)
{
// if the caller created the TaskCompletionSource for outerTask with this connection
// as the state parameter (which is immutable) we can use task.AsyncState and state
// to carry the two pieces of state that we need into the continuation avoiding the
// allocation of a new state object to carry them
state = value;
}
else
{
// otherwise we need to create a Tuple to carry the two pieces of state
state = Tuple.Create(this, value);
}

return outerTask.ContinueWith(
continuationFunction: static (task, state) =>
{
Tuple<SqlConnection, object> parameters = (Tuple<SqlConnection, object>)state;
SqlConnection connection = parameters.Item1;
object obj = parameters.Item2;
SqlConnection connection = null;
object obj = null;
if (state is Tuple<SqlConnection, object> tuple)
{
// special state tuple, unpack it
connection = tuple.Item1;
obj = tuple.Item2;
}
else
{
// use state on task and state object
connection = (SqlConnection)task.AsyncState;
obj = state;
}
connection.RemoveWeakReference(obj);
return task;
},
state: Tuple.Create(this, value),
state: state,
scheduler: TaskScheduler.Default
).Unwrap();
}
Expand Down
Expand Up @@ -2310,7 +2310,7 @@ private Task ReadWriteColumnValueAsync(int col)
return writeTask;
}

private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
private Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outterTask)
{
SqlConnection connection = _connection;
if (connection == null)
Expand All @@ -2319,7 +2319,7 @@ private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
throw ADP.ClosedConnectionError();
}

connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.BulkCopyTag);
return connection.RegisterForConnectionCloseNotification(outterTask, this, SqlReferenceCollection.BulkCopyTag);
}

// Runs a loop to copy all columns of a single row.
Expand Down Expand Up @@ -3139,9 +3139,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
if (_isAsyncBulkCopy)
{
source = new TaskCompletionSource<object>(); // Creating the completion source/Task that we pass to application
resultTask = source.Task;

RegisterForConnectionCloseNotification(ref resultTask);
resultTask = RegisterForConnectionCloseNotification(source.Task);
}

if (_destinationTableName == null)
Expand Down
Expand Up @@ -2964,7 +2964,10 @@ private Task<int> InternalExecuteNonQueryAsync(CancellationToken cancellationTok
SqlClientEventSource.Log.TryCorrelationTraceEvent("<sc.SqlCommand.ExecuteNonQueryAsync|API|Correlation> ObjectID {0}, ActivityID {1}", ObjectID, ActivityCorrelator.Current);
SqlConnection.ExecutePermission.Demand();

TaskCompletionSource<int> source = new TaskCompletionSource<int>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<int> source = new TaskCompletionSource<int>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand All @@ -2980,7 +2983,7 @@ private Task<int> InternalExecuteNonQueryAsync(CancellationToken cancellationTok
Task<int> returnedTask = source.Task;
try
{
RegisterForConnectionCloseNotification(ref returnedTask);
returnedTask = RegisterForConnectionCloseNotification(returnedTask);

Task<int>.Factory.FromAsync(BeginExecuteNonQueryAsync, EndExecuteNonQueryAsync, null).ContinueWith((t) =>
{
Expand Down Expand Up @@ -3061,7 +3064,10 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
SqlClientEventSource.Log.TryCorrelationTraceEvent("<sc.SqlCommand.ExecuteReaderAsync|API|Correlation> ObjectID {0}, behavior={1}, ActivityID {2}", ObjectID, (int)behavior, ActivityCorrelator.Current);
SqlConnection.ExecutePermission.Demand();

TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<SqlDataReader> source = new TaskCompletionSource<SqlDataReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand All @@ -3077,7 +3083,7 @@ private Task<SqlDataReader> InternalExecuteReaderAsync(CommandBehavior behavior,
Task<SqlDataReader> returnedTask = source.Task;
try
{
RegisterForConnectionCloseNotification(ref returnedTask);
returnedTask = RegisterForConnectionCloseNotification(returnedTask);

Task<SqlDataReader>.Factory.FromAsync(BeginExecuteReaderAsync, EndExecuteReaderAsync, behavior, null).ContinueWith((t) =>
{
Expand Down Expand Up @@ -3207,7 +3213,10 @@ private Task<XmlReader> InternalExecuteXmlReaderAsync(CancellationToken cancella
SqlClientEventSource.Log.TryCorrelationTraceEvent("<sc.SqlCommand.ExecuteXmlReaderAsync|API|Correlation> ObjectID {0}, ActivityID {1}", ObjectID, ActivityCorrelator.Current);
SqlConnection.ExecutePermission.Demand();

TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>();
// connection can be used as state in RegisterForConnectionCloseNotification continuation
// to avoid an allocation so use it as the state value if possible but it can be changed if
// you need it for a more important piece of data that justifies the tuple allocation later
TaskCompletionSource<XmlReader> source = new TaskCompletionSource<XmlReader>(_activeConnection);

CancellationTokenRegistration registration = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
Expand All @@ -3223,7 +3232,7 @@ private Task<XmlReader> InternalExecuteXmlReaderAsync(CancellationToken cancella
Task<XmlReader> returnedTask = source.Task;
try
{
RegisterForConnectionCloseNotification(ref returnedTask);
returnedTask = RegisterForConnectionCloseNotification(returnedTask);

Task<XmlReader>.Factory.FromAsync(BeginExecuteXmlReaderAsync, EndExecuteXmlReaderAsync, null).ContinueWith((t) =>
{
Expand Down Expand Up @@ -5814,7 +5823,7 @@ object ICloneable.Clone()
return Clone();
}

private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
private Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outterTask)
{
SqlConnection connection = _activeConnection;
if (connection == null)
Expand All @@ -5823,7 +5832,7 @@ private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask)
throw ADP.ClosedConnectionError();
}

connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.CommandTag);
return connection.RegisterForConnectionCloseNotification(outterTask, this, SqlReferenceCollection.CommandTag);
}

// validates that a command has commandText and a non-busy open connection
Expand Down
Expand Up @@ -2805,16 +2805,52 @@ private static void ChangePassword(string connectionString, SqlConnectionString
SqlConnectionFactory.SingletonInstance.ClearPool(key);
}

internal void RegisterForConnectionCloseNotification<T>(ref Task<T> outerTask, object value, int tag)
internal Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outerTask, object value, int tag)
{
// Connection exists, schedule removal, will be added to ref collection after calling ValidateAndReconnect
outerTask = outerTask.ContinueWith(task =>

object state = null;
if (outerTask.AsyncState == this)
{
// if the caller created the TaskCompletionSource for outerTask with this connection
// as the state parameter (which is immutable) we can use task.AsyncState and state
// to carry the two pieces of state that we need into the continuation avoiding the
// allocation of a new state object to carry them
state = value;
}
else
{
RemoveWeakReference(value);
return task;
}, TaskScheduler.Default).Unwrap();
// otherwise we need to create a Tuple to carry the two pieces of state
state = Tuple.Create(this, value);
}

return outerTask.ContinueWith(
continuationFunction: static (task, state) =>
{
SqlConnection connection = null;
object obj = null;
if (state is Tuple<SqlConnection, object> tuple)
{
// special state tuple, unpack it
connection = tuple.Item1;
obj = tuple.Item2;
}
else
{
// use state on task and state object
connection = (SqlConnection)task.AsyncState;
obj = state;
}
connection.RemoveWeakReference(obj);
return task;
},
state: state,
scheduler: TaskScheduler.Default
).Unwrap();
}


// updates our context with any changes made to the memory-mapped data by an external process
static private void RefreshMemoryMappedData(SqlDebugContext sdc)
{
Expand Down

0 comments on commit 6286277

Please sign in to comment.