Skip to content

Commit

Permalink
Fixes #9237 - Decouple QTP idleTimeout from pool shrink rate.
Browse files Browse the repository at this point in the history
Introduced `QueuedThreadPool.idleTimeoutMaxShrinkCount` to be the number of idle threads that are exited in one idle timeout.

When set to 1 (the default), the old behavior is reproduced: expiring 1 thread every idle timeout.
When set to larger values, allows to keep around the threads for the idle timeout (in case of further load spikes), but allows to quickly recover OS memory when they are truly idle.

For example, with 2000 threads, 30 seconds idle timeout and idleTimeoutMaxShrinkCount=1, it will take 995 minutes (about 16.5 hrs) to shrink the pool back to 10 threads.
By setting idleTimeoutMaxShrinkCount=100, the thread pool can be shrunk to 10 threads in about 10 minutes.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Mar 14, 2023
1 parent d748fde commit 17391af
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,19 @@ This value represents the maximum number of threads that can be reserved and use
A negative value for `QueuedThreadPool.reservedThreads` means that the actual value will be heuristically derived from the number of CPU cores and `QueuedThreadPool.maxThreads`.
A value of zero for `QueuedThreadPool.reservedThreads` means that reserved threads are disabled, and therefore the xref:pg-arch-threads-execution-strategy-epc[`Execute-Produce-Consume` mode] is never used -- the xref:pg-arch-threads-execution-strategy-pec[`Produce-Execute-Consume` mode] is always used instead.

`QueuedThreadPool` always maintains the number of threads between `QueuedThreadPool.minThreads` and `QueuedThreadPool.maxThreads`; during load spikes the number of thread grows to meet the load demand, and when the load on the system diminishes or the system goes idle, the number of threads shrinks.

Shrinking `QueuedThreadPool` is important in particular in containerized environments, where typically you want to return the memory occupied by the threads to the operative system.
The shrinking of the `QueuedThreadPool` is controlled by two parameters: `QueuedThreadPool.idleTimeout` and `QueuedThreadPool.idleTimeoutMaxShrinkCount`.

`QueuedThreadPool.idleTimeout` indicates how long a thread should stay around when it is idle, waiting for tasks to execute.
The longer the threads stay around, the more ready they are in case of new load spikes on the system; however, they consume resources: a Java platform thread typically allocates 1 MiB of native memory.

`QueuedThreadPool.idleTimeoutMaxShrinkCount` controls how many idle threads are exited for one `QueuedThreadPool.idleTimeout` period.
The larger this value is, the quicker the threads are exited when the `QueuedThreadPool` is idle, and their resources returned to the operative system; however, large values may result in too much thread churning: the `QueuedThreadPool` shrinks too fast and must re-create a lot of threads in case of a new load spike on the system.

A good balance between `QueuedThreadPool.idleTimeout` and `QueuedThreadPool.idleTimeoutMaxShrinkCount` depends on the load profile of your system, and it is often tuned via trial and error.

[[pg-arch-threads-thread-pool-virtual-threads]]
===== Virtual Threads
Virtual threads have been introduced in Java 19 as a preview feature.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="idleTimeoutMaxShrinkCount" type="int"><Property name="jetty.threadPool.idleTimeoutMaxShrinkCount" default="1"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
<Get id="namePrefix" name="name" />
<Call class="java.lang.Thread" name="ofVirtual">
Expand Down
1 change: 1 addition & 0 deletions jetty-server/src/main/config/etc/jetty-threadpool.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="useVirtualThreads" property="jetty.threadPool.useVirtualThreads" />
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="idleTimeoutMaxShrinkCount" type="int"><Property name="jetty.threadPool.idleTimeoutMaxShrinkCount" default="1"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
</New>
</Configure>
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ etc/jetty-threadpool-virtual-preview.xml
## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000

## The max number of idle threads that are exited in one idleTimeout period.
#jetty.threadPool.idleTimeoutMaxShrinkCount=1

## Whether to output a detailed dump.
#jetty.threadPool.detailedDump=false

Expand Down
3 changes: 3 additions & 0 deletions jetty-server/src/main/config/modules/threadpool.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ etc/jetty-threadpool.xml
## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000

## The max number of idle threads that are exited in one idleTimeout period.
#jetty.threadPool.idleTimeoutMaxShrinkCount=1

## Whether to output a detailed dump.
#jetty.threadPool.detailedDump=false
# end::documentation[]
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private ThreadPoolBudget _budget;
private long _stopTimeout;
private Executor _virtualThreadsExecutor;
private int _shrinkCount = 1;

public QueuedThreadPool()
{
Expand Down Expand Up @@ -536,6 +537,36 @@ public void setVirtualThreadsExecutor(Executor executor)
}
}

/**
* <p>Returns the maximum number of idle threads that are exited for every idle timeout
* period, thus shrinking this thread pool towards its {@link #getMinThreads() minimum
* number of threads}.
* The default value is {@code 1}.</p>
* <p>For example, consider a thread pool with {@code minThread=2}, {@code maxThread=20},
* {@code idleTimeout=5000} and {@code idleTimeoutShrinkCount=3}.
* Let's assume all 20 threads are executing a task, and they all finish their own tasks
* at the same time and no more tasks are submitted; then, all 20 will wait for an idle
* timeout, after which 3 threads will be exited, while the other 17 will wait another
* idle timeout; then another 3 threads will be exited, and so on until {@code minThreads=2}
* will be reached.</p>
*
* @param shrinkCount the maximum number of idle threads to exit in one idle timeout period
*/
public void setIdleTimeoutMaxShrinkCount(int shrinkCount)
{
if (shrinkCount < 1)
throw new IllegalArgumentException("Invalid shrink count " + shrinkCount);
_shrinkCount = shrinkCount;
}

/**
* @return the maximum number of idle threads to exit in one idle timeout period
*/
public int getIdleTimeoutMaxShrinkCount()
{
return _shrinkCount;
}

/**
* @return the number of jobs in the queue waiting for a thread
*/
Expand Down Expand Up @@ -830,7 +861,6 @@ protected void startThread()
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
_lastShrink.set(NanoTime.now());
thread.start();
started = true;
}
Expand Down Expand Up @@ -1010,11 +1040,11 @@ public String toString()

private class Runner implements Runnable
{
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException
private Runnable idleJobPoll(long idleTimeoutNanos) throws InterruptedException
{
if (idleTimeout <= 0)
if (idleTimeoutNanos <= 0)
return _jobs.take();
return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS);
return _jobs.poll(idleTimeoutNanos, TimeUnit.NANOSECONDS);
}

@Override
Expand All @@ -1027,54 +1057,68 @@ public void run()
try
{
Runnable job = null;
while (true)
exit: while (true)
{
// If we had a job,
if (job != null)
{
// signal that we are idle again,
idle = true;
// signal that we are idle again
if (!addCounts(0, 1))
break;
}
// else check we are still running
// else check we are still running.
else if (_counts.getHi() == Integer.MIN_VALUE)
{
break;
}

try
{
// Look for an immediately available job
job = _jobs.poll();
long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout());
job = idleJobPoll(idleTimeoutNanos);
if (job == null)
{
// No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0 && getThreads() > _minThreads)
// No job available after an idle timeout, maybe we should shrink?
if (idleTimeoutNanos > 0 && getThreads() > getMinThreads())
{
long last = _lastShrink.get();
long now = NanoTime.now();
if (NanoTime.millisElapsed(last, now) > idleTimeout && _lastShrink.compareAndSet(last, now))
long shrinkPeriod = idleTimeoutNanos / getIdleTimeoutMaxShrinkCount();
if (LOG.isDebugEnabled())
LOG.debug("Shrink check, period={}ms {}", TimeUnit.NANOSECONDS.toMillis(shrinkPeriod), QueuedThreadPool.this);
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
long lastShrink = _lastShrink.get();
long prevShrink = lastShrink;
// If the shrink window is too far in the past,
// advance it to be one idle timeout before now.
if (NanoTime.elapsed(lastShrink, now) > idleTimeoutNanos)
prevShrink = now - idleTimeoutNanos;

// Add shrink periods until the window is full.
long nextShrink = prevShrink + shrinkPeriod;
if (NanoTime.isBefore(now, nextShrink))
{
if (LOG.isDebugEnabled())
LOG.debug("Shrink skipped, last={}ms ago {}", NanoTime.millisElapsed(lastShrink, now), QueuedThreadPool.this);
break;
}

// Update the shrink window.
if (_lastShrink.compareAndSet(lastShrink, nextShrink))
{
if (LOG.isDebugEnabled())
LOG.debug("Shrink necessary, last={}ms ago {}", NanoTime.millisElapsed(lastShrink, now), QueuedThreadPool.this);
break exit;
}
}
}

// Wait for a job, only after we have checked if we should shrink
job = idleJobPoll(idleTimeout);

// If still no job?
if (job == null)
// continue to try again
continue;
continue;
}

idle = false;

// run job
// Run the job.
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
Expand All @@ -1093,7 +1137,7 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
}
finally
{
// Clear any interrupted status
// Clear any thread interrupted status.
Thread.interrupted();
}
}
Expand All @@ -1103,13 +1147,13 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
Thread thread = Thread.currentThread();
removeThread(thread);

// Decrement the total thread count and the idle count if we had no job
// Decrement the total thread count and the idle count if we had no job.
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);

// There is a chance that we shrunk just as a job was queued for us, so
// check again if we have sufficient threads to meet demand
// There is a chance that we shrunk just as a job was queued,
// so check again if we have sufficient threads to meet demand.
ensureThreads();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
Expand Down Expand Up @@ -1022,6 +1023,47 @@ public void testInterruptedStop() throws Exception
assertTrue(stopping._completed.await(5, TimeUnit.SECONDS));
}

@Test
public void testShrinkCount() throws Exception
{
QueuedThreadPool tp = new QueuedThreadPool();
int minThreads = 2;
tp.setMinThreads(minThreads);
int maxThreads = 10;
tp.setMaxThreads(maxThreads);
int idleTimeout = 1000;
tp.setIdleTimeout(idleTimeout);
tp.setIdleTimeoutMaxShrinkCount(3);
tp.start();

waitForThreads(tp, minThreads);
waitForIdle(tp, minThreads);

RunningJob[] jobs = new RunningJob[maxThreads];
for (int i = 0; i < jobs.length; i++)
{
RunningJob job = jobs[i] = new RunningJob("JOB" + i);
tp.execute(job);
assertTrue(job._run.await(1, TimeUnit.SECONDS));
}

for (int i = 0; i < jobs.length; i++)
{
jobs[i]._stopping.countDown();
}

assertEquals(maxThreads, tp.getThreads());

Thread.sleep(idleTimeout + idleTimeout / 2);
assertEquals(maxThreads - tp.getIdleTimeoutMaxShrinkCount(), tp.getThreads());

Thread.sleep(idleTimeout);
assertEquals(maxThreads - 2 * tp.getIdleTimeoutMaxShrinkCount(), tp.getThreads());

Thread.sleep(idleTimeout);
assertEquals(minThreads, tp.getThreads());
}

private int count(String s, String p)
{
int c = 0;
Expand Down

0 comments on commit 17391af

Please sign in to comment.