Implement a max jobs per worker budget (#4965)
* Implement a max jobs per worker budget

* Update ParallelFileProcessor.php

* fix

* docs

* reduce default job size

* better naming

* Update ParallelFileProcessor.php

* remove defaults from rector config

* Update ParallelFileProcessor.php
staabm committed Sep 10, 2023
1 parent 038f809 commit 0717306
Showing 3 changed files with 52 additions and 13 deletions.
config/config.php (2 changes: 1 addition & 1 deletion)

@@ -13,7 +13,7 @@

 $rectorConfig->autoloadPaths([]);
 $rectorConfig->bootstrapFiles([]);
-$rectorConfig->parallel(120, 16, 20);
+$rectorConfig->parallel();

 // to avoid autoimporting out of the box
 $rectorConfig->importNames(false, false);
packages/Config/RectorConfig.php (2 changes: 1 addition & 1 deletion)

@@ -59,7 +59,7 @@ public function disableParallel(): void
 SimpleParameterProvider::setParameter(Option::PARALLEL, false);
 }

-public function parallel(int $seconds = 120, int $maxNumberOfProcess = 16, int $jobSize = 20): void
+public function parallel(int $seconds = 120, int $maxNumberOfProcess = 16, int $jobSize = 15): void
 {
 SimpleParameterProvider::setParameter(Option::PARALLEL, true);
 SimpleParameterProvider::setParameter(Option::PARALLEL_JOB_TIMEOUT_IN_SECONDS, $seconds);
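Since config/config.php now calls parallel() with no arguments, the defaults declared here (a 120-second job timeout, 16 worker processes, and a job size of 15 files per chunk) are what Rector uses out of the box. A project that needs different limits can still pass them explicitly from its own rector.php; a minimal sketch, assuming the usual RectorConfig callback style (the concrete values below are only examples):

    <?php
    // rector.php (illustrative, not part of this commit)
    use Rector\Config\RectorConfig;

    return static function (RectorConfig $rectorConfig): void {
        // job timeout in seconds, max worker processes, files per chunk
        $rectorConfig->parallel(300, 8, 15);
    };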
packages/Parallel/Application/ParallelFileProcessor.php (61 changes: 50 additions & 11 deletions)

@@ -42,6 +42,13 @@ final class ParallelFileProcessor
 * @var int
 */
 private const SYSTEM_ERROR_LIMIT = 50;
+/**
+ * The number of chunks a worker can process before getting killed.
+ * In contrast, the jobSize defines the maximum size of a chunk a worker processes at a time.
+ *
+ * @var int
+ */
+private const MAX_CHUNKS_PER_WORKER = 8;

 private ProcessPool|null $processPool = null;

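The new constant caps how many chunks a single worker may take before it is retired. A simplified, self-contained sketch of that budget idea follows (illustrative only: the variable and closure names are invented for the example, and the real wiring lives in the process() changes further down in this diff):

    <?php
    // Illustrative per-worker chunk budget (not the committed code).
    $maxChunksPerWorker = 8;        // mirrors MAX_CHUNKS_PER_WORKER
    $budgetPerWorker = [];          // workerId => remaining chunks

    $spawnWorker = function () use (&$budgetPerWorker, $maxChunksPerWorker): string {
        $workerId = uniqid('worker_', true);   // stands in for Random::generate()
        $budgetPerWorker[$workerId] = $maxChunksPerWorker;
        return $workerId;
    };

    $handOutChunk = function (string $workerId, array &$jobs) use (&$budgetPerWorker, $spawnWorker): ?string {
        if ($budgetPerWorker[$workerId] <= 0) {
            // budget exhausted: retire this worker and start a fresh one to free memory
            unset($budgetPerWorker[$workerId]);
            return $spawnWorker();
        }
        if ($jobs === []) {
            return null;                       // nothing left to process
        }
        array_pop($jobs);                      // take the next chunk of files for this worker
        --$budgetPerWorker[$workerId];
        return $workerId;
    };

Retiring a worker after a fixed number of chunks keeps any single PHP worker process from accumulating memory across the whole run, which is the motivation stated in the inline comment further down ("kill the current worker, and spawn a fresh one to free memory").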
@@ -98,10 +105,10 @@ public function process(
 return;
 }

-$job = array_pop($jobs);
+$jobsChunk = array_pop($jobs);
 $parallelProcess->request([
 ReactCommand::ACTION => Action::MAIN,
-Content::FILES => $job,
+Content::FILES => $jobsChunk,
 ]);
 });
 });
@@ -137,12 +144,24 @@ public function process(
 };

 $timeoutInSeconds = SimpleParameterProvider::provideIntParameter(Option::PARALLEL_JOB_TIMEOUT_IN_SECONDS);

-for ($i = 0; $i < $numberOfProcesses; ++$i) {
-// nothing else to process, stop now
-if ($jobs === []) {
-break;
-}
+$fileChunksBudgetPerProcess = [];
+
+$processSpawner = function() use (
+&$systemErrors,
+&$fileDiffs,
+&$jobs,
+$postFileCallback,
+&$systemErrorsCount,
+&$reachedInternalErrorsCountLimit,
+$mainScript,
+$input,
+$serverPort,
+$streamSelectLoop,
+$timeoutInSeconds,
+$handleErrorCallable,
+&$fileChunksBudgetPerProcess,
+&$processSpawner
+): void {

 $processIdentifier = Random::generate();
 $workerCommandLine = $this->workerCommandLineFactory->create(
@@ -153,6 +172,7 @@ public function process(
 $processIdentifier,
 $serverPort,
 );
+$fileChunksBudgetPerProcess[$processIdentifier] = self::MAX_CHUNKS_PER_WORKER;

 $parallelProcess = new ParallelProcess($workerCommandLine, $streamSelectLoop, $timeoutInSeconds);

@@ -167,7 +187,9 @@ function (array $json) use (
 &$systemErrorsCount,
 &$collectedDatas,
 &$reachedInternalErrorsCountLimit,
-$processIdentifier
+$processIdentifier,
+&$fileChunksBudgetPerProcess,
+&$processSpawner
 ): void {
 // decode arrays to objects
 foreach ($json[Bridge::SYSTEM_ERRORS] as $jsonError) {
@@ -195,16 +217,24 @@ function (array $json) use (
 $this->processPool->quitAll();
 }

+if ($fileChunksBudgetPerProcess[$processIdentifier] <= 0) {
+// kill the current worker, and spawn a fresh one to free memory
+$this->processPool->quitProcess($processIdentifier);
+
+($processSpawner)();
+return;
+}
 if ($jobs === []) {
 $this->processPool->quitProcess($processIdentifier);
 return;
 }

-$job = array_pop($jobs);
+$jobsChunk = array_pop($jobs);
 $parallelProcess->request([
 ReactCommand::ACTION => Action::MAIN,
-Content::FILES => $job,
+Content::FILES => $jobsChunk,
 ]);
+--$fileChunksBudgetPerProcess[$processIdentifier];
 },

 // 2. callable on error
@@ -226,6 +256,15 @@ function ($exitCode, string $stdErr) use (&$systemErrors, $processIdentifier): void
 );

 $this->processPool->attachProcess($processIdentifier, $parallelProcess);
-}
+};
+
+for ($i = 0; $i < $numberOfProcesses; ++$i) {
+// nothing else to process, stop now
+if ($jobs === []) {
+break;
+}
+
+($processSpawner)();
+}

 $streamSelectLoop->run();
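Taken together with the config changes above, the new defaults mean a chunk holds at most 15 files (jobSize) and a worker is retired after 8 chunks (MAX_CHUNKS_PER_WORKER), so a single worker process handles at most 8 × 15 = 120 files before a fresh process takes over; previously a worker kept pulling chunks until the job queue was empty. Recycling workers this way accepts a little extra process-spawn overhead in exchange for a bound on how much memory any one worker can accumulate.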
