Skip to content

Commit

Permalink
#9237 handle review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Mar 24, 2023
1 parent dc09d49 commit 531452e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if the pool is stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size. Essentially if positive,
* this represents the effective number of idle threads, and if negative it represents the
* demand for more threads, aka the job queue's size.</dd>
* demand for more threads, which is equivalent to the job queue's size.</dd>
* </dl>
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
Expand All @@ -113,7 +113,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private ThreadPoolBudget _budget;
private long _stopTimeout;
private Executor _virtualThreadsExecutor;
private int _shrinkCount = 1;
private int _maxEvictCount = 1;

public QueuedThreadPool()
{
Expand Down Expand Up @@ -543,29 +543,29 @@ public void setVirtualThreadsExecutor(Executor executor)
* number of threads}.
* The default value is {@code 1}.</p>
* <p>For example, consider a thread pool with {@code minThread=2}, {@code maxThread=20},
* {@code idleTimeout=5000} and {@code idleTimeoutShrinkCount=3}.
* {@code idleTimeout=5000} and {@code maxEvictCount=3}.
* Let's assume all 20 threads are executing a task, and they all finish their own tasks
* at the same time and no more tasks are submitted; then, all 20 will wait for an idle
* timeout, after which 3 threads will be exited, while the other 17 will wait another
* idle timeout; then another 3 threads will be exited, and so on until {@code minThreads=2}
* will be reached.</p>
*
* @param shrinkCount the maximum number of idle threads to exit in one idle timeout period
* @param evictCount the maximum number of idle threads to exit in one idle timeout period
*/
public void setIdleTimeoutMaxShrinkCount(int shrinkCount)
public void setMaxEvictCount(int evictCount)
{
if (shrinkCount < 1)
throw new IllegalArgumentException("Invalid shrink count " + shrinkCount);
_shrinkCount = shrinkCount;
if (evictCount < 1)
throw new IllegalArgumentException("Invalid evict count " + evictCount);
_maxEvictCount = evictCount;
}

/**
* @return the maximum number of idle threads to exit in one idle timeout period
*/
@ManagedAttribute("maximum number of idle threads to exit in one idle timeout period")
public int getIdleTimeoutMaxShrinkCount()
public int getMaxEvictCount()
{
return _shrinkCount;
return _maxEvictCount;
}

/**
Expand Down Expand Up @@ -969,10 +969,10 @@ protected void runJob(Runnable job)
}

/**
* <p>Attempts to shrink the thread pool by one thread if {@link #getThreads()} is greater than {@link #getMinThreads()}.</p>
* @return true if shrunk, false otherwise.
* <p>Attempts to evict the current thread from the pool if {@link #getThreads()} is greater than {@link #getMinThreads()}.</p>
* @return true if the current thread was evicted, false otherwise.
*/
protected boolean shrinkIfNeeded()
protected boolean evict()
{
long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout());
while (true)
Expand All @@ -984,7 +984,7 @@ protected boolean shrinkIfNeeded()
{
// We have an idle timeout and excess threads, so check if we should shrink?
long now = NanoTime.now();
long shrinkPeriod = idleTimeoutNanos / getIdleTimeoutMaxShrinkCount();
long shrinkPeriod = idleTimeoutNanos / getMaxEvictCount();
if (LOG.isDebugEnabled())
LOG.debug("Shrink check, {} > {} period={}ms {}", threads, minThreads, TimeUnit.NANOSECONDS.toMillis(shrinkPeriod), QueuedThreadPool.this);

Expand Down Expand Up @@ -1027,23 +1027,6 @@ protected boolean shrinkIfNeeded()
}
}

private void doRunJob(Runnable job)
{
try
{
runJob(job);
}
catch (Throwable e)
{
LOG.warn("Job failed", e);
}
finally
{
// Clear any thread interrupted status.
Thread.interrupted();
}
}

/**
* @return the job queue
*/
Expand Down Expand Up @@ -1134,7 +1117,7 @@ public void run()
boolean idle = true;
try
{
while (_counts.getHi() > Integer.MIN_VALUE)
while (_counts.getHi() != Integer.MIN_VALUE)
{
try
{
Expand Down Expand Up @@ -1162,7 +1145,7 @@ public void run()
job = _jobs.poll();
}

if (shrinkIfNeeded())
if (evict())
break;
}
catch (InterruptedException e)
Expand All @@ -1186,5 +1169,22 @@ public void run()
ensureThreads();
}
}

private void doRunJob(Runnable job)
{
try
{
runJob(job);
}
catch (Throwable e)
{
LOG.warn("Job failed", e);
}
finally
{
// Clear any thread interrupted status.
Thread.interrupted();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ public void testShrinkCount() throws Exception
tp.setMaxThreads(maxThreads);
int idleTimeout = 1000;
tp.setIdleTimeout(idleTimeout);
tp.setIdleTimeoutMaxShrinkCount(3);
tp.setMaxEvictCount(3);
tp.start();

waitForThreads(tp, minThreads);
Expand All @@ -1136,10 +1136,10 @@ public void testShrinkCount() throws Exception
assertEquals(maxThreads, tp.getThreads());

Thread.sleep(idleTimeout + idleTimeout / 2);
assertEquals(maxThreads - tp.getIdleTimeoutMaxShrinkCount(), tp.getThreads());
assertEquals(maxThreads - tp.getMaxEvictCount(), tp.getThreads());

Thread.sleep(idleTimeout);
assertEquals(maxThreads - 2 * tp.getIdleTimeoutMaxShrinkCount(), tp.getThreads());
assertEquals(maxThreads - 2 * tp.getMaxEvictCount(), tp.getThreads());

Thread.sleep(idleTimeout);
assertEquals(minThreads, tp.getThreads());
Expand All @@ -1157,7 +1157,7 @@ public void testRealistic() throws Exception

QueuedThreadPool qtp = new QueuedThreadPool(2 * spikeThreads, busyThreads / 2);
qtp.setIdleTimeout(idleTimeout);
qtp.setIdleTimeoutMaxShrinkCount(shrinkCount);
qtp.setMaxEvictCount(shrinkCount);
qtp.start();

CountDownLatch spike = new CountDownLatch(spikeThreads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@State(Scope.Benchmark)
@Warmup(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
public class QTPBenchmark
public class QueuedThreadPoolBenchmark
{
QueuedThreadPool pool;
private CountDownLatch[] latches;
Expand Down Expand Up @@ -80,7 +80,7 @@ public void test() throws Exception
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(QTPBenchmark.class.getSimpleName())
.include(QueuedThreadPoolBenchmark.class.getSimpleName())
.forks(1)
// .addProfiler(CompilerProfiler.class)
// .addProfiler(LinuxPerfProfiler.class)
Expand Down

0 comments on commit 531452e

Please sign in to comment.