Skip to content

Commit

Permalink
Pre-start testhosts (#3666)
Browse files Browse the repository at this point in the history
  • Loading branch information
nohwnd committed Feb 23, 2023
1 parent 3990c63 commit 05c0c4c
Show file tree
Hide file tree
Showing 14 changed files with 441 additions and 166 deletions.
Expand Up @@ -16,6 +16,15 @@ public interface IProxyDiscoveryManager
/// </summary>
void Initialize(bool skipDefaultAdapters);

/// <summary>
/// Initializes test discovery. Create the test host, setup channel and initialize extensions.
/// </summary>
///
/// <param name="discoveryCriteria">Settings, parameters for the discovery request</param>
/// <param name="eventHandler">EventHandler for handling discovery events from Engine</param>
/// <param name="skipDefaultAdapters">Skip default adapters flag</param>
void InitializeDiscovery(DiscoveryCriteria discoveryCriteria, ITestDiscoveryEventsHandler2 eventHandler, bool skipDefaultAdapters);

/// <summary>
/// Discovers tests
/// </summary>
Expand Down
Expand Up @@ -21,6 +21,14 @@ public interface IProxyExecutionManager
/// </summary>
void Initialize(bool skipDefaultAdapters);

/// <summary>
/// Initializes test execution. Create the test host, setup channel and initialize extensions.
/// </summary>
///
/// <param name="testRunCriteria">The settings/options for the test run.</param>
/// <param name="eventHandler">EventHandler for handling execution events from Engine.</param>
void InitializeTestRun(TestRunCriteria testRunCriteria, IInternalTestRunEventsHandler eventHandler);

/// <summary>
/// Starts the test run.
/// </summary>
Expand Down
Expand Up @@ -263,3 +263,5 @@ virtual Microsoft.VisualStudio.TestPlatform.Common.ExtensionFramework.TestPlugin
virtual Microsoft.VisualStudio.TestPlatform.Common.ExtensionFramework.Utilities.TestPluginInformation.IdentifierData.get -> string?
virtual Microsoft.VisualStudio.TestPlatform.Common.ExtensionFramework.Utilities.TestPluginInformation.Metadata.get -> System.Collections.Generic.ICollection<object?>!
virtual Microsoft.VisualStudio.TestPlatform.Common.Hosting.TestRuntimeProviderManager.GetTestHostManagerByRunConfiguration(string? runConfiguration, System.Collections.Generic.List<string!>? _) -> Microsoft.VisualStudio.TestPlatform.ObjectModel.Host.ITestRuntimeProvider?
Microsoft.VisualStudio.TestPlatform.ObjectModel.Engine.IProxyDiscoveryManager.InitializeDiscovery(Microsoft.VisualStudio.TestPlatform.ObjectModel.Client.DiscoveryCriteria! discoveryCriteria, Microsoft.VisualStudio.TestPlatform.ObjectModel.Client.ITestDiscoveryEventsHandler2! eventHandler, bool skipDefaultAdapters) -> void
Microsoft.VisualStudio.TestPlatform.ObjectModel.Engine.IProxyExecutionManager.InitializeTestRun(Microsoft.VisualStudio.TestPlatform.ObjectModel.Client.TestRunCriteria! testRunCriteria, Microsoft.VisualStudio.TestPlatform.ObjectModel.Client.IInternalTestRunEventsHandler! eventHandler) -> void
Expand Up @@ -113,4 +113,12 @@ private void InitializeExtensions(IEnumerable<string> sources)
// it will use TestPluginCache of vstest.console
_discoveryManager.Initialize(Enumerable.Empty<string>(), null);
}

public void InitializeDiscovery(DiscoveryCriteria discoveryCriteria, ITestDiscoveryEventsHandler2 eventHandler, bool skipDefaultAdapters)
{
// Leaving this empty as it is not really relevant to the in-process proxy managers since
// there's no external testhost to be started. The idea of pre-initializing the test run
// makes sense only for out-of-process proxies like ProxyExecutionManager or
// ProxyDiscoveryManager.
}
}
Expand Up @@ -133,7 +133,6 @@ public void Close()
{
}


private void InitializeExtensions(IEnumerable<string> sources)
{
var extensionsFromSource = _testHostManager.GetTestPlatformExtensions(sources, Enumerable.Empty<string>());
Expand All @@ -146,4 +145,12 @@ private void InitializeExtensions(IEnumerable<string> sources)
// it will use TestPluginCache of vstest.console
_executionManager.Initialize(Enumerable.Empty<string>(), null);
}

public void InitializeTestRun(TestRunCriteria testRunCriteria, IInternalTestRunEventsHandler eventHandler)
{
// Leaving this empty as it is not really relevant to the in-process proxy managers since
// there's no external testhost to be started. The idea of pre-initializing the test run
// makes sense only for out-of-process proxies like ProxyExecutionManager or
// ProxyDiscoveryManager.
}
}
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;

Expand All @@ -16,14 +17,22 @@ namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;
/// </summary>
internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkload> : IDisposable
{
private const int PreStart = 2;
private readonly static int VSTEST_HOSTPRESTART_COUNT =
int.TryParse(
Environment.GetEnvironmentVariable(nameof(VSTEST_HOSTPRESTART_COUNT)),
out int num)
? num
: PreStart;
private readonly Func<TestRuntimeProviderInfo, TManager> _createNewManager;

/// <summary>
/// Default number of Processes
/// </summary>
private TEventHandler? _eventHandler;
private Func<TEventHandler, TManager, TEventHandler>? _getEventHandler;
private Action<TManager, TEventHandler, TWorkload>? _runWorkload;
private Func<TManager, TEventHandler, TWorkload, Task>? _initializeWorkload;
private Action<TManager, TEventHandler, TWorkload, bool, Task?>? _runWorkload;
private bool _acceptMoreWork;
private readonly List<ProviderSpecificWorkload<TWorkload>> _workloads = new();
private readonly List<Slot> _managerSlots = new();
Expand All @@ -33,6 +42,7 @@ internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkloa
public int MaxParallelLevel { get; }
public int OccupiedSlotCount { get; private set; }
public int AvailableSlotCount { get; private set; }
public int PreStartCount { get; private set; }

/// <summary>
/// Creates new instance of ParallelOperationManager.
Expand All @@ -44,6 +54,10 @@ public ParallelOperationManager(Func<TestRuntimeProviderInfo, TManager> createNe
{
_createNewManager = createNewManager;
MaxParallelLevel = parallelLevel;
// pre-start only when we don't run in parallel, if we do run in parallel,
// then pre-starting has no additional value because while one host is starting,
// another is running tests.
PreStartCount = MaxParallelLevel == 1 ? VSTEST_HOSTPRESTART_COUNT : 0;
ClearSlots(acceptMoreWork: true);
}

Expand All @@ -53,33 +67,32 @@ private void ClearSlots(bool acceptMoreWork)
{
_acceptMoreWork = acceptMoreWork;
_managerSlots.Clear();
_managerSlots.AddRange(Enumerable.Range(0, MaxParallelLevel).Select(_ => new Slot()));
_managerSlots.AddRange(Enumerable.Range(0, MaxParallelLevel + PreStartCount).Select(i => new Slot { Index = i }));
SetOccupiedSlotCount();
}
}

private void SetOccupiedSlotCount()
{
AvailableSlotCount = _managerSlots.Count(s => s.IsAvailable);
AvailableSlotCount = _managerSlots.Count(s => !s.HasWork);
OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount;
}

public void StartWork(
List<ProviderSpecificWorkload<TWorkload>> workloads,
TEventHandler eventHandler,
Func<TEventHandler, TManager, TEventHandler> getEventHandler,
Action<TManager, TEventHandler, TWorkload> runWorkload)
Func<TManager, TEventHandler, TWorkload, Task> initializeWorkload,
Action<TManager, TEventHandler, TWorkload, bool, Task?> runWorkload)
{
_ = workloads ?? throw new ArgumentNullException(nameof(workloads));
_eventHandler = eventHandler ?? throw new ArgumentNullException(nameof(eventHandler));
_getEventHandler = getEventHandler ?? throw new ArgumentNullException(nameof(getEventHandler));
_initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload));
_runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload));

_workloads.AddRange(workloads);

// This creates as many slots as possible even though we might not use them when we get less workloads to process,
// this is not a big issue, and not worth optimizing, because the parallel level is determined by the logical CPU count,
// so it is a small number.
ClearSlots(acceptMoreWork: true);
RunWorkInParallel();
}
Expand All @@ -101,66 +114,115 @@ private bool RunWorkInParallel()
if (_runWorkload == null)
throw new InvalidOperationException($"{nameof(_runWorkload)} was not provided.");

// Reserve slots and assign them work under the lock so we keep
// the slots consistent.
List<SlotWorkloadPair> workToRun = new();
// Reserve slots and assign them work under the lock so we keep the slots consistent.
Slot[] slots;
lock (_lock)
{
if (_workloads.Count == 0)
return false;

// When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
// it is possible that we will clear all slots, and have RunWorkInParallel waiting on the lock,
// so when it is allowed to enter it will try to add more work, but we already cancelled,
// so we should not start more work.
if (!_acceptMoreWork)
return false;

var availableSlots = _managerSlots.Where(slot => slot.IsAvailable).ToList();
var availableWorkloads = _workloads.Where(workload => workload != null).ToList();
var amount = Math.Min(availableSlots.Count, availableWorkloads.Count);
var workloadsToRun = availableWorkloads.Take(amount).ToList();

// We grab all empty slots.
var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray();
var occupiedSlots = MaxParallelLevel - (availableSlots.Length - PreStartCount);
// We grab all available workloads.
var availableWorkloads = _workloads.Where(workload => workload != null).ToImmutableArray();
// We take the amount of workloads to fill all the slots, or just as many workloads
// as there are if there are less workloads than slots.
var amount = Math.Min(availableSlots.Length, availableWorkloads.Length);
var workloadsToAdd = availableWorkloads.Take(amount).ToImmutableArray();

// We associate each workload to a slot, if we reached the max parallel
// level, then we will run only initalize step of the given workload.
for (int i = 0; i < amount; i++)
{
var slot = availableSlots[i];
slot.IsAvailable = false;
var workload = workloadsToRun[i];
workToRun.Add(new SlotWorkloadPair(slot, workload));
slot.HasWork = true;
var workload = workloadsToAdd[i];
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;

var manager = _createNewManager(workload.Provider);
var eventHandler = _getEventHandler(_eventHandler, manager);
slot.EventHandler = eventHandler;
slot.Manager = manager;
slot.ManagerInfo = workload.Provider;
slot.Work = workload.Work;

_workloads.Remove(workload);
}

slots = _managerSlots.ToArray();
SetOccupiedSlotCount();

foreach (var pair in workToRun)
{
var manager = _createNewManager(pair.Workload.Provider);
var eventHandler = _getEventHandler(_eventHandler, manager);
pair.Slot.EventHandler = eventHandler;
pair.Slot.Manager = manager;
pair.Slot.ManagerInfo = pair.Workload.Provider;
pair.Slot.Work = pair.Workload.Work;
}
}

// Kick of the work in parallel outside of the lock so if we have more requests to run
// that come in at the same time we only block them from reserving the same slot at the same time
// but not from starting their assigned work at the same time.
foreach (var pair in workToRun)

// Kick of all pre-started hosts from the ones that had the longest time to initialize.
//
// This code should be safe even outside the lock since HasWork is only changed when work is
// complete and only for the slot that completed work. It is not possible to complete work before
// starting it (which is what we are trying to do here).
var startedWork = 0;
foreach (var slot in slots.Where(s => s.HasWork && !s.IsRunning && s.IsPreStarted).OrderBy(s => s.PreStartTime))
{
startedWork++;
slot.IsRunning = true;
EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);

// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
break;
}
}

// We already started as many pre-started testhosts as we are allowed by the max parallel level
// skip running more work.
if (startedWork < MaxParallelLevel)
{
try
foreach (var slot in slots)
{
_runWorkload(pair.Slot.Manager!, pair.Slot.EventHandler!, pair.Workload.Work!);
if (slot.HasWork && !slot.IsRunning)
{
if (!slot.ShouldPreStart)
{
startedWork++;
slot.IsRunning = true;
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Started work on a host.");
_runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
}
}

// We already started as many as we were allowed, jump out;
if (startedWork == MaxParallelLevel)
{
break;
}
}
finally
}

var preStartedWork = 0;
foreach (var slot in slots)
{
if (slot.HasWork && slot.ShouldPreStart && !slot.IsPreStarted)
{
// clean the slot or something, to make sure we don't keep it reserved.
preStartedWork++;
slot.PreStartTime = DateTime.Now.TimeOfDay;
slot.IsPreStarted = true;
EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Pre-starting a host.");
slot.InitTask = _initializeWorkload!(slot.Manager!, slot.EventHandler!, slot.Work!);
}
}

// Return true when we started more work. Or false, when there was nothing more to do.
// This will propagate to handling of partial discovery or partial run.
return workToRun.Count > 0;
return preStartedWork + startedWork > 0;
}

public bool RunNextWork(TManager completedManager)
Expand All @@ -174,12 +236,12 @@ private void ClearCompletedSlot(TManager completedManager)
{
lock (_lock)
{
var completedSlot = _managerSlots.Where(s => ReferenceEquals(completedManager, s.Manager)).ToList();
var completedSlot = _managerSlots.Where(s => ReferenceEquals(completedManager, s.Manager)).ToImmutableArray();
// When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
// it is possible that we will clear all slots, while ClearCompletedSlot is waiting on the lock,
// so when it is allowed to enter it will fail to find the respective slot and fail. In this case it is
// okay that the slot is not found, and we do nothing, because we already stopped all work and cleared the slots.
if (completedSlot.Count == 0)
if (completedSlot.Length == 0)
{
if (_acceptMoreWork)
{
Expand All @@ -191,13 +253,21 @@ private void ClearCompletedSlot(TManager completedManager)
}
}

if (completedSlot.Count > 1)
if (completedSlot.Length > 1)
{
throw new InvalidOperationException("The provided manager was found in multiple slots.");
}

var slot = completedSlot[0];
slot.IsAvailable = true;
slot.PreStartTime = TimeSpan.Zero;
slot.Work = default(TWorkload);
slot.HasWork = false;
slot.ShouldPreStart = false;
slot.IsPreStarted = false;
slot.InitTask = null;
slot.IsRunning = false;
slot.Manager = default(TManager);
slot.EventHandler = default(TEventHandler);

SetOccupiedSlotCount();
}
Expand All @@ -207,9 +277,9 @@ public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParal
{
// We don't need to lock here, we just grab the current list of
// slots that are occupied (have managers) and run action on each one of them.
var managers = _managerSlots.Where(slot => !slot.IsAvailable).Select(slot => slot.Manager).ToList();
var managers = _managerSlots.Where(slot => slot.HasWork).Select(slot => slot.Manager).ToImmutableArray();
int i = 0;
var actionTasks = new Task[managers.Count];
var actionTasks = new Task[managers.Length];
foreach (var manager in managers)
{
if (manager == null)
Expand Down Expand Up @@ -260,7 +330,14 @@ public void Dispose()

private class Slot
{
public bool IsAvailable { get; set; } = true;
public int Index { get; set; }
public bool HasWork { get; set; }

public bool ShouldPreStart { get; set; }

public Task? InitTask { get; set; }

public bool IsRunning { get; set; }

public TManager? Manager { get; set; }

Expand All @@ -269,16 +346,12 @@ private class Slot
public TEventHandler? EventHandler { get; set; }

public TWorkload? Work { get; set; }
}
public bool IsPreStarted { get; internal set; }
public TimeSpan PreStartTime { get; internal set; }

private class SlotWorkloadPair
{
public SlotWorkloadPair(Slot slot, ProviderSpecificWorkload<TWorkload> workload)
public override string ToString()
{
Slot = slot;
Workload = workload;
return $"{Index}: HasWork: {HasWork}, ShouldPreStart: {ShouldPreStart}, IsPreStarted: {IsPreStarted}, IsRunning: {IsRunning}";
}
public Slot Slot { get; }
public ProviderSpecificWorkload<TWorkload> Workload { get; }
}
}

0 comments on commit 05c0c4c

Please sign in to comment.