From 90de5ced4e4c6bcab84937b5963dd102d70775a8 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 31 Jan 2023 10:29:17 -0800 Subject: [PATCH] Make releasing objects back to Recycler faster Motivation: The Recycler implementation was changed in https://github.com/netty/netty/pull/11858 to rely on an MPSC queue implementation for delivering released objects back to their originating thread-local pool. Typically, the release happens from the same thread that claimed the object, so the overhead of having a thread-safe release goes to waste. Modification: We add an unsynchronized ArrayDeque for batching claims out of the `pooledHandles`. This amortises `claim` calls. We then also re-introduce the concept of an owner thread (but by default only if said thread is a FastThreadLocalThread), and release directly into the claim `batch` if the release is from the owner thread. Result: The `RecyclerBenchmark.recyclerGetAndRecycle` benchmark sees a 27.4% improvement, and the `RecyclerBenchmark.producerConsumer` benchmark sees a 22.5% improvement. 
Fixes https://github.com/netty/netty/issues/13153 --- .../src/main/java/io/netty/util/Recycler.java | 54 +++++++++++-- .../util/RecyclerFastThreadLocalTest.java | 76 +++++++++++++++++++ .../test/java/io/netty/util/RecyclerTest.java | 46 ++++++----- .../RunInFastThreadLocalThreadExtension.java | 58 ++++++++++++++ 4 files changed, 209 insertions(+), 25 deletions(-) create mode 100644 common/src/test/java/io/netty/util/RecyclerFastThreadLocalTest.java create mode 100644 common/src/test/java/io/netty/util/RunInFastThreadLocalThreadExtension.java diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index dd7b2d73bee..fbba5d5351b 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -16,12 +16,14 @@ package io.netty.util; import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; import io.netty.util.internal.ObjectPool; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.jctools.queues.MessagePassingQueue; +import org.jetbrains.annotations.VisibleForTesting; import java.util.ArrayDeque; import java.util.Queue; @@ -54,6 +56,7 @@ public String toString() { private static final int RATIO; private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD; private static final boolean BLOCKING_POOL; + private static final boolean BATCH_FAST_TL_ONLY; static { // In the future, we might have different maxCapacity for different object types. 
@@ -74,6 +77,7 @@ public String toString() { RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8)); BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false); + BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true); if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { @@ -81,11 +85,13 @@ public String toString() { logger.debug("-Dio.netty.recycler.ratio: disabled"); logger.debug("-Dio.netty.recycler.chunkSize: disabled"); logger.debug("-Dio.netty.recycler.blocking: disabled"); + logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); logger.debug("-Dio.netty.recycler.ratio: {}", RATIO); logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD); logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL); + logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY); } } } @@ -104,6 +110,7 @@ protected void onRemoval(LocalPool value) throws Exception { super.onRemoval(value); MessagePassingQueue> handles = value.pooledHandles; value.pooledHandles = null; + value.owner = null; handles.clear(); } }; @@ -195,9 +202,10 @@ public final boolean recycle(T o, Handle handle) { return true; } + @VisibleForTesting final int threadLocalSize() { LocalPool localPool = threadLocal.getIfExists(); - return localPool == null ? 0 : localPool.pooledHandles.size(); + return localPool == null ? 
0 : localPool.pooledHandles.size() + localPool.batch.size(); } /** @@ -255,14 +263,21 @@ void toAvailable() { } } - private static final class LocalPool { + private static final class LocalPool implements MessagePassingQueue.Consumer> { private final int ratioInterval; + private final int chunkSize; + private final ArrayDeque> batch; + private volatile Thread owner; private volatile MessagePassingQueue> pooledHandles; private int ratioCounter; @SuppressWarnings("unchecked") LocalPool(int maxCapacity, int ratioInterval, int chunkSize) { this.ratioInterval = ratioInterval; + this.chunkSize = chunkSize; + batch = new ArrayDeque>(chunkSize); + Thread currentThread = Thread.currentThread(); + owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null; if (BLOCKING_POOL) { pooledHandles = new BlockingMessageQueue>(maxCapacity); } else { @@ -276,7 +291,10 @@ DefaultHandle claim() { if (handles == null) { return null; } - DefaultHandle handle = handles.relaxedPoll(); + if (batch.isEmpty()) { + handles.drain(this, chunkSize); + } + DefaultHandle handle = batch.pollFirst(); if (null != handle) { handle.toClaimed(); } @@ -285,9 +303,19 @@ DefaultHandle claim() { void release(DefaultHandle handle) { handle.toAvailable(); - MessagePassingQueue> handles = pooledHandles; - if (handles != null) { - handles.relaxedOffer(handle); + Thread owner = this.owner; + if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) { + accept(handle); + } else { + if (owner != null && owner.getState() == Thread.State.TERMINATED) { + this.owner = null; + pooledHandles = null; + } else { + MessagePassingQueue> handles = pooledHandles; + if (handles != null) { + handles.relaxedOffer(handle); + } + } } } @@ -298,13 +326,18 @@ DefaultHandle newHandle() { } return null; } + + @Override + public void accept(DefaultHandle e) { + batch.addLast(e); + } } /** * This is an implementation of {@link MessagePassingQueue}, similar to what might be 
returned from * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose. * The implementation relies on synchronised monitor locks for thread-safety. - * The {@code drain} and {@code fill} bulk operations are not supported by this implementation. + * The {@code fill} bulk operation is not supported by this implementation. */ private static final class BlockingMessageQueue implements MessagePassingQueue { private final Queue deque; @@ -379,7 +412,12 @@ public T relaxedPeek() { @Override public int drain(Consumer c, int limit) { - throw new UnsupportedOperationException(); + T obj; + int i = 0; + for (; i < limit && (obj = poll()) != null; i++) { + c.accept(obj); + } + return i; } @Override diff --git a/common/src/test/java/io/netty/util/RecyclerFastThreadLocalTest.java b/common/src/test/java/io/netty/util/RecyclerFastThreadLocalTest.java new file mode 100644 index 00000000000..7c8d4da113d --- /dev/null +++ b/common/src/test/java/io/netty/util/RecyclerFastThreadLocalTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package io.netty.util; + +import io.netty.util.concurrent.FastThreadLocalThread; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(RunInFastThreadLocalThreadExtension.class) +public class RecyclerFastThreadLocalTest extends RecyclerTest { + @NotNull + @Override + protected Thread newThread(Runnable runnable) { + return new FastThreadLocalThread(runnable); + } + + @Override + @Test + @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) + public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception { + final Recycler recycler = newRecycler(1024); + final AtomicBoolean collected = new AtomicBoolean(); + final AtomicReference reference = new AtomicReference(); + Thread thread = new FastThreadLocalThread(new Runnable() { + @Override + public void run() { + HandledObject object = recycler.get(); + // Store a reference to the HandledObject to ensure it is not collected when the run method finish. + reference.set(object); + } + }) { + @Override + protected void finalize() throws Throwable { + super.finalize(); + collected.set(true); + } + }; + assertFalse(collected.get()); + thread.start(); + thread.join(); + + // Null out so it can be collected. + thread = null; + + // Loop until the Thread was collected. If we can not collect it the Test will fail due of a timeout. + while (!collected.get()) { + System.gc(); + System.runFinalization(); + Thread.sleep(50); + } + + // Now call recycle after the Thread was collected to ensure this still works... 
+ reference.getAndSet(null).recycle(); + } +} diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index edfd797d0af..49359ff9621 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -15,6 +15,7 @@ */ package io.netty.util; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.function.Executable; @@ -23,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,11 +41,11 @@ public class RecyclerTest { - private static Recycler newRecycler(int maxCapacityPerThread) { + protected static Recycler newRecycler(int maxCapacityPerThread) { return newRecycler(maxCapacityPerThread, 8, maxCapacityPerThread >> 1); } - private static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { + protected static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { return new Recycler(maxCapacityPerThread, ratio, chunkSize) { @Override protected HandledObject newObject( @@ -53,6 +55,11 @@ protected HandledObject newObject( }; } + @NotNull + protected Thread newThread(Runnable runnable) { + return new Thread(runnable); + } + @Test @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception { @@ -114,7 +121,7 @@ public void testMultipleRecycleAtDifferentThread() throws InterruptedException { Recycler recycler = newRecycler(1024); final HandledObject object = recycler.get(); final AtomicReference exceptionStore = new AtomicReference(); - final Thread thread1 = new 
Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { object.recycle(); @@ -123,7 +130,7 @@ public void run() { thread1.start(); thread1.join(); - final Thread thread2 = new Thread(new Runnable() { + final Thread thread2 = newThread(new Runnable() { @Override public void run() { try { @@ -149,7 +156,7 @@ public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedExcep final AtomicReference exceptionStore = new AtomicReference(); final CountDownLatch countDownLatch = new CountDownLatch(2); - final Thread thread1 = new Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { try { @@ -166,7 +173,7 @@ public void run() { }); thread1.start(); - final Thread thread2 = new Thread(new Runnable() { + final Thread thread2 = newThread(new Runnable() { @Override public void run() { try { @@ -206,7 +213,7 @@ public void testMultipleRecycleRacing() throws InterruptedException { final AtomicReference exceptionStore = new AtomicReference(); final CountDownLatch countDownLatch = new CountDownLatch(1); - final Thread thread1 = new Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { try { @@ -313,13 +320,13 @@ public void testRecycleAtDifferentThread() throws Exception { final HandledObject o = recycler.get(); final HandledObject o2 = recycler.get(); - final Thread thread = new Thread() { + final Thread thread = newThread(new Runnable() { @Override public void run() { o.recycle(); o2.recycle(); } - }; + }); thread.start(); thread.join(); @@ -332,7 +339,12 @@ public void testRecycleAtTwoThreadsMulti() throws Exception { final Recycler recycler = newRecycler(256); final HandledObject o = recycler.get(); - ExecutorService single = Executors.newSingleThreadExecutor(); + ExecutorService single = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(@NotNull Runnable r) { + return 
RecyclerTest.this.newThread(r); + } + }); final CountDownLatch latch1 = new CountDownLatch(1); single.execute(new Runnable() { @@ -366,7 +378,7 @@ public void run() { @Test public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { - final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY + final int maxCapacity = 4; final Recycler recycler = newRecycler(maxCapacity, 4, 4); // Borrow 2 * maxCapacity objects. @@ -382,14 +394,14 @@ public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { array[i].recycle(); } - final Thread thread = new Thread() { + final Thread thread = newThread(new Runnable() { @Override public void run() { - for (int i = maxCapacity; i < array.length; i ++) { - array[i].recycle(); + for (int i1 = maxCapacity; i1 < array.length; i1++) { + array[i1].recycle(); } } - }; + }); thread.start(); thread.join(); @@ -426,14 +438,14 @@ protected HandledObject newObject(Recycler.Handle handle) { instancesCount.set(0); // Recycle from other thread. - final Thread thread = new Thread() { + final Thread thread = newThread(new Runnable() { @Override public void run() { for (HandledObject object: array) { object.recycle(); } } - }; + }); thread.start(); thread.join(); diff --git a/common/src/test/java/io/netty/util/RunInFastThreadLocalThreadExtension.java b/common/src/test/java/io/netty/util/RunInFastThreadLocalThreadExtension.java new file mode 100644 index 00000000000..5445b83d581 --- /dev/null +++ b/common/src/test/java/io/netty/util/RunInFastThreadLocalThreadExtension.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util; + +import io.netty.util.concurrent.FastThreadLocalThread; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.InvocationInterceptor; +import org.junit.jupiter.api.extension.ReflectiveInvocationContext; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Annotate your test class with {@code @ExtendWith(RunInFastThreadLocalThreadExtension.class)} to have all test methods + * run in a {@link io.netty.util.concurrent.FastThreadLocalThread}. + *

+ * This extension implementation is modified from the JUnit 5
+ * <a href="https://junit.org/junit5/docs/current/user-guide/#extensions-intercepting-invocations">
+ * intercepting invocations example</a>.
+ */
+public class RunInFastThreadLocalThreadExtension implements InvocationInterceptor {
+    @Override
+    public void interceptTestMethod(
+            final Invocation<Void> invocation,
+            final ReflectiveInvocationContext<Method> invocationContext,
+            final ExtensionContext extensionContext) throws Throwable {
+        final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
+        Thread thread = new FastThreadLocalThread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    invocation.proceed();
+                } catch (Throwable t) {
+                    throwable.set(t); // captured here, rethrown on the calling thread after join()
+                }
+            }
+        });
+        thread.start();
+        thread.join(); // wait for the test body to finish before reporting its outcome
+        Throwable t = throwable.get();
+        if (t != null) {
+            throw t;
+        }
+    }
+}