Skip to content

Commit

Permalink
Fix race in NonStickyOrderedEventExecutor that affects inEventLoop(..…
Browse files Browse the repository at this point in the history
….) (netty#13237)

Motivation:

53b6dd4 added support for inEventLoop(...) but did have a bug related to when the saved executing Thread was set to null. This could lead to inEventLoop(...) returning the wrong value.

Modifications:

Replace volatile by AtomicReference to ensure we only set the saved executiong Thread to null when it was not updated by another Thread yet.

Result:

No more races
  • Loading branch information
normanmaurer committed Feb 22, 2023
1 parent 3d8a62a commit 59aa6e6
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what
Expand Down Expand Up @@ -223,7 +224,7 @@ private static final class NonStickyOrderedEventExecutor extends AbstractEventEx
private final AtomicInteger state = new AtomicInteger();
private final int maxTaskExecutePerRun;

private volatile Thread executingThread;
private final AtomicReference<Thread> executingThread = new AtomicReference<Thread>();

NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
super(executor);
Expand All @@ -236,7 +237,8 @@ public void run() {
if (!state.compareAndSet(SUBMITTED, RUNNING)) {
return;
}
executingThread = Thread.currentThread();
Thread current = Thread.currentThread();
executingThread.set(current);
for (;;) {
int i = 0;
try {
Expand All @@ -251,7 +253,8 @@ public void run() {
if (i == maxTaskExecutePerRun) {
try {
state.set(SUBMITTED);
executingThread = null;
// Only set executingThread to null if no other thread did update it yet.
executingThread.compareAndSet(current, null);
executor.execute(this);
return; // done
} catch (Throwable ignore) {
Expand Down Expand Up @@ -279,7 +282,8 @@ public void run() {
// The above cases can be distinguished by performing a
// compareAndSet(NONE, RUNNING). If it returns "false", it is case 1; otherwise it is case 2.
if (tasks.isEmpty() || !state.compareAndSet(NONE, RUNNING)) {
executingThread = null;
// Only set executingThread to null if no other thread did update it yet.
executingThread.compareAndSet(current, null);
return; // done
}
}
Expand All @@ -289,7 +293,7 @@ public void run() {

@Override
public boolean inEventLoop(Thread thread) {
return executingThread == thread;
return executingThread.get() == thread;
}

@Override
Expand Down

0 comments on commit 59aa6e6

Please sign in to comment.