Skip to content

Commit

Permalink
#9237 add extra aggressive shrinking flag to control if the idle tim…
Browse files Browse the repository at this point in the history
…eout period should be observed or not

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Mar 22, 2023
1 parent c2a93ed commit b0060c6
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private long _stopTimeout;
private Executor _virtualThreadsExecutor;
private int _shrinkCount = 1;
private boolean _aggressiveShrinking;

public QueuedThreadPool()
{
Expand Down Expand Up @@ -562,11 +563,26 @@ public void setIdleTimeoutMaxShrinkCount(int shrinkCount)
/**
* @return the maximum number of idle threads to exit in one idle timeout period
*/
@ManagedAttribute("maximum number of idle threads to exit in one idle timeout period")
public int getIdleTimeoutMaxShrinkCount()
{
return _shrinkCount;
}

public void setAggressiveShrinking(boolean aggressiveShrinking)
{
_aggressiveShrinking = aggressiveShrinking;
}

/**
* @return true if the idle threads should exit before the idle timeout period, false otherwise
*/
@ManagedAttribute("should the idle threads exit before the idle timeout period")
public boolean isAggressiveShrinking()
{
return _aggressiveShrinking;
}

/**
* @return the number of jobs in the queue waiting for a thread
*/
Expand Down Expand Up @@ -1072,12 +1088,20 @@ public void run()
boolean idle = true;
try
{
exit: while (_counts.getHi() > Integer.MIN_VALUE)
while (_counts.getHi() > Integer.MIN_VALUE)
{
try
{
boolean aggressiveShrinking = isAggressiveShrinking();
long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout());
Runnable job = idleJobPoll(idleTimeoutNanos);

if (job == null && !aggressiveShrinking)
{
if (shrinkIfNeeded(idleTimeoutNanos))
break;
}

while (job != null)
{
idle = false;
Expand All @@ -1099,47 +1123,8 @@ public void run()
job = _jobs.poll();
}

// No jobs available, should we shrink?
int threads = getThreads();
if (threads > getMinThreads())
{
// We have an idle timeout and excess threads, so check if we should shrink?
long now = NanoTime.now();
long shrinkPeriod = idleTimeoutNanos / getIdleTimeoutMaxShrinkCount();
if (LOG.isDebugEnabled())
LOG.debug("Shrink check, {} > {} period={}ms {}", threads, getMinThreads(), TimeUnit.NANOSECONDS.toMillis(shrinkPeriod), QueuedThreadPool.this);
while (true)
{
long shrinkThreshold = _shrinkThreshold.get();
long threshold = shrinkThreshold;

// If the threshold is too far in the past,
// advance it to be one idle timeout before now plus a bit of shrink period.
if (NanoTime.elapsed(threshold, now) > idleTimeoutNanos)
threshold = now - idleTimeoutNanos;

// advance the threshold by one shrink period
threshold += shrinkPeriod;

// Is the new threshold in the future?
if (NanoTime.isBefore(now, threshold))
{
// Yes - we cannot shrink yet, so break and continue looking for jobs
if (LOG.isDebugEnabled())
LOG.debug("Shrink skipped, threshold={}ms in future {}", NanoTime.millisElapsed(now, threshold), QueuedThreadPool.this);
break;
}

// We can shrink if we can update the atomic shrink threshold?
if (_shrinkThreshold.compareAndSet(shrinkThreshold, threshold))
{
// Yes - we have shrunk, so exit.
if (LOG.isDebugEnabled())
LOG.debug("SHRUNK, threshold={}ms in past {}", NanoTime.millisElapsed(threshold, now), QueuedThreadPool.this);
break exit;
}
}
}
if (aggressiveShrinking && shrinkIfNeeded(idleTimeoutNanos))
break;
}
catch (InterruptedException e)
{
Expand All @@ -1162,5 +1147,62 @@ public void run()
ensureThreads();
}
}

/**
* @return true if shrunk, false otherwise.
*/
private boolean shrinkIfNeeded(long idleTimeoutNanos)
{
while (true)
{
// No jobs available, should we shrink?
int threads = getThreads();
int minThreads = getMinThreads();
if (threads > minThreads)
{
// We have an idle timeout and excess threads, so check if we should shrink?
long now = NanoTime.now();
long shrinkPeriod = idleTimeoutNanos / getIdleTimeoutMaxShrinkCount();
if (LOG.isDebugEnabled())
LOG.debug("Shrink check, {} > {} period={}ms {}", threads, minThreads, TimeUnit.NANOSECONDS.toMillis(shrinkPeriod), QueuedThreadPool.this);

long shrinkThreshold = _shrinkThreshold.get();
long threshold = shrinkThreshold;

// If the threshold is too far in the past,
// advance it to be one idle timeout before now plus a bit of shrink period.
if (NanoTime.elapsed(threshold, now) > idleTimeoutNanos)
threshold = now - idleTimeoutNanos;

// advance the threshold by one shrink period
threshold += shrinkPeriod;

// Is the new threshold in the future?
if (NanoTime.isBefore(now, threshold))
{
// Yes - we cannot shrink yet, so continue looking for jobs
if (LOG.isDebugEnabled())
LOG.debug("Shrink skipped, threshold={}ms in future {}", NanoTime.millisElapsed(now, threshold), QueuedThreadPool.this);
return false;
}

// We can shrink if we can update the atomic shrink threshold?
if (_shrinkThreshold.compareAndSet(shrinkThreshold, threshold))
{
// Yes - we have shrunk, so exit.
if (LOG.isDebugEnabled())
LOG.debug("SHRUNK, threshold={}ms in past {}", NanoTime.millisElapsed(threshold, now), QueuedThreadPool.this);
return true;
}
}
else
{
// We reached min threads, continue looking for jobs
if (LOG.isDebugEnabled())
LOG.debug("At min threads, {} > {} {}", threads, minThreads, QueuedThreadPool.this);
return false;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,9 @@ public void testRealistic() throws Exception
final Random random = new Random();

QueuedThreadPool qtp = new QueuedThreadPool(2 * spikeThreads, busyThreads / 2);
// This test requires the aggressive shrinking as it keeps all threads busy enough that not a single one
// ever stays idle for the idle timeout period.
qtp.setAggressiveShrinking(true);
qtp.setIdleTimeout(idleTimeout);
qtp.setIdleTimeoutMaxShrinkCount(shrinkCount);
qtp.start();
Expand Down

0 comments on commit b0060c6

Please sign in to comment.