-
Notifications
You must be signed in to change notification settings - Fork 315
/
ParallelOperationManager.cs
357 lines (307 loc) · 14.8 KB
/
ParallelOperationManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client;
/// <summary>
/// Manages work that is done on multiple managers (testhosts) in parallel such as parallel discovery or parallel run.
/// Work is assigned to a fixed pool of slots (<see cref="MaxParallelLevel"/> + <see cref="PreStartCount"/>); slots
/// beyond the max parallel level only run the initialization step ("pre-start") until a running slot frees up.
/// </summary>
internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkload> : IDisposable
{
    /// <summary>
    /// Default number of hosts to pre-start when running with a max parallel level of 1.
    /// </summary>
    private const int PreStart = 2;

    /// <summary>
    /// Number of hosts to pre-start, read from the <c>VSTEST_HOSTPRESTART_COUNT</c> environment variable.
    /// Falls back to <see cref="PreStart"/> when the variable is missing, is not an integer, or is negative
    /// (a negative count would shrink the slot pool below the max parallel level).
    /// </summary>
    private static readonly int VSTEST_HOSTPRESTART_COUNT =
        int.TryParse(
            Environment.GetEnvironmentVariable(nameof(VSTEST_HOSTPRESTART_COUNT)),
            NumberStyles.Integer,
            CultureInfo.InvariantCulture,
            out int num)
        && num >= 0
            ? num
            : PreStart;

    /// <summary>
    /// Monotonic clock used to timestamp pre-started hosts. Unlike the time of day from <c>DateTime.Now</c>,
    /// it does not wrap at midnight or jump on DST/system clock changes, so ordering by it is reliable.
    /// </summary>
    private static readonly Stopwatch s_preStartClock = Stopwatch.StartNew();

    private readonly Func<TestRuntimeProviderInfo, TManager> _createNewManager;
    private readonly List<ProviderSpecificWorkload<TWorkload>> _workloads = new();
    private readonly List<Slot> _managerSlots = new();

    // Guards _managerSlots, _workloads and _acceptMoreWork.
    private readonly object _lock = new();

    // Root event handler and callbacks supplied by StartWork.
    private TEventHandler? _eventHandler;
    private Func<TEventHandler, TManager, TEventHandler>? _getEventHandler;
    private Func<TManager, TEventHandler, TWorkload, Task>? _initializeWorkload;
    private Action<TManager, TEventHandler, TWorkload, bool, Task?>? _runWorkload;

    // Set to false by StopAllManagers/Dispose so callbacks that arrive late don't schedule more work.
    private bool _acceptMoreWork;

    /// <summary>Maximum number of managers that may run work at the same time.</summary>
    public int MaxParallelLevel { get; }

    /// <summary>Number of slots that currently have work assigned (running or pre-started).</summary>
    public int OccupiedSlotCount { get; private set; }

    /// <summary>Number of slots that currently have no work assigned.</summary>
    public int AvailableSlotCount { get; private set; }

    /// <summary>Number of extra slots used only to pre-start (initialize) hosts ahead of time.</summary>
    public int PreStartCount { get; private set; }

    /// <summary>
    /// Creates new instance of ParallelOperationManager.
    /// </summary>
    /// <param name="createNewManager">Creates a new manager that is responsible for running a single part of the overall workload.
    /// A manager is typically a testhost, and the part of workload is discovering or running a single test dll.</param>
    /// <param name="parallelLevel">Determines the maximum amount of parallel managers that can be active at the same time.</param>
    public ParallelOperationManager(Func<TestRuntimeProviderInfo, TManager> createNewManager, int parallelLevel)
    {
        _createNewManager = createNewManager;
        MaxParallelLevel = parallelLevel;
        // Pre-start only when we don't run in parallel. If we do run in parallel,
        // pre-starting has no additional value because while one host is starting,
        // another is running tests.
        PreStartCount = MaxParallelLevel == 1 ? VSTEST_HOSTPRESTART_COUNT : 0;
        ClearSlots(acceptMoreWork: true);
    }

    /// <summary>
    /// Replaces all slots with fresh, empty ones and sets whether more work may be scheduled.
    /// </summary>
    private void ClearSlots(bool acceptMoreWork)
    {
        lock (_lock)
        {
            _acceptMoreWork = acceptMoreWork;
            _managerSlots.Clear();
            _managerSlots.AddRange(Enumerable.Range(0, MaxParallelLevel + PreStartCount).Select(i => new Slot { Index = i }));
            SetOccupiedSlotCount();
        }
    }

    /// <summary>
    /// Recomputes <see cref="AvailableSlotCount"/> and <see cref="OccupiedSlotCount"/>. Callers hold <c>_lock</c>.
    /// </summary>
    private void SetOccupiedSlotCount()
    {
        AvailableSlotCount = _managerSlots.Count(s => !s.HasWork);
        OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount;
    }

    /// <summary>
    /// Queues the given workloads, resets the slots and starts as much work as the slots allow.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any of the arguments is null.</exception>
    public void StartWork(
        List<ProviderSpecificWorkload<TWorkload>> workloads,
        TEventHandler eventHandler,
        Func<TEventHandler, TManager, TEventHandler> getEventHandler,
        Func<TManager, TEventHandler, TWorkload, Task> initializeWorkload,
        Action<TManager, TEventHandler, TWorkload, bool, Task?> runWorkload)
    {
        _ = workloads ?? throw new ArgumentNullException(nameof(workloads));
        _eventHandler = eventHandler ?? throw new ArgumentNullException(nameof(eventHandler));
        _getEventHandler = getEventHandler ?? throw new ArgumentNullException(nameof(getEventHandler));
        _initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload));
        _runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload));

        // _workloads is otherwise only mutated under the lock (RunWorkInParallel), so do the same here.
        lock (_lock)
        {
            _workloads.AddRange(workloads);
        }

        ClearSlots(acceptMoreWork: true);
        RunWorkInParallel();
    }

    // This does not do anything in parallel; all the workloads we schedule are offloaded to a separate Task in the
    // _runWorkload callback. This method is the right place for that offloading (rather than each manager),
    // but it has not been moved here yet.
    private bool RunWorkInParallel()
    {
        // TODO: Right now we don't re-use shared hosts, but if we did, this is the place
        // where we should find a workload that fits the manager if any of them is shared.
        // Or tear it down, and start a new one.
        if (_eventHandler == null)
        {
            throw new InvalidOperationException($"{nameof(_eventHandler)} was not provided.");
        }

        if (_getEventHandler == null)
        {
            throw new InvalidOperationException($"{nameof(_getEventHandler)} was not provided.");
        }

        if (_initializeWorkload == null)
        {
            throw new InvalidOperationException($"{nameof(_initializeWorkload)} was not provided.");
        }

        if (_runWorkload == null)
        {
            throw new InvalidOperationException($"{nameof(_runWorkload)} was not provided.");
        }

        // Reserve slots and assign them work under the lock so we keep the slots consistent.
        Slot[] slots;
        lock (_lock)
        {
            // When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
            // it is possible that we will clear all slots, and have RunWorkInParallel waiting on the lock,
            // so when it is allowed to enter it will try to add more work, but we already cancelled,
            // so we should not start more work.
            if (!_acceptMoreWork)
            {
                return false;
            }

            // Grab all empty slots and all pending workloads.
            var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray();
            var occupiedSlots = _managerSlots.Count - availableSlots.Length;
            var availableWorkloads = _workloads.Where(workload => workload != null).ToImmutableArray();

            // Fill as many slots as we can, limited by whichever runs out first.
            var amount = Math.Min(availableSlots.Length, availableWorkloads.Length);

            // Associate each workload with a slot. Slots past the max parallel level
            // will only run the initialize step of the given workload (pre-start).
            for (int i = 0; i < amount; i++)
            {
                var slot = availableSlots[i];
                var workload = availableWorkloads[i];

                // Create the manager and its handler before marking the slot, so a throwing factory
                // does not leave a slot that claims to have work but has no manager to clear it by.
                var manager = _createNewManager(workload.Provider);
                var eventHandler = _getEventHandler(_eventHandler, manager);

                slot.HasWork = true;
                slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;
                slot.EventHandler = eventHandler;
                slot.Manager = manager;
                slot.ManagerInfo = workload.Provider;
                slot.Work = workload.Work;

                _workloads.Remove(workload);
            }

            slots = _managerSlots.ToArray();
            SetOccupiedSlotCount();
        }

        // Kick off the work outside of the lock so if we have more requests to run
        // that come in at the same time we only block them from reserving the same slot at the same time
        // but not from starting their assigned work at the same time.
        // Kick off all pre-started hosts, starting with the ones that had the longest time to initialize.
        //
        // This code should be safe even outside the lock since HasWork is only changed when work is
        // complete and only for the slot that completed work. It is not possible to complete work before
        // starting it (which is what we are trying to do here).
        var startedWork = 0;
        foreach (var slot in slots.Where(s => s.HasWork && !s.IsRunning && s.IsPreStarted).OrderBy(s => s.PreStartTime))
        {
            startedWork++;
            slot.IsRunning = true;
            EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host: {(s_preStartClock.Elapsed - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}");
            _runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);

            // We already started as many as we were allowed, jump out.
            if (startedWork == MaxParallelLevel)
            {
                break;
            }
        }

        // If pre-started hosts already used up the max parallel level, skip running more work.
        if (startedWork < MaxParallelLevel)
        {
            foreach (var slot in slots)
            {
                if (slot.HasWork && !slot.IsRunning && !slot.ShouldPreStart)
                {
                    startedWork++;
                    slot.IsRunning = true;
                    EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Started work on a host.");
                    _runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask);
                }

                // We already started as many as we were allowed, jump out.
                if (startedWork == MaxParallelLevel)
                {
                    break;
                }
            }
        }

        // Initialize (pre-start) every slot that was marked for pre-start and has not been initialized yet.
        var preStartedWork = 0;
        foreach (var slot in slots)
        {
            if (slot.HasWork && slot.ShouldPreStart && !slot.IsPreStarted)
            {
                preStartedWork++;
                slot.PreStartTime = s_preStartClock.Elapsed;
                slot.IsPreStarted = true;
                EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Pre-starting a host.");
                slot.InitTask = _initializeWorkload(slot.Manager!, slot.EventHandler!, slot.Work!);
            }
        }

        // Return true when we started more work. Or false, when there was nothing more to do.
        // This will propagate to handling of partial discovery or partial run.
        return preStartedWork + startedWork > 0;
    }

    /// <summary>
    /// Frees the slot used by <paramref name="completedManager"/> and schedules more work.
    /// </summary>
    /// <returns><c>true</c> when any work was started or pre-started; otherwise <c>false</c>.</returns>
    public bool RunNextWork(TManager completedManager)
    {
        ValidateArg.NotNull(completedManager, nameof(completedManager));
        ClearCompletedSlot(completedManager);
        return RunWorkInParallel();
    }

    /// <summary>
    /// Resets the slot that holds <paramref name="completedManager"/> so it can take new work.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The manager is in no slot while work is still accepted, or it is in more than one slot.
    /// </exception>
    private void ClearCompletedSlot(TManager completedManager)
    {
        lock (_lock)
        {
            var completedSlot = _managerSlots.Where(s => ReferenceEquals(completedManager, s.Manager)).ToImmutableArray();
            // When HandlePartialDiscovery or HandlePartialRun are in progress, and we call StopAllManagers,
            // it is possible that we will clear all slots, while ClearCompletedSlot is waiting on the lock,
            // so when it is allowed to enter it will fail to find the respective slot and fail. In this case it is
            // okay that the slot is not found, and we do nothing, because we already stopped all work and cleared the slots.
            if (completedSlot.Length == 0)
            {
                if (_acceptMoreWork)
                {
                    throw new InvalidOperationException("The provided manager was not found in any slot.");
                }

                return;
            }

            if (completedSlot.Length > 1)
            {
                throw new InvalidOperationException("The provided manager was found in multiple slots.");
            }

            var slot = completedSlot[0];
            slot.PreStartTime = TimeSpan.Zero;
            slot.Work = default(TWorkload);
            slot.HasWork = false;
            slot.ShouldPreStart = false;
            slot.IsPreStarted = false;
            slot.InitTask = null;
            slot.IsRunning = false;
            slot.Manager = default(TManager);
            slot.EventHandler = default(TEventHandler);

            SetOccupiedSlotCount();
        }
    }

    /// <summary>
    /// Invokes <paramref name="action"/> on every manager that currently has work. Exceptions thrown by the
    /// action are logged as warnings and not rethrown.
    /// </summary>
    /// <param name="action">Action to run for each manager.</param>
    /// <param name="doActionsInParallel">When true, each action runs on its own task and all are awaited together.</param>
    public void DoActionOnAllManagers(Action<TManager> action, bool doActionsInParallel = false)
    {
        // Snapshot the managers under the lock: ClearSlots replaces the contents of _managerSlots
        // (e.g. StopAllManagers during cancellation), and enumerating a List<T> while it is being
        // modified throws. The actions themselves run outside the lock.
        ImmutableArray<TManager> managers;
        lock (_lock)
        {
            managers = _managerSlots
                .Where(slot => slot.HasWork && slot.Manager is not null)
                .Select(slot => slot.Manager!)
                .ToImmutableArray();
        }

        if (!doActionsInParallel)
        {
            foreach (var manager in managers)
            {
                DoManagerAction(() => action(manager));
            }

            return;
        }

        // Every element is a real task: Task.WaitAll throws on a null element instead of waiting,
        // which is what happened when a slot with work had no manager.
        var actionTasks = managers.Select(manager => Task.Run(() => action(manager))).ToArray();
        DoManagerAction(() => Task.WaitAll(actionTasks));
    }

    /// <summary>
    /// Runs <paramref name="action"/> and logs, rather than rethrows, any exception it throws.
    /// </summary>
    private static void DoManagerAction(Action action)
    {
        try
        {
            action();
        }
        catch (Exception ex)
        {
            // Exception can occur if we are trying to cancel a test run on an executor where test run is not even fired
            // we can safely ignore that as user is just canceling the test run and we don't care about additional parallel executors
            // as we will be disposing them off soon anyway
            EqtTrace.Warning("ParallelOperationManager.DoManagerAction: Exception while invoking an action on Proxy Manager instance: {0}", ex);
        }
    }

    /// <summary>
    /// Clears all slots and stops accepting more work.
    /// </summary>
    internal void StopAllManagers()
    {
        ClearSlots(acceptMoreWork: false);
    }

    /// <summary>
    /// Clears all slots and stops accepting more work.
    /// </summary>
    public void Dispose()
    {
        ClearSlots(acceptMoreWork: false);
    }

    /// <summary>
    /// One place in the pool where a manager (testhost) runs or pre-starts a single workload.
    /// </summary>
    private sealed class Slot
    {
        public int Index { get; set; }

        public bool HasWork { get; set; }

        // True when the slot was filled past the max parallel level, so it is only initialized, not run.
        public bool ShouldPreStart { get; set; }

        // Task returned by the initialize callback when the slot was pre-started.
        public Task? InitTask { get; set; }

        public bool IsRunning { get; set; }

        public TManager? Manager { get; set; }

        public TestRuntimeProviderInfo? ManagerInfo { get; set; }

        public TEventHandler? EventHandler { get; set; }

        public TWorkload? Work { get; set; }

        public bool IsPreStarted { get; internal set; }

        // Elapsed time on the owning manager's monotonic pre-start clock when this slot was pre-started.
        public TimeSpan PreStartTime { get; internal set; }

        public override string ToString() =>
            $"{Index}: HasWork: {HasWork}, ShouldPreStart: {ShouldPreStart}, IsPreStarted: {IsPreStarted}, IsRunning: {IsRunning}";
    }
}