Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timing issue in parallel execution #4629

Merged
merged 4 commits into from Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -9,6 +9,7 @@

using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;

namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;

Expand Down Expand Up @@ -63,6 +64,8 @@ public ParallelOperationManager(Func<TestRuntimeProviderInfo, TWorkload, TManage

private void ClearSlots(bool acceptMoreWork)
{
EqtTrace.Verbose($"ParallelOperationManager.ClearSlots: Clearing all slots. Slots should accept more work: {acceptMoreWork}");

lock (_lock)
{
_acceptMoreWork = acceptMoreWork;
Expand All @@ -76,6 +79,13 @@ private void SetOccupiedSlotCount()
{
AvailableSlotCount = _managerSlots.Count(s => !s.HasWork);
OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount;

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.SetOccupiedSlotCount: Setting slot counts AvailableSlotCount = {AvailableSlotCount}, OccupiedSlotCount = {OccupiedSlotCount}.");
EqtTrace.Verbose($"Occupied slots:\n{(string.Join("\n", _managerSlots.Where(s => s.HasWork).Select((slot) => $"{slot.Index}: {GetSourcesForSlotExpensive(slot)}").ToArray()))}");

}
}

public void StartWork(
Expand All @@ -91,6 +101,7 @@ private void SetOccupiedSlotCount()
_initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload));
_runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload));

EqtTrace.Verbose($"ParallelOperationManager.StartWork: Starting adding {workloads.Count} workloads.");
_workloads.AddRange(workloads);

ClearSlots(acceptMoreWork: true);
Expand Down Expand Up @@ -123,7 +134,10 @@ private bool RunWorkInParallel()
// so when it is allowed to enter it will try to add more work, but we already cancelled,
// so we should not start more work.
if (!_acceptMoreWork)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We don't accept more work, returning false.");
return false;
}

// We grab all empty slots.
var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray();
Expand All @@ -136,11 +150,10 @@ private bool RunWorkInParallel()
var workloadsToAdd = availableWorkloads.Take(amount).ToImmutableArray();

// We associate each workload to a slot, if we reached the max parallel
// level, then we will run only initalize step of the given workload.
// level, then we will run only initialize step of the given workload.
for (int i = 0; i < amount; i++)
{
var slot = availableSlots[i];
slot.HasWork = true;
var workload = workloadsToAdd[i];
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;

Expand All @@ -152,6 +165,13 @@ private bool RunWorkInParallel()
slot.Work = workload.Work;

_workloads.Remove(workload);

EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Adding 1 workload to slot, remaining workloads {_workloads.Count}.");

// This must be set last. Every loop below looks at this property,
// and it can do so from a different thread. If we marked the slot as HasWork before actually assigning the work,
// another thread could pick up the slot while it has no associated work yet.
slot.HasWork = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix.

}

slots = _managerSlots.ToArray();
Expand All @@ -172,12 +192,16 @@ private bool RunWorkInParallel()
{
startedWork++;
slot.IsRunning = true;
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host for work (source) {GetSourcesForSlotExpensive(slot)}: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
}
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);

// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
break;
}
}
Expand All @@ -194,14 +218,18 @@ private bool RunWorkInParallel()
{
startedWork++;
slot.IsRunning = true;
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Started work on a host.");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Started host in slot number {slot.Index} for work (source): {GetSourcesForSlotExpensive(slot)}.");
}
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
}
}

// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work.");
break;
}
}
Expand All @@ -215,14 +243,19 @@ private bool RunWorkInParallel()
preStartedWork++;
slot.PreStartTime = DateTime.Now.TimeOfDay;
slot.IsPreStarted = true;
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Pre-starting a host.");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Pre-starting a host for work (source): {GetSourcesForSlotExpensive(slot)}.");
}
slot.InitTask = _initializeWorkload!(slot.Manager!, slot.EventHandler!, slot.Work!);
}
}

// Return true when we started more work. Or false, when there was nothing more to do.
// This will propagate to handling of partial discovery or partial run.
return preStartedWork + startedWork > 0;
var weAddedMoreWork = preStartedWork + startedWork > 0;
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {preStartedWork + startedWork} work items in here, returning {weAddedMoreWork}.");
return weAddedMoreWork;
}

public bool RunNextWork(TManager completedManager)
Expand Down Expand Up @@ -258,6 +291,10 @@ private void ClearCompletedSlot(TManager completedManager)
throw new InvalidOperationException("The provided manager was found in multiple slots.");
}

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose($"ParallelOperationManager.ClearCompletedSlot: Clearing slot number {completedSlot[0].Index} with work (source): {GetSourcesForSlotExpensive(completedSlot[0])}.");
}
var slot = completedSlot[0];
slot.PreStartTime = TimeSpan.Zero;
slot.Work = default(TWorkload);
Expand All @@ -273,8 +310,14 @@ private void ClearCompletedSlot(TManager completedManager)
}
}

/// <summary>
/// Builds a comma-separated list of the sources attached to the work in the given slot.
/// Intended for diagnostic logging only; callers in this file guard it behind
/// <c>EqtTrace.IsVerboseEnabled</c>.
/// </summary>
/// <param name="slot">The slot whose work should be described.</param>
/// <returns>
/// The sources joined by ", ", or an empty string when the work is neither a
/// <c>DiscoveryCriteria</c> nor a <c>TestRunCriteria</c>, or when it carries no sources.
/// </returns>
private static string GetSourcesForSlotExpensive(ParallelOperationManager<TManager, TEventHandler, TWorkload>.Slot slot)
{
    // Discovery work is checked first; anything else is treated as a potential test run.
    var sources = slot.Work is DiscoveryCriteria discovery
        ? discovery.Sources
        : (slot.Work as TestRunCriteria)?.Sources;

    // No recognizable workload, or a workload without sources, yields an empty list.
    return string.Join(", ", sources ?? Array.Empty<string>());
}

public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParallel = false)
{
EqtTrace.Verbose($"ParallelOperationManager.DoActionOnAllManagers: Calling an action on all managers.");
// We don't need to lock here, we just grab the current list of
// slots that are occupied (have managers) and run action on each one of them.
var managers = _managerSlots.Where(slot => slot.HasWork).Select(slot => slot.Manager).ToImmutableArray();
Expand Down Expand Up @@ -320,11 +363,13 @@ private static void DoManagerAction(Action action)

/// <summary>
/// Stops all managers: logs the request and clears the slots while
/// signalling that no more work should be accepted.
/// </summary>
internal void StopAllManagers()
{
    const string message = "ParallelOperationManager.StopAllManagers: Stopping all managers.";
    EqtTrace.Verbose(message);

    // false => the slots must not accept any further work after being cleared.
    ClearSlots(acceptMoreWork: false);
}

/// <summary>
/// Disposes this instance by logging the request and clearing the slots,
/// signalling that no more work should be accepted.
/// </summary>
public void Dispose()
{
    const string message = "ParallelOperationManager.Dispose: Disposing all managers.";
    EqtTrace.Verbose(message);

    // false => the slots must not accept any further work after being cleared.
    ClearSlots(acceptMoreWork: false);
}

Expand Down
Expand Up @@ -287,6 +287,12 @@ private Task InitializeDiscoverTestsOnConcurrentManager(IProxyDiscoveryManager p
bool initialized,
Task? task)
{
// If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
if (discoveryCriteria == null)
{
throw new ArgumentNullException(nameof(discoveryCriteria));
}

// Kick off another discovery task for the next source
Task.Run(() =>
{
Expand Down
Expand Up @@ -159,11 +159,12 @@ public void Close()
? _runCompletedClients == _runStartedClients
: _runCompletedClients == _availableWorkloads;

EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total completed clients = {0}, Run complete = {1}, Run canceled: {2}.", _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled);
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total workloads = {0}, Total started clients = {1} Total completed clients = {2}, Run complete = {3}, Run canceled: {4}.", _availableWorkloads, _runStartedClients, _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled);
}

if (allRunsCompleted)
{
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: All runs completed stopping all managers.");
_parallelOperationManager.StopAllManagers();
return true;
}
Expand All @@ -185,8 +186,13 @@ public void Close()
// {
// return true;
// }
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Not cancelled or aborted, running next work.");
var _ = _parallelOperationManager.RunNextWork(proxyExecutionManager);
}
else
{
EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Cancelled or aborted, not running next work.");
}

return false;
}
Expand Down Expand Up @@ -403,7 +409,7 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu
// clients to be done running their workloads when aborting/cancelling and that doesn't
// happen with an initialized workload that is never run.
//
// Interlocked.Increment(ref _runStartedClients);
// Interlocked.Increment(ref _runStartedClients); <- BUG: Is this a bug waiting to happen for pre-started hosts?
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
});
}
Expand All @@ -421,54 +427,60 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu
bool initialized,
Task? initTask)
{
if (testRunCriteria != null)
// If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before.
if (testRunCriteria == null)
{
Task.Run(() =>
{
if (!initialized)
{
if (!proxyExecutionManager.IsInitialized)
{
proxyExecutionManager.Initialize(_skipDefaultAdapters);
}
throw new ArgumentNullException(nameof(testRunCriteria));
}

Interlocked.Increment(ref _runStartedClients);
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
}
else
Task.Run(() =>
{
if (!initialized)
{
if (!proxyExecutionManager.IsInitialized)
{
initTask!.Wait();
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing uninitialized client. Started clients: " + _runStartedClients);
proxyExecutionManager.Initialize(_skipDefaultAdapters);
}

EqtTrace.Verbose("ParallelProxyExecutionManager: Execution started. Started clients: " + _runStartedClients);
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing test run. Started clients: " + _runStartedClients);
Interlocked.Increment(ref _runStartedClients);
proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler);
}
else
{
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Waiting for pre-initialized client to finish initialization. Started clients: " + _runStartedClients);
initTask!.Wait();
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Pre-initialized client finished initialization. Started clients: " + _runStartedClients);
}

proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler);
})
.ContinueWith(t =>
{
// Just in case, the actual execution couldn't start for an instance. Ensure that
// we call execution complete since we have already fetched a source. Otherwise
// execution will not terminate
EqtTrace.Error("ParallelProxyExecutionManager: Failed to trigger execution. Exception: " + t.Exception);

var handler = eventHandler;
var exceptionToString = t.Exception?.ToString();
var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString };
handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString);

// Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun
// Differences:
// Aborted is sent to allow the current execution manager replaced with another instance
// Ensure that the test run aggregator in parallel run events handler doesn't add these statistics
// (since the test run didn't even start)
var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection<AttachmentSet>(), new Collection<InvokedDataCollector>(), TimeSpan.Zero);
handler.HandleTestRunComplete(completeArgs, null, null, null);
},
TaskContinuationOptions.OnlyOnFaulted);
}
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution starting. Started clients: " + _runStartedClients);

EqtTrace.Verbose("ProxyParallelExecutionManager: No sources available for execution.");
proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler);
EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution started. Started clients: " + _runStartedClients);
})
.ContinueWith(t =>
{
// Just in case, the actual execution couldn't start for an instance. Ensure that
// we call execution complete since we have already fetched a source. Otherwise
// execution will not terminate
EqtTrace.Error("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager(continuation): Failed to trigger execution. Exception: " + t.Exception);

var handler = eventHandler;
var exceptionToString = t.Exception?.ToString();
var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString };
handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload));
handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString);

// Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun
// Differences:
// Aborted is sent to allow the current execution manager replaced with another instance
// Ensure that the test run aggregator in parallel run events handler doesn't add these statistics
// (since the test run didn't even start)
var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection<AttachmentSet>(), new Collection<InvokedDataCollector>(), TimeSpan.Zero);
handler.HandleTestRunComplete(completeArgs, null, null, null);
},
TaskContinuationOptions.OnlyOnFaulted);
}

public void InitializeTestRun(TestRunCriteria testRunCriteria, IInternalTestRunEventsHandler eventHandler)
Expand Down
Expand Up @@ -66,10 +66,12 @@ internal class ParallelRunEventsHandler : IInternalTestRunEventsHandler
ICollection<AttachmentSet>? runContextAttachments,
ICollection<string>? executorUris)
{
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Handling a run completion, this can be either one part of parallel run completing, or the whole parallel run completing.");
var parallelRunComplete = HandleSingleTestRunComplete(testRunCompleteArgs, lastChunkArgs, runContextAttachments, executorUris);

if (parallelRunComplete)
{
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Whole parallel run completed.");
var completedArgs = new TestRunCompleteEventArgs(_runDataAggregator.GetAggregatedRunStats(),
_runDataAggregator.IsCanceled,
_runDataAggregator.IsAborted,
Expand All @@ -96,6 +98,10 @@ internal class ParallelRunEventsHandler : IInternalTestRunEventsHandler

HandleParallelTestRunComplete(completedArgs);
}
else
{
EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Single part of parallel run completed, but whole run is not complete yet.");
}
}

protected bool HandleSingleTestRunComplete(TestRunCompleteEventArgs testRunCompleteArgs,
Expand Down