-
-
Notifications
You must be signed in to change notification settings - Fork 333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement a max jobs per worker budget #4965
Changes from 8 commits
c6d7457
500a9a4
e76edbc
377b254
8ad9388
be59729
999bb9b
6ecaaab
14724bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,12 @@ final class ParallelFileProcessor | |
* @var int | ||
*/ | ||
private const SYSTEM_ERROR_LIMIT = 50; | ||
/** | ||
* the number of chunks a worker can process before getting killed | ||
* | ||
* @var int | ||
*/ | ||
private const MAX_CHUNKS_PER_WORKER = 8; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The difference between $jobSize and MAX_CHUNKS_PER_WORKER is a bit ambiguous. |
||
|
||
private ProcessPool|null $processPool = null; | ||
|
||
|
@@ -98,10 +104,10 @@ public function process( | |
return; | ||
} | ||
|
||
$job = array_pop($jobs); | ||
$jobsChunk = array_pop($jobs); | ||
$parallelProcess->request([ | ||
ReactCommand::ACTION => Action::MAIN, | ||
Content::FILES => $job, | ||
Content::FILES => $jobsChunk, | ||
]); | ||
}); | ||
}); | ||
|
@@ -137,12 +143,24 @@ public function process( | |
}; | ||
|
||
$timeoutInSeconds = SimpleParameterProvider::provideIntParameter(Option::PARALLEL_JOB_TIMEOUT_IN_SECONDS); | ||
|
||
for ($i = 0; $i < $numberOfProcesses; ++$i) { | ||
// nothing else to process, stop now | ||
if ($jobs === []) { | ||
break; | ||
} | ||
$fileChunksBudgetPerProcess = []; | ||
|
||
$processSpawner = function() use ( | ||
&$systemErrors, | ||
&$fileDiffs, | ||
&$jobs, | ||
$postFileCallback, | ||
&$systemErrorsCount, | ||
&$reachedInternalErrorsCountLimit, | ||
$mainScript, | ||
$input, | ||
$serverPort, | ||
$streamSelectLoop, | ||
$timeoutInSeconds, | ||
$handleErrorCallable, | ||
&$fileChunksBudgetPerProcess, | ||
&$processSpawner | ||
): void { | ||
|
||
$processIdentifier = Random::generate(); | ||
$workerCommandLine = $this->workerCommandLineFactory->create( | ||
|
@@ -153,6 +171,7 @@ public function process( | |
$processIdentifier, | ||
$serverPort, | ||
); | ||
$fileChunksBudgetPerProcess[$processIdentifier] = self::MAX_CHUNKS_PER_WORKER; | ||
|
||
$parallelProcess = new ParallelProcess($workerCommandLine, $streamSelectLoop, $timeoutInSeconds); | ||
|
||
|
@@ -167,7 +186,9 @@ function (array $json) use ( | |
&$systemErrorsCount, | ||
&$collectedDatas, | ||
&$reachedInternalErrorsCountLimit, | ||
$processIdentifier | ||
$processIdentifier, | ||
&$fileChunksBudgetPerProcess, | ||
&$processSpawner | ||
): void { | ||
// decode arrays to objects | ||
foreach ($json[Bridge::SYSTEM_ERRORS] as $jsonError) { | ||
|
@@ -195,16 +216,24 @@ function (array $json) use ( | |
$this->processPool->quitAll(); | ||
} | ||
|
||
if ($fileChunksBudgetPerProcess[$processIdentifier] <= 0) { | ||
// kill the current worker, and spawn a fresh one to free memory | ||
$this->processPool->quitProcess($processIdentifier); | ||
|
||
($processSpawner)(); | ||
return; | ||
} | ||
if ($jobs === []) { | ||
$this->processPool->quitProcess($processIdentifier); | ||
return; | ||
} | ||
|
||
$job = array_pop($jobs); | ||
$jobsChunk = array_pop($jobs); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Renaming because the previous name suggested it would be a single job. |
||
$parallelProcess->request([ | ||
ReactCommand::ACTION => Action::MAIN, | ||
Content::FILES => $job, | ||
Content::FILES => $jobsChunk, | ||
]); | ||
--$fileChunksBudgetPerProcess[$processIdentifier]; | ||
}, | ||
|
||
// 2. callable on error | ||
|
@@ -226,6 +255,15 @@ function ($exitCode, string $stdErr) use (&$systemErrors, $processIdentifier): v | |
); | ||
|
||
$this->processPool->attachProcess($processIdentifier, $parallelProcess); | ||
}; | ||
|
||
for ($i = 0; $i < $numberOfProcesses; ++$i) { | ||
// nothing else to process, stop now | ||
if ($jobs === []) { | ||
break; | ||
} | ||
|
||
($processSpawner)(); | ||
} | ||
|
||
$streamSelectLoop->run(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduced the job size, which leads to lower memory usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config/config.php
needs to be updated as well: rector-src/config/config.php
Line 16 in 038f809
Could you also clarify the difference between $jobSize and
MAX_CHUNKS_PER_WORKER?
Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dropped the args in this file, as these just reflect the defaults and therefore don't need to be kept in sync
where?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here seems ok
↓
#4965 (review)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good 👍