From f3b75fe50c1ba241c1c7dff29ecf28146bddc7da Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 2 Feb 2023 01:27:41 -0800 Subject: [PATCH] Make releasing objects back to Recycler faster (#13174) Motivation: The Recycler implementation was changed in https://github.com/netty/netty/pull/11858 to rely on an MPSC queue implementation for delivering released objects back to their originating thread-local pool. Typically, the release happens on the same thread that claimed the object, so the overhead of having a thread-safe release goes to waste. Modification: We add an unsynchronized ArrayDeque for batching claims out of the `pooledHandles`. This amortises `claim` calls. We then also re-introduce the concept of an owner thread (but by default only if said thread is a FastThreadLocalThread), and release directly into the claim `batch` if the release is from the owner thread. Result: The `RecyclerBenchmark.recyclerGetAndRecycle` benchmark sees a 27.4% improvement, and the `RecyclerBenchmark.producerConsumer` benchmark sees a 22.5% improvement. 
Fixes https://github.com/netty/netty/issues/13153 Co-authored-by: Norman Maurer --- .../main/java/io/netty5/util/Recycler.java | 53 ++++++++++++-- .../util/RecyclerFastThreadLocalTest.java | 73 +++++++++++++++++++ .../java/io/netty5/util/RecyclerTest.java | 56 +++++++------- .../RunInFastThreadLocalThreadExtension.java | 55 ++++++++++++++ 4 files changed, 201 insertions(+), 36 deletions(-) create mode 100644 common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java create mode 100644 common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java diff --git a/common/src/main/java/io/netty5/util/Recycler.java b/common/src/main/java/io/netty5/util/Recycler.java index 80a933e983c..530a0021768 100644 --- a/common/src/main/java/io/netty5/util/Recycler.java +++ b/common/src/main/java/io/netty5/util/Recycler.java @@ -16,12 +16,15 @@ package io.netty5.util; import io.netty5.util.concurrent.FastThreadLocal; +import io.netty5.util.concurrent.FastThreadLocalThread; import io.netty5.util.internal.ObjectPool; import io.netty5.util.internal.PlatformDependent; import io.netty5.util.internal.SystemPropertyUtil; import io.netty5.util.internal.logging.InternalLogger; import io.netty5.util.internal.logging.InternalLoggerFactory; + import org.jctools.queues.MessagePassingQueue; +import org.jetbrains.annotations.VisibleForTesting; import java.util.ArrayDeque; import java.util.Queue; @@ -54,6 +57,7 @@ public String toString() { private static final int RATIO; private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD; private static final boolean BLOCKING_POOL; + private static final boolean BATCH_FAST_TL_ONLY; static { // In the future, we might have different maxCapacity for different object types. 
@@ -74,6 +78,7 @@ public String toString() { RATIO = max(0, SystemPropertyUtil.getInt("io.netty5.recycler.ratio", 8)); BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty5.recycler.blocking", false); + BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty5.recycler.batchFastThreadLocalOnly", true); if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { @@ -81,11 +86,13 @@ public String toString() { logger.debug("-Dio.netty5.recycler.ratio: disabled"); logger.debug("-Dio.netty5.recycler.chunkSize: disabled"); logger.debug("-Dio.netty5.recycler.blocking: disabled"); + logger.debug("-Dio.netty5.recycler.batchFastThreadLocalOnly: disabled"); } else { logger.debug("-Dio.netty5.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); logger.debug("-Dio.netty5.recycler.ratio: {}", RATIO); logger.debug("-Dio.netty5.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD); logger.debug("-Dio.netty5.recycler.blocking: {}", BLOCKING_POOL); + logger.debug("-Dio.netty5.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY); } } } @@ -104,6 +111,7 @@ protected void onRemoval(LocalPool value) throws Exception { super.onRemoval(value); MessagePassingQueue> handles = value.pooledHandles; value.pooledHandles = null; + value.owner = null; handles.clear(); } }; @@ -150,9 +158,10 @@ public final T get() { return obj; } + @VisibleForTesting final int threadLocalSize() { LocalPool localPool = threadLocal.getIfExists(); - return localPool == null ? 0 : localPool.pooledHandles.size(); + return localPool == null ? 
0 : localPool.pooledHandles.size() + localPool.batch.size(); } /** @@ -210,14 +219,21 @@ void toAvailable() { } } - private static final class LocalPool { + private static final class LocalPool implements MessagePassingQueue.Consumer> { private final int ratioInterval; + private final int chunkSize; + private final ArrayDeque> batch; + private volatile Thread owner; private volatile MessagePassingQueue> pooledHandles; private int ratioCounter; @SuppressWarnings("unchecked") LocalPool(int maxCapacity, int ratioInterval, int chunkSize) { this.ratioInterval = ratioInterval; + this.chunkSize = chunkSize; + batch = new ArrayDeque>(chunkSize); + Thread currentThread = Thread.currentThread(); + owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null; if (BLOCKING_POOL) { pooledHandles = new BlockingMessageQueue<>(maxCapacity); } else { @@ -231,7 +247,10 @@ DefaultHandle claim() { if (handles == null) { return null; } - DefaultHandle handle = handles.relaxedPoll(); + if (batch.isEmpty()) { + handles.drain(this, chunkSize); + } + DefaultHandle handle = batch.pollFirst(); if (null != handle) { handle.toClaimed(); } @@ -239,10 +258,18 @@ DefaultHandle claim() { } void release(DefaultHandle handle) { - MessagePassingQueue> handles = pooledHandles; handle.toAvailable(); - if (handles != null) { - handles.relaxedOffer(handle); + Thread owner = this.owner; + if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) { + accept(handle); + } else if (owner != null && owner.getState() == Thread.State.TERMINATED) { + this.owner = null; + pooledHandles = null; + } else { + MessagePassingQueue> handles = pooledHandles; + if (handles != null) { + handles.relaxedOffer(handle); + } } } @@ -253,13 +280,18 @@ DefaultHandle newHandle() { } return null; } + + @Override + public void accept(DefaultHandle e) { + batch.addLast(e); + } } /** * This is an implementation of {@link MessagePassingQueue}, similar to what might be 
returned from * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose. * The implementation relies on synchronised monitor locks for thread-safety. - * The {@code drain} and {@code fill} bulk operations are not supported by this implementation. + * The {@code fill} bulk operation is not supported by this implementation. */ private static final class BlockingMessageQueue implements MessagePassingQueue { private final Queue deque; @@ -334,7 +366,12 @@ public T relaxedPeek() { @Override public int drain(Consumer c, int limit) { - throw new UnsupportedOperationException(); + T obj; + int i = 0; + for (; i < limit && (obj = poll()) != null; i++) { + c.accept(obj); + } + return i; } @Override diff --git a/common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java b/common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java new file mode 100644 index 00000000000..0bd5778854f --- /dev/null +++ b/common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package io.netty5.util; + +import io.netty5.util.concurrent.FastThreadLocalThread; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(RunInFastThreadLocalThreadExtension.class) +public class RecyclerFastThreadLocalTest extends RecyclerTest { + @NotNull + @Override + protected Thread newThread(Runnable runnable) { + return new FastThreadLocalThread(runnable); + } + + @Override + @Test + @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) + public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception { + final Recycler recycler = newRecycler(1024); + final AtomicBoolean collected = new AtomicBoolean(); + final AtomicReference reference = new AtomicReference(); + Thread thread = new FastThreadLocalThread(() -> { + HandledObject object = recycler.get(); + // Store a reference to the HandledObject to ensure it is not collected when the run method finish. + reference.set(object); + }) { + @Override + protected void finalize() throws Throwable { + super.finalize(); + collected.set(true); + } + }; + assertFalse(collected.get()); + thread.start(); + thread.join(); + + // Null out so it can be collected. + thread = null; + + // Loop until the Thread was collected. If we can not collect it the Test will fail due of a timeout. + while (!collected.get()) { + System.gc(); + System.runFinalization(); + Thread.sleep(50); + } + + // Now call recycle after the Thread was collected to ensure this still works... 
+ reference.getAndSet(null).recycle(); + } +} diff --git a/common/src/test/java/io/netty5/util/RecyclerTest.java b/common/src/test/java/io/netty5/util/RecyclerTest.java index 40ed50de525..782f3750f84 100644 --- a/common/src/test/java/io/netty5/util/RecyclerTest.java +++ b/common/src/test/java/io/netty5/util/RecyclerTest.java @@ -15,6 +15,7 @@ */ package io.netty5.util; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -39,11 +40,11 @@ public class RecyclerTest { - private static Recycler newRecycler(int maxCapacityPerThread) { + protected static Recycler newRecycler(int maxCapacityPerThread) { return newRecycler(maxCapacityPerThread, 8, maxCapacityPerThread >> 1); } - private static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { + protected static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { return new Recycler(maxCapacityPerThread, ratio, chunkSize) { @Override protected HandledObject newObject( @@ -53,13 +54,18 @@ protected HandledObject newObject( }; } + @NotNull + protected Thread newThread(Runnable runnable) { + return new Thread(runnable); + } + @Test @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception { final Recycler recycler = newRecycler(1024); final AtomicBoolean collected = new AtomicBoolean(); final AtomicReference reference = new AtomicReference<>(); - Thread thread = new Thread(() -> { + Thread thread = newThread(() -> { HandledObject object = recycler.get(); // Store a reference to the HandledObject to ensure it is not collected when the run method finish. 
reference.set(object); @@ -104,11 +110,11 @@ public void testMultipleRecycleAtDifferentThread() throws InterruptedException { Recycler recycler = newRecycler(1024); final HandledObject object = recycler.get(); final AtomicReference exceptionStore = new AtomicReference<>(); - final Thread thread1 = new Thread(object::recycle); + final Thread thread1 = newThread(object::recycle); thread1.start(); thread1.join(); - final Thread thread2 = new Thread(() -> { + final Thread thread2 = newThread(() -> { try { object.recycle(); } catch (IllegalStateException e) { @@ -131,7 +137,7 @@ public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedExcep final AtomicReference exceptionStore = new AtomicReference<>(); final CountDownLatch countDownLatch = new CountDownLatch(2); - final Thread thread1 = new Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { try { @@ -148,7 +154,7 @@ public void run() { }); thread1.start(); - final Thread thread2 = new Thread(new Runnable() { + final Thread thread2 = newThread(new Runnable() { @Override public void run() { try { @@ -188,7 +194,7 @@ public void testMultipleRecycleRacing() throws InterruptedException { final AtomicReference exceptionStore = new AtomicReference(); final CountDownLatch countDownLatch = new CountDownLatch(1); - final Thread thread1 = new Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { try { @@ -295,7 +301,7 @@ public void testRecycleAtDifferentThread() throws Exception { final HandledObject o = recycler.get(); final HandledObject o2 = recycler.get(); - final Thread thread = new Thread(() -> { + final Thread thread = newThread(() -> { o.recycle(); o2.recycle(); }); @@ -311,15 +317,12 @@ public void testRecycleAtTwoThreadsMulti() throws Exception { final Recycler recycler = newRecycler(256); final HandledObject o = recycler.get(); - ExecutorService single = Executors.newSingleThreadExecutor(); + 
ExecutorService single = Executors.newSingleThreadExecutor(RecyclerTest.this::newThread); final CountDownLatch latch1 = new CountDownLatch(1); - single.execute(new Runnable() { - @Override - public void run() { - o.recycle(); - latch1.countDown(); - } + single.execute(() -> { + o.recycle(); + latch1.countDown(); }); assertTrue(latch1.await(100, TimeUnit.MILLISECONDS)); final HandledObject o2 = recycler.get(); @@ -327,13 +330,10 @@ public void run() { assertSame(o2, o); final CountDownLatch latch2 = new CountDownLatch(1); - single.execute(new Runnable() { - @Override - public void run() { - //The object should be recycled - o2.recycle(); - latch2.countDown(); - } + single.execute(() -> { + //The object should be recycled + o2.recycle(); + latch2.countDown(); }); assertTrue(latch2.await(100, TimeUnit.MILLISECONDS)); @@ -345,7 +345,7 @@ public void run() { @Test public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { - final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY + final int maxCapacity = 4; final Recycler recycler = newRecycler(maxCapacity, 4, 4); // Borrow 2 * maxCapacity objects. @@ -361,9 +361,9 @@ public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { array[i].recycle(); } - final Thread thread = new Thread(() -> { - for (int i = maxCapacity; i < array.length; i ++) { - array[i].recycle(); + final Thread thread = newThread(() -> { + for (int i1 = maxCapacity; i1 < array.length; i1 ++) { + array[i1].recycle(); } }); thread.start(); @@ -402,7 +402,7 @@ protected HandledObject newObject(Recycler.Handle handle) { instancesCount.set(0); // Recycle from other thread. 
- final Thread thread = new Thread(() -> { + final Thread thread = newThread(() -> { for (HandledObject object: array) { object.recycle(); } diff --git a/common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java b/common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java new file mode 100644 index 00000000000..657778a7a5f --- /dev/null +++ b/common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty5.util; + +import io.netty5.util.concurrent.FastThreadLocalThread; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.InvocationInterceptor; +import org.junit.jupiter.api.extension.ReflectiveInvocationContext; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Annotate your test class with {@code @ExtendWith(RunInFastThreadLocalThreadExtension.class)} to have all test methods + * run in a {@link io.netty5.util.concurrent.FastThreadLocalThread}. + *

+ * This extension implementation is modified from the JUnit 5 + * + * intercepting invocations example. + */ +public class RunInFastThreadLocalThreadExtension implements InvocationInterceptor { + @Override + public void interceptTestMethod( + final Invocation invocation, + final ReflectiveInvocationContext invocationContext, + final ExtensionContext extensionContext) throws Throwable { + final AtomicReference throwable = new AtomicReference<>(); + Thread thread = new FastThreadLocalThread(() -> { + try { + invocation.proceed(); + } catch (Throwable t) { + throwable.set(t); + } + }); + thread.start(); + thread.join(); + Throwable t = throwable.get(); + if (t != null) { + throw t; + } + } +}