Skip to content

Commit

Permalink
Fix timing issue in parallel execution (#4629)
Browse files Browse the repository at this point in the history
  • Loading branch information
nohwnd committed Aug 2, 2023
1 parent f60b983 commit 9a0c418
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 49 deletions.
Expand Up @@ -9,6 +9,7 @@

using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;

namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;

Expand Down Expand Up @@ -63,6 +64,8 @@ public ParallelOperationManager(Func<TestRuntimeProviderInfo, TWorkload, TManage

private void ClearSlots(bool acceptMoreWork)
{
EqtTrace.Verbose($"ParallelOperationManager.ClearSlots: Clearing all slots. Slots should accept more work: {acceptMoreWork}");

lock (_lock)
{
_acceptMoreWork = acceptMoreWork;
Expand All @@ -76,6 +79,13 @@ private void SetOccupiedSlotCount()
{
AvailableSlotCount = _managerSlots.Count(s => !s.HasWork);
OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount;

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.SetOccupiedSlotCount: Setting slot counts AvailableSlotCount = {AvailableSlotCount}, OccupiedSlotCount = {OccupiedSlotCount}.");
EqtTrace.Verbose($"Occupied slots:\n{(string.Join("\n", _managerSlots.Where(s => s.HasWork).Select((slot) => $"{slot.Index}: {GetSourcesForSlotExpensive(slot)}").ToArray()))}");

}
}

public void StartWork(
Expand All @@ -91,6 +101,7 @@ private void SetOccupiedSlotCount()
_initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload));
_runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload));

EqtTrace.Verbose($"ParallelOperationManager.StartWork: Starting adding {workloads.Count} workloads.");
_workloads.AddRange(workloads);

ClearSlots(acceptMoreWork: true);
Expand Down Expand Up @@ -123,7 +134,10 @@ private bool RunWorkInParallel()
// so when it is allowed to enter it will try to add more work, but we already cancelled,
// so we should not start more work.
if (!_acceptMoreWork)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We don't accept more work, returning false.");
return false;
}

// We grab all empty slots.
var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray();
Expand All @@ -136,11 +150,10 @@ private bool RunWorkInParallel()
var workloadsToAdd = availableWorkloads.Take(amount).ToImmutableArray();

// We associate each workload to a slot, if we reached the max parallel
// level, then we will run only initalize step of the given workload.
// level, then we will run only initialize step of the given workload.
for (int i = 0; i < amount; i++)
{
var slot = availableSlots[i];
slot.HasWork = true;
var workload = workloadsToAdd[i];
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;

Expand All @@ -152,6 +165,13 @@ private bool RunWorkInParallel()
slot.Work = workload.Work;

_workloads.Remove(workload);

EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Adding 1 workload to slot, remaining workloads {_workloads.Count}.");

// This must be set last, every loop below looks at this property,
// and they can do so from a different thread. So if we mark it as HasWork before actually assigning the work
// we can pick up the slot, but it has no associated work yet.
slot.HasWork = true;
}

slots = _managerSlots.ToArray();
Expand All @@ -172,12 +192,16 @@ private bool RunWorkInParallel()
{
startedWork++;
slot.IsRunning = true;
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host for work (source) {GetSourcesForSlotExpensive(slot)}: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
}
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);

// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
break;
}
}
Expand All @@ -194,14 +218,18 @@ private bool RunWorkInParallel()
{
startedWork++;
slot.IsRunning = true;
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Started work on a host.");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Started host in slot number {slot.Index} for work (source): {GetSourcesForSlotExpensive(slot)}.");
}
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
}
}

// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
break;
}
}
Expand All @@ -215,14 +243,19 @@ private bool RunWorkInParallel()
preStartedWork++;
slot.PreStartTime = DateTime.Now.TimeOfDay;
slot.IsPreStarted = true;
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Pre-starting a host.");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Pre-starting a host for work (source): {GetSourcesForSlotExpensive(slot)}.");
}
slot.InitTask = _initializeWorkload!(slot.Manager!, slot.EventHandler!, slot.Work!);
}
}

// Return true when we started more work. Or false, when there was nothing more to do.
// This will propagate to handling of partial discovery or partial run.
return preStartedWork + startedWork > 0;
var weAddedMoreWork = preStartedWork + startedWork > 0;
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {preStartedWork + startedWork} work items in here, returning {weAddedMoreWork}.");
return weAddedMoreWork;
}

public bool RunNextWork(TManager completedManager)
Expand Down Expand Up @@ -258,6 +291,10 @@ private void ClearCompletedSlot(TManager completedManager)
throw new InvalidOperationException("The provided manager was found in multiple slots.");
}

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.ClearCompletedSlot: Clearing slot number {completedSlot[0].Index} with work (source): {GetSourcesForSlotExpensive(completedSlot[0])}.");
}
var slot = completedSlot[0];
slot.PreStartTime = TimeSpan.Zero;
slot.Work = default(TWorkload);
Expand All @@ -273,8 +310,14 @@ private void ClearCompletedSlot(TManager completedManager)
}
}

/// <summary>
/// Builds a comma-separated list of the sources attached to the work held by the given slot.
/// The result is only used to enrich trace messages.
/// </summary>
/// <param name="slot">The slot whose <c>Work</c> is inspected.</param>
/// <returns>
/// The sources joined by <c>", "</c> when the slot's work is a <see cref="DiscoveryCriteria"/> or a
/// <see cref="TestRunCriteria"/> with non-null sources; otherwise an empty string.
/// </returns>
private static string GetSourcesForSlotExpensive(ParallelOperationManager<TManager, TEventHandler, TWorkload>.Slot slot)
{
    var work = slot.Work;

    // Discovery workloads expose their sources directly.
    if (work is DiscoveryCriteria { Sources: { } discoverySources })
    {
        return string.Join(", ", discoverySources);
    }

    // Execution workloads may have no sources, in which case we fall through.
    if (work is TestRunCriteria { Sources: { } runSources })
    {
        return string.Join(", ", runSources);
    }

    // Any other workload type, no work at all, or null sources: joining nothing yields an empty string.
    return string.Empty;
}

public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParallel = false)
{
EqtTrace.Verbose($"ParallelOperationManager.DoActionOnAllManagers: Calling an action on all managers.");
// We don't need to lock here, we just grab the current list of
// slots that are occupied (have managers) and run action on each one of them.
var managers = _managerSlots.Where(slot => slot.HasWork).Select(slot => slot.Manager).ToImmutableArray();
Expand Down Expand Up @@ -320,11 +363,13 @@ private static void DoManagerAction(Action action)

/// <summary>
/// Traces that all managers are being stopped and clears the slots,
/// telling them not to accept any more work.
/// </summary>
internal void StopAllManagers()
{
    // After stopping, no further workloads may be scheduled.
    const bool acceptMoreWork = false;

    EqtTrace.Verbose("ParallelOperationManager.StopAllManagers: Stopping all managers.");
    ClearSlots(acceptMoreWork);
}

/// <summary>
/// Traces that the managers are being disposed and clears the slots,
/// telling them not to accept any more work.
/// </summary>
public void Dispose()
{
    // Once disposed, no further workloads may be scheduled.
    const bool acceptMoreWork = false;

    EqtTrace.Verbose("ParallelOperationManager.Dispose: Disposing all managers.");
    ClearSlots(acceptMoreWork);
}

Expand Down
Expand Up @@ -287,6 +287,12 @@ private Task InitializeDiscoverTestsOnConcurrentManager(IProxyDiscoveryManager p
bool initialized,
Task? task)
{
// If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
if (discoveryCriteria == null)
{
throw new ArgumentNullException(nameof(discoveryCriteria));
}

// Kick off another discovery task for the next source
Task.Run(() =>
{
Expand Down
Expand Up @@ -159,11 +159,12 @@ public void Close()
? _runCompletedClients == _runStartedClients
: _runCompletedClients == _availableWorkloads;

EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total completed clients = {0}, Run complete = {1}, Run canceled: {2}.", _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled);
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total workloads = {0}, Total started clients = {1} Total completed clients = {2}, Run complete = {3}, Run canceled: {4}.", _availableWorkloads, _runStartedClients, _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled);
}

if (allRunsCompleted)
{
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: All runs completed stopping all managers.");
_parallelOperationManager.StopAllManagers();
return true;
}
Expand All @@ -185,8 +186,13 @@ public void Close()
// {
// return true;
// }
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Not cancelled or aborted, running next work.");
var _ = _parallelOperationManager.RunNextWork(proxyExecutionManager);
}
else
{
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Cancelled or aborted, not running next work.");
}

return false;
}
Expand Down Expand Up @@ -403,7 +409,7 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu
// clients to be done running their workloads when aborting/cancelling and that doesn't
// happen with an initialized workload that is never run.
//
// Interlocked.Increment(ref _runStartedClients);
// Interlocked.Increment(ref _runStartedClients); <- BUG: Is this a bug waiting to happen for pre-started hosts?
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
});
}
Expand All @@ -421,54 +427,60 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu
bool initialized,
Task? initTask)
{
if (testRunCriteria != null)
// If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
if (testRunCriteria == null)
{
Task.Run(() =>
{
if (!initialized)
{
if (!proxyExecutionManager.IsInitialized)
{
proxyExecutionManager.Initialize(_skipDefaultAdapters);
}
throw new ArgumentNullException(nameof(testRunCriteria));
}

Interlocked.Increment(ref _runStartedClients);
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
}
else
Task.Run(() =>
{
if (!initialized)
{
if (!proxyExecutionManager.IsInitialized)
{
initTask!.Wait();
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing uninitialized client. Started clients: " + _runStartedClients);
proxyExecutionManager.Initialize(_skipDefaultAdapters);
}
EqtTrace.Verbose("ParallelProxyExecutionManager: Execution started. Started clients: " + _runStartedClients);
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing test run. Started clients: " + _runStartedClients);
Interlocked.Increment(ref _runStartedClients);
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
}
else
{
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Waiting for pre-initialized client to finish initialization. Started clients: " + _runStartedClients);
initTask!.Wait();
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Pre-initialized client finished initialization. Started clients: " + _runStartedClients);
}
proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler);
})
.ContinueWith(t =>
{
// Just in case, the actual execution couldn't start for an instance. Ensure that
// we call execution complete since we have already fetched a source. Otherwise
// execution will not terminate
EqtTrace.Error("ParallelProxyExecutionManager: Failed to trigger execution. Exception: " + t.Exception);
var handler = eventHandler;
var exceptionToString = t.Exception?.ToString();
var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString };
handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString);
// Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun
// Differences:
// Aborted is sent to allow the current execution manager replaced with another instance
// Ensure that the test run aggregator in parallel run events handler doesn't add these statistics
// (since the test run didn't even start)
var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection<AttachmentSet>(), new Collection<InvokedDataCollector>(), TimeSpan.Zero);
handler.HandleTestRunComplete(completeArgs, null, null, null);
},
TaskContinuationOptions.OnlyOnFaulted);
}
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution starting. Started clients: " + _runStartedClients);
EqtTrace.Verbose("ProxyParallelExecutionManager: No sources available for execution.");
proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler);
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution started. Started clients: " + _runStartedClients);
})
.ContinueWith(t =>
{
// Just in case, the actual execution couldn't start for an instance. Ensure that
// we call execution complete since we have already fetched a source. Otherwise
// execution will not terminate
EqtTrace.Error("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager(continuation): Failed to trigger execution. Exception: " + t.Exception);
var handler = eventHandler;
var exceptionToString = t.Exception?.ToString();
var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString };
handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString);
// Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun
// Differences:
// Aborted is sent to allow the current execution manager replaced with another instance
// Ensure that the test run aggregator in parallel run events handler doesn't add these statistics
// (since the test run didn't even start)
var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection<AttachmentSet>(), new Collection<InvokedDataCollector>(), TimeSpan.Zero);
handler.HandleTestRunComplete(completeArgs, null, null, null);
},
TaskContinuationOptions.OnlyOnFaulted);
}

public void InitializeTestRun(TestRunCriteria testRunCriteria, IInternalTestRunEventsHandler eventHandler)
Expand Down
Expand Up @@ -66,10 +66,12 @@ internal class ParallelRunEventsHandler : IInternalTestRunEventsHandler
ICollection<AttachmentSet>? runContextAttachments,
ICollection<string>? executorUris)
{
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Handling a run completion, this can be either one part of parallel run completing, or the whole parallel run completing.");
var parallelRunComplete = HandleSingleTestRunComplete(testRunCompleteArgs, lastChunkArgs, runContextAttachments, executorUris);

if (parallelRunComplete)
{
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Whole parallel run completed.");
var completedArgs = new TestRunCompleteEventArgs(_runDataAggregator.GetAggregatedRunStats(),
_runDataAggregator.IsCanceled,
_runDataAggregator.IsAborted,
Expand All @@ -96,6 +98,10 @@ internal class ParallelRunEventsHandler : IInternalTestRunEventsHandler

HandleParallelTestRunComplete(completedArgs);
}
else
{
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Single part of parallel run completed, but whole run is not complete yet.");
}
}

protected bool HandleSingleTestRunComplete(TestRunCompleteEventArgs testRunCompleteArgs,
Expand Down

0 comments on commit 9a0c418

Please sign in to comment.