@@ -25,18 +25,30 @@ package resourcetuner
25
25
import (
26
26
"context"
27
27
"errors"
28
+ "runtime"
28
29
"sync"
29
30
"time"
30
31
31
32
"github.com/shirou/gopsutil/v4/cpu"
32
33
"github.com/shirou/gopsutil/v4/mem"
33
34
"go.einride.tech/pid"
35
+ "go.temporal.io/sdk/log"
34
36
"go.temporal.io/sdk/worker"
35
37
)
36
38
37
39
// ResourceBasedTunerOptions configures the tuner built by NewResourceBasedTuner.
type ResourceBasedTunerOptions struct {
40
+ // TargetMem is the target overall system memory usage, as a value between 0 and 1, that the
41
+ // controller will attempt to maintain. Must be set nonzero.
38
42
TargetMem float64
43
+ // TargetCpu is the target overall system CPU usage, as a value between 0 and 1, that the
44
+ // controller will attempt to maintain. Must be set nonzero.
39
45
TargetCpu float64
46
+ // ActivityRampThrottle is passed to ResourceBasedSlotSupplierOptions.RampThrottle for activities.
// NewResourceBasedTuner applies it to both the actSS and laSS slot suppliers.
47
+ // If not set (left at zero), the default value is 50ms.
48
+ ActivityRampThrottle time.Duration
49
+ // WorkflowRampThrottle is passed to ResourceBasedSlotSupplierOptions.RampThrottle for workflows.
// NewResourceBasedTuner does not apply it to the Nexus slot supplier, which keeps its defaults.
50
+ // If not set (left at zero), the default value is 0ms.
51
+ WorkflowRampThrottle time.Duration
40
52
}
41
53
42
54
// NewResourceBasedTuner creates a WorkerTuner that dynamically adjusts the number of slots based
@@ -50,10 +62,19 @@ func NewResourceBasedTuner(opts ResourceBasedTunerOptions) (worker.WorkerTuner,
50
62
controller := NewResourceController (options )
51
63
wfSS := & ResourceBasedSlotSupplier {controller : controller ,
52
64
options : defaultWorkflowResourceBasedSlotSupplierOptions ()}
65
+ if opts .WorkflowRampThrottle != 0 {
66
+ wfSS .options .RampThrottle = opts .WorkflowRampThrottle
67
+ }
53
68
actSS := & ResourceBasedSlotSupplier {controller : controller ,
54
69
options : defaultActivityResourceBasedSlotSupplierOptions ()}
70
+ if opts .ActivityRampThrottle != 0 {
71
+ actSS .options .RampThrottle = opts .ActivityRampThrottle
72
+ }
55
73
laSS := & ResourceBasedSlotSupplier {controller : controller ,
56
74
options : defaultActivityResourceBasedSlotSupplierOptions ()}
75
+ if opts .ActivityRampThrottle != 0 {
76
+ laSS .options .RampThrottle = opts .ActivityRampThrottle
77
+ }
57
78
nexusSS := & ResourceBasedSlotSupplier {controller : controller ,
58
79
options : defaultWorkflowResourceBasedSlotSupplierOptions ()}
59
80
compositeTuner , err := worker .NewCompositeTuner (worker.CompositeTunerOptions {
@@ -163,7 +184,7 @@ func (r *ResourceBasedSlotSupplier) TryReserveSlot(info worker.SlotReservationIn
163
184
numIssued := info .NumIssuedSlots ()
164
185
if numIssued < r .options .MinSlots || (numIssued < r .options .MaxSlots &&
165
186
time .Since (r .lastSlotIssuedAt ) > r .options .RampThrottle ) {
166
- decision , err := r .controller .pidDecision ()
187
+ decision , err := r .controller .pidDecision (info . Logger () )
167
188
if err != nil {
168
189
info .Logger ().Error ("Error calculating resource usage" , "error" , err )
169
190
return nil
@@ -188,10 +209,14 @@ func (r *ResourceBasedSlotSupplier) MaxSlots() int {
188
209
// SystemInfoSupplier provides the memory and CPU usage readings that ResourceController.pidDecision
// consumes. A custom implementation can be supplied through ResourceControllerOptions.InfoSupplier.
type SystemInfoSupplier interface {
189
210
// GetMemoryUsage returns the current system memory usage as a fraction of total memory between
190
211
// 0 and 1.
// infoContext carries per-call context (currently a logger) from the caller.
191
- GetMemoryUsage () (float64 , error )
212
+ GetMemoryUsage (infoContext * SystemInfoContext ) (float64 , error )
192
213
// GetCpuUsage returns the current system CPU usage as a fraction of total CPU usage between 0
193
214
// and 1.
// infoContext carries per-call context (currently a logger) from the caller.
194
- GetCpuUsage () (float64 , error )
215
+ GetCpuUsage (infoContext * SystemInfoContext ) (float64 , error )
216
+ }
217
+
218
// SystemInfoContext is the per-call context handed to SystemInfoSupplier methods.
+ type SystemInfoContext struct {
219
// Logger is the logger the supplier may use to report problems; pidDecision fills it with the
// logger it was called with.
+ Logger log.Logger
195
220
}
196
221
197
222
// ResourceControllerOptions contains configurable parameters for a ResourceController.
@@ -262,7 +287,9 @@ type ResourceController struct {
262
287
func NewResourceController (options ResourceControllerOptions ) * ResourceController {
263
288
var infoSupplier SystemInfoSupplier
264
289
if options .InfoSupplier == nil {
265
- infoSupplier = & psUtilSystemInfoSupplier {}
290
+ infoSupplier = & psUtilSystemInfoSupplier {
291
+ cGroupInfo : newCGroupInfo (),
292
+ }
266
293
} else {
267
294
infoSupplier = options .InfoSupplier
268
295
}
@@ -286,23 +313,22 @@ func NewResourceController(options ResourceControllerOptions) *ResourceControlle
286
313
}
287
314
}
288
315
289
- func (rc * ResourceController ) pidDecision () (bool , error ) {
316
+ func (rc * ResourceController ) pidDecision (logger log. Logger ) (bool , error ) {
290
317
rc .mu .Lock ()
291
318
defer rc .mu .Unlock ()
292
319
293
- memUsage , err := rc .infoSupplier .GetMemoryUsage ()
320
+ memUsage , err := rc .infoSupplier .GetMemoryUsage (& SystemInfoContext { Logger : logger } )
294
321
if err != nil {
295
322
return false , err
296
323
}
297
- cpuUsage , err := rc .infoSupplier .GetCpuUsage ()
324
+ cpuUsage , err := rc .infoSupplier .GetCpuUsage (& SystemInfoContext { Logger : logger } )
298
325
if err != nil {
299
326
return false , err
300
327
}
301
328
if memUsage >= rc .options .MemTargetPercent {
302
329
// Never allow going over the memory target
303
330
return false , nil
304
331
}
305
- //fmt.Printf("mem: %f, cpu: %f\n", memUsage, cpuUsage)
306
332
elapsedTime := time .Since (rc .lastRefresh )
307
333
// This shouldn't be possible with real implementations, but if the elapsed time is 0 the
308
334
// PID controller can produce NaNs.
@@ -326,27 +352,54 @@ func (rc *ResourceController) pidDecision() (bool, error) {
326
352
}
327
353
328
354
// psUtilSystemInfoSupplier is the SystemInfoSupplier that NewResourceController installs when
// ResourceControllerOptions.InfoSupplier is nil. It caches readings obtained through gopsutil and,
// when available, cgroup statistics.
type psUtilSystemInfoSupplier struct {
329
- mu sync.Mutex
355
// NOTE(review): logger is not referenced in the visible portion of this file; the refresh path
// logs through SystemInfoContext.Logger instead. Confirm whether this field is needed.
+ logger log.Logger
356
// presumably guards the refresh of the cached readings below; locking is not visible here, verify.
+ mu sync.Mutex
357
// lastRefresh is when the cached readings were last updated; maybeRefresh skips work if that was
// less than 100ms ago.
+ lastRefresh time.Time
358
+
330
359
// lastMemStat is the most recent gopsutil memory reading; UsedPercent is divided by 100 before
// being returned as a fraction.
lastMemStat * mem.VirtualMemoryStat
331
360
// lastCpuUsage is the most recent gopsutil CPU reading, divided by 100 before being returned.
lastCpuUsage float64
332
- lastRefresh time.Time
361
+
362
// stopTryingToGetCGroupInfo is set once cGroupInfo.Update reports that updates should not
// continue, after which Update is no longer called.
+ stopTryingToGetCGroupInfo bool
363
// cGroupInfo supplies cgroup-relative usage; nonzero values take precedence over the gopsutil
// readings.
+ cGroupInfo cGroupInfo
333
364
}
334
365
335
- func (p * psUtilSystemInfoSupplier ) GetMemoryUsage () (float64 , error ) {
336
- if err := p .maybeRefresh (); err != nil {
366
// cGroupInfo abstracts access to cgroup resource statistics. psUtilSystemInfoSupplier calls Update
// during a refresh (on Linux only) and prefers the nonzero values returned by the getters over the
// host-wide gopsutil readings.
+ type cGroupInfo interface {
367
+ // Update requests an update of the cgroup stats. This is a no-op if not in a cgroup. Returns
368
+ // true if cgroup stats should continue to be updated, false if not in a cgroup or the returned
369
+ // error is considered unrecoverable.
370
+ Update () (bool , error )
371
+ // GetLastMemUsage returns last known memory usage as a fraction of the cgroup limit. 0 if not
372
+ // in a cgroup or limit is not set.
373
+ GetLastMemUsage () float64
374
+ // GetLastCPUUsage returns last known CPU usage as a fraction of the cgroup limit. 0 if not in a
375
+ // cgroup or limit is not set.
376
+ GetLastCPUUsage () float64
377
+ }
378
+
379
// GetMemoryUsage refreshes the cached readings if they are stale and returns memory usage as a
// fraction. A nonzero cgroup reading is returned in preference to the gopsutil reading. Returns
// 0 and the error if the refresh fails.
+ func (p * psUtilSystemInfoSupplier ) GetMemoryUsage (infoContext * SystemInfoContext ) (float64 , error ) {
380
+ if err := p .maybeRefresh (infoContext ); err != nil {
337
381
return 0 , err
338
382
}
383
// A cgroup value of 0 means "not in a cgroup or no limit set", so fall through in that case.
+ lastCGroupMem := p .cGroupInfo .GetLastMemUsage ()
384
+ if lastCGroupMem != 0 {
385
+ return lastCGroupMem , nil
386
+ }
339
387
// UsedPercent is a percentage; convert it to a fraction.
return p .lastMemStat .UsedPercent / 100 , nil
340
388
}
341
389
342
- func (p * psUtilSystemInfoSupplier ) GetCpuUsage () (float64 , error ) {
343
- if err := p .maybeRefresh (); err != nil {
390
// GetCpuUsage refreshes the cached readings if they are stale and returns CPU usage as a
// fraction. A nonzero cgroup reading is returned in preference to the gopsutil reading. Returns
// 0 and the error if the refresh fails.
+ func (p * psUtilSystemInfoSupplier ) GetCpuUsage (infoContext * SystemInfoContext ) (float64 , error ) {
391
+ if err := p .maybeRefresh (infoContext ); err != nil {
344
392
return 0 , err
345
393
}
394
+
395
// A cgroup value of 0 means "not in a cgroup or no limit set", so fall through in that case.
+ lastCGroupCPU := p .cGroupInfo .GetLastCPUUsage ()
396
+ if lastCGroupCPU != 0 {
397
+ return lastCGroupCPU , nil
398
+ }
346
399
// lastCpuUsage is a percentage; convert it to a fraction.
return p .lastCpuUsage / 100 , nil
347
400
}
348
401
349
- func (p * psUtilSystemInfoSupplier ) maybeRefresh () error {
402
+ func (p * psUtilSystemInfoSupplier ) maybeRefresh (infoContext * SystemInfoContext ) error {
350
403
if time .Since (p .lastRefresh ) < 100 * time .Millisecond {
351
404
return nil
352
405
}
@@ -360,16 +413,24 @@ func (p *psUtilSystemInfoSupplier) maybeRefresh() error {
360
413
defer cancelFn ()
361
414
memStat , err := mem .VirtualMemoryWithContext (ctx )
362
415
if err != nil {
363
- println ("Refresh error: " , err )
364
416
return err
365
417
}
366
418
cpuUsage , err := cpu .PercentWithContext (ctx , 0 , false )
367
419
if err != nil {
368
- println ("Refresh error: " , err )
369
420
return err
370
421
}
422
+
371
423
p .lastMemStat = memStat
372
424
p .lastCpuUsage = cpuUsage [0 ]
425
+
426
+ if runtime .GOOS == "linux" && ! p .stopTryingToGetCGroupInfo {
427
+ continueUpdates , err := p .cGroupInfo .Update ()
428
+ if err != nil {
429
+ infoContext .Logger .Warn ("Failed to get cgroup stats" , "error" , err )
430
+ }
431
+ p .stopTryingToGetCGroupInfo = ! continueUpdates
432
+ }
433
+
373
434
p .lastRefresh = time .Now ()
374
435
return nil
375
436
}
0 commit comments