diff --git a/common/src/main/java/io/netty5/util/Recycler.java b/common/src/main/java/io/netty5/util/Recycler.java index 80a933e983c..530a0021768 100644 --- a/common/src/main/java/io/netty5/util/Recycler.java +++ b/common/src/main/java/io/netty5/util/Recycler.java @@ -16,12 +16,15 @@ package io.netty5.util; import io.netty5.util.concurrent.FastThreadLocal; +import io.netty5.util.concurrent.FastThreadLocalThread; import io.netty5.util.internal.ObjectPool; import io.netty5.util.internal.PlatformDependent; import io.netty5.util.internal.SystemPropertyUtil; import io.netty5.util.internal.logging.InternalLogger; import io.netty5.util.internal.logging.InternalLoggerFactory; + import org.jctools.queues.MessagePassingQueue; +import org.jetbrains.annotations.VisibleForTesting; import java.util.ArrayDeque; import java.util.Queue; @@ -54,6 +57,7 @@ public String toString() { private static final int RATIO; private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD; private static final boolean BLOCKING_POOL; + private static final boolean BATCH_FAST_TL_ONLY; static { // In the future, we might have different maxCapacity for different object types. @@ -74,6 +78,7 @@ public String toString() { RATIO = max(0, SystemPropertyUtil.getInt("io.netty5.recycler.ratio", 8)); BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty5.recycler.blocking", false); + BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty5.recycler.batchFastThreadLocalOnly", true); if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { @@ -81,11 +86,13 @@ public String toString() { logger.debug("-Dio.netty5.recycler.ratio: disabled"); logger.debug("-Dio.netty5.recycler.chunkSize: disabled"); logger.debug("-Dio.netty5.recycler.blocking: disabled"); + logger.debug("-Dio.netty5.recycler.batchFastThreadLocalOnly: disabled"); } else { logger.debug("-Dio.netty5.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); logger.debug("-Dio.netty5.recycler.ratio: {}", RATIO); logger.debug("-Dio.netty5.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD); logger.debug("-Dio.netty5.recycler.blocking: {}", BLOCKING_POOL); + logger.debug("-Dio.netty5.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY); } } } @@ -104,6 +111,7 @@ protected void onRemoval(LocalPool value) throws Exception { super.onRemoval(value); MessagePassingQueue> handles = value.pooledHandles; value.pooledHandles = null; + value.owner = null; handles.clear(); } }; @@ -150,9 +158,10 @@ public final T get() { return obj; } + @VisibleForTesting final int threadLocalSize() { LocalPool localPool = threadLocal.getIfExists(); - return localPool == null ? 0 : localPool.pooledHandles.size(); + return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size(); } /** @@ -210,14 +219,21 @@ void toAvailable() { } } - private static final class LocalPool { + private static final class LocalPool implements MessagePassingQueue.Consumer> { private final int ratioInterval; + private final int chunkSize; + private final ArrayDeque> batch; + private volatile Thread owner; private volatile MessagePassingQueue> pooledHandles; private int ratioCounter; @SuppressWarnings("unchecked") LocalPool(int maxCapacity, int ratioInterval, int chunkSize) { this.ratioInterval = ratioInterval; + this.chunkSize = chunkSize; + batch = new ArrayDeque>(chunkSize); + Thread currentThread = Thread.currentThread(); + owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null; if (BLOCKING_POOL) { pooledHandles = new BlockingMessageQueue<>(maxCapacity); } else { @@ -231,7 +247,10 @@ DefaultHandle claim() { if (handles == null) { return null; } - DefaultHandle handle = handles.relaxedPoll(); + if (batch.isEmpty()) { + handles.drain(this, chunkSize); + } + DefaultHandle handle = batch.pollFirst(); if (null != handle) { handle.toClaimed(); } @@ -239,10 +258,18 @@ DefaultHandle claim() { } void release(DefaultHandle handle) { - MessagePassingQueue> handles = pooledHandles; handle.toAvailable(); - if (handles != null) { - handles.relaxedOffer(handle); + Thread owner = this.owner; + if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) { + accept(handle); + } else if (owner != null && owner.getState() == Thread.State.TERMINATED) { + this.owner = null; + pooledHandles = null; + } else { + MessagePassingQueue> handles = pooledHandles; + if (handles != null) { + handles.relaxedOffer(handle); + } } } @@ -253,13 +280,18 @@ DefaultHandle newHandle() { } return null; } + + @Override + public void accept(DefaultHandle e) { + batch.addLast(e); + } } /** * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose. * The implementation relies on synchronised monitor locks for thread-safety. - * The {@code drain} and {@code fill} bulk operations are not supported by this implementation. + * The {@code fill} bulk operation is not supported by this implementation. */ private static final class BlockingMessageQueue implements MessagePassingQueue { private final Queue deque; @@ -334,7 +366,12 @@ public T relaxedPeek() { @Override public int drain(Consumer c, int limit) { - throw new UnsupportedOperationException(); + T obj; + int i = 0; + for (; i < limit && (obj = poll()) != null; i++) { + c.accept(obj); + } + return i; } @Override diff --git a/common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java b/common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java new file mode 100644 index 00000000000..0bd5778854f --- /dev/null +++ b/common/src/test/java/io/netty5/util/RecyclerFastThreadLocalTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty5.util; + +import io.netty5.util.concurrent.FastThreadLocalThread; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(RunInFastThreadLocalThreadExtension.class) +public class RecyclerFastThreadLocalTest extends RecyclerTest { + @NotNull + @Override + protected Thread newThread(Runnable runnable) { + return new FastThreadLocalThread(runnable); + } + + @Override + @Test + @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) + public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception { + final Recycler recycler = newRecycler(1024); + final AtomicBoolean collected = new AtomicBoolean(); + final AtomicReference reference = new AtomicReference(); + Thread thread = new FastThreadLocalThread(() -> { + HandledObject object = recycler.get(); + // Store a reference to the HandledObject to ensure it is not collected when the run method finish. + reference.set(object); + }) { + @Override + protected void finalize() throws Throwable { + super.finalize(); + collected.set(true); + } + }; + assertFalse(collected.get()); + thread.start(); + thread.join(); + + // Null out so it can be collected. + thread = null; + + // Loop until the Thread was collected. If we can not collect it the Test will fail due of a timeout. + while (!collected.get()) { + System.gc(); + System.runFinalization(); + Thread.sleep(50); + } + + // Now call recycle after the Thread was collected to ensure this still works... + reference.getAndSet(null).recycle(); + } +} diff --git a/common/src/test/java/io/netty5/util/RecyclerTest.java b/common/src/test/java/io/netty5/util/RecyclerTest.java index 40ed50de525..782f3750f84 100644 --- a/common/src/test/java/io/netty5/util/RecyclerTest.java +++ b/common/src/test/java/io/netty5/util/RecyclerTest.java @@ -15,6 +15,7 @@ */ package io.netty5.util; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -39,11 +40,11 @@ public class RecyclerTest { - private static Recycler newRecycler(int maxCapacityPerThread) { + protected static Recycler newRecycler(int maxCapacityPerThread) { return newRecycler(maxCapacityPerThread, 8, maxCapacityPerThread >> 1); } - private static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { + protected static Recycler newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) { return new Recycler(maxCapacityPerThread, ratio, chunkSize) { @Override protected HandledObject newObject( @@ -53,13 +54,18 @@ protected HandledObject newObject( }; } + @NotNull + protected Thread newThread(Runnable runnable) { + return new Thread(runnable); + } + @Test @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception { final Recycler recycler = newRecycler(1024); final AtomicBoolean collected = new AtomicBoolean(); final AtomicReference reference = new AtomicReference<>(); - Thread thread = new Thread(() -> { + Thread thread = newThread(() -> { HandledObject object = recycler.get(); // Store a reference to the HandledObject to ensure it is not collected when the run method finish. reference.set(object); @@ -104,11 +110,11 @@ public void testMultipleRecycleAtDifferentThread() throws InterruptedException { Recycler recycler = newRecycler(1024); final HandledObject object = recycler.get(); final AtomicReference exceptionStore = new AtomicReference<>(); - final Thread thread1 = new Thread(object::recycle); + final Thread thread1 = newThread(object::recycle); thread1.start(); thread1.join(); - final Thread thread2 = new Thread(() -> { + final Thread thread2 = newThread(() -> { try { object.recycle(); } catch (IllegalStateException e) { @@ -131,7 +137,7 @@ public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedExcep final AtomicReference exceptionStore = new AtomicReference<>(); final CountDownLatch countDownLatch = new CountDownLatch(2); - final Thread thread1 = new Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { try { @@ -148,7 +154,7 @@ public void run() { }); thread1.start(); - final Thread thread2 = new Thread(new Runnable() { + final Thread thread2 = newThread(new Runnable() { @Override public void run() { try { @@ -188,7 +194,7 @@ public void testMultipleRecycleRacing() throws InterruptedException { final AtomicReference exceptionStore = new AtomicReference(); final CountDownLatch countDownLatch = new CountDownLatch(1); - final Thread thread1 = new Thread(new Runnable() { + final Thread thread1 = newThread(new Runnable() { @Override public void run() { try { @@ -295,7 +301,7 @@ public void testRecycleAtDifferentThread() throws Exception { final HandledObject o = recycler.get(); final HandledObject o2 = recycler.get(); - final Thread thread = new Thread(() -> { + final Thread thread = newThread(() -> { o.recycle(); o2.recycle(); }); @@ -311,15 +317,12 @@ public void testRecycleAtTwoThreadsMulti() throws Exception { final Recycler recycler = newRecycler(256); final HandledObject o = recycler.get(); - ExecutorService single = Executors.newSingleThreadExecutor(); + ExecutorService single = Executors.newSingleThreadExecutor(RecyclerTest.this::newThread); final CountDownLatch latch1 = new CountDownLatch(1); - single.execute(new Runnable() { - @Override - public void run() { - o.recycle(); - latch1.countDown(); - } + single.execute(() -> { + o.recycle(); + latch1.countDown(); }); assertTrue(latch1.await(100, TimeUnit.MILLISECONDS)); final HandledObject o2 = recycler.get(); @@ -327,13 +330,10 @@ public void run() { assertSame(o2, o); final CountDownLatch latch2 = new CountDownLatch(1); - single.execute(new Runnable() { - @Override - public void run() { - //The object should be recycled - o2.recycle(); - latch2.countDown(); - } + single.execute(() -> { + //The object should be recycled + o2.recycle(); + latch2.countDown(); }); assertTrue(latch2.await(100, TimeUnit.MILLISECONDS)); @@ -345,7 +345,7 @@ public void run() { @Test public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { - final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY + final int maxCapacity = 4; final Recycler recycler = newRecycler(maxCapacity, 4, 4); // Borrow 2 * maxCapacity objects. @@ -361,9 +361,9 @@ public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { array[i].recycle(); } - final Thread thread = new Thread(() -> { - for (int i = maxCapacity; i < array.length; i ++) { - array[i].recycle(); + final Thread thread = newThread(() -> { + for (int i1 = maxCapacity; i1 < array.length; i1 ++) { + array[i1].recycle(); } }); thread.start(); @@ -402,7 +402,7 @@ protected HandledObject newObject(Recycler.Handle handle) { instancesCount.set(0); // Recycle from other thread. - final Thread thread = new Thread(() -> { + final Thread thread = newThread(() -> { for (HandledObject object: array) { object.recycle(); } diff --git a/common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java b/common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java new file mode 100644 index 00000000000..657778a7a5f --- /dev/null +++ b/common/src/test/java/io/netty5/util/RunInFastThreadLocalThreadExtension.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty5.util; + +import io.netty5.util.concurrent.FastThreadLocalThread; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.InvocationInterceptor; +import org.junit.jupiter.api.extension.ReflectiveInvocationContext; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Annotate your test class with {@code @ExtendWith(RunInFastThreadLocalThreadExtension.class)} to have all test methods + * run in a {@link io.netty5.util.concurrent.FastThreadLocalThread}. + *

+ * This extension implementation is modified from the JUnit 5 + * + * intercepting invocations example. + */ +public class RunInFastThreadLocalThreadExtension implements InvocationInterceptor { + @Override + public void interceptTestMethod( + final Invocation invocation, + final ReflectiveInvocationContext invocationContext, + final ExtensionContext extensionContext) throws Throwable { + final AtomicReference throwable = new AtomicReference<>(); + Thread thread = new FastThreadLocalThread(() -> { + try { + invocation.proceed(); + } catch (Throwable t) { + throwable.set(t); + } + }); + thread.start(); + thread.join(); + Throwable t = throwable.get(); + if (t != null) { + throw t; + } + } +}