Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved concurrency and thread-safeness in Mailbox #939

Merged
merged 1 commit into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 23 additions & 72 deletions src/main/java/zmq/Mailbox.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,20 @@

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

import zmq.pipe.YPipe;
import zmq.util.Errno;

public final class Mailbox implements IMailbox
public class Mailbox implements IMailbox
{
// The pipe to store actual commands.
private final YPipe<Command> cpipe;
private final Deque<Command> cpipe;

// Signaler to pass signals from writer thread to reader thread.
// kept it although a ConcurrentLinkedDeque, because the signaler channel is used in many places.
private final Signaler signaler;

// There's only one thread receiving from the mailbox, but there
// is arbitrary number of threads sending. Given that ypipe requires
// synchronized access on both of its endpoints, we have to synchronize
// the sending side.
private final Lock sync;

// True if the underlying pipe is active, i.e. when we are allowed to
// read commands from it.
private boolean active;

// mailbox name, for better debugging
private final String name;

Expand All @@ -34,18 +24,9 @@ public final class Mailbox implements IMailbox
public Mailbox(Ctx ctx, String name, int tid)
{
this.errno = ctx.errno();
cpipe = new YPipe<>(Config.COMMAND_PIPE_GRANULARITY.getValue());
sync = new ReentrantLock();
cpipe = new ConcurrentLinkedDeque<>();
signaler = new Signaler(ctx, tid, errno);

// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.

Command cmd = cpipe.read();
assert (cmd == null);
active = false;

this.name = name;
}

Expand All @@ -57,69 +38,39 @@ public SelectableChannel getFd()
@Override
public void send(final Command cmd)
{
boolean ok = false;
sync.lock();
try {
cpipe.write(cmd, false);
ok = cpipe.flush();
}
finally {
sync.unlock();
}

if (!ok) {
signaler.send();
}
cpipe.addLast(cmd);
signaler.send();
}

@Override
public Command recv(long timeout)
{
Command cmd;
// Try to get the command straight away.
if (active) {
cmd = cpipe.read();
if (cmd != null) {
return cmd;
Command cmd = cpipe.pollFirst();
while (cmd == null) {
// Wait for signal from the command sender.
boolean rc = signaler.waitEvent(timeout);
if (!rc) {
assert (errno.get() == ZError.EAGAIN || errno.get() == ZError.EINTR) : errno.get();
break;
}

// If there are no more commands available, switch into passive state.
active = false;
}

// Wait for signal from the command sender.
boolean rc = signaler.waitEvent(timeout);
if (!rc) {
assert (errno.get() == ZError.EAGAIN || errno.get() == ZError.EINTR) : errno.get();
return null;
}
// Receive the signal.
signaler.recv();
if (errno.get() == ZError.EINTR) {
break;
}

// Receive the signal.
signaler.recv();
if (errno.get() == ZError.EINTR) {
return null;
// Get a command.
// Another thread may already fetch the command, so loop on it
cmd = cpipe.pollFirst();
}

// Switch into active state.
active = true;

// Get a command.
cmd = cpipe.read();
assert (cmd != null) : "command shall never be null when read";

return cmd;
}

@Override
public void close() throws IOException
{
// TODO: Retrieve and deallocate commands inside the cpipe.

// Work around problem that other threads might still be in our
// send() method, by waiting on the mutex before disappearing.
sync.lock();
sync.unlock();

signaler.close();
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/zmq/MailboxSafe.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

@Deprecated
public class MailboxSafe implements IMailbox
{
// The pipe to store actual commands.
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/zmq/Signaler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ private interface IoOperation<O>
private final Pipe.SinkChannel w;
private final Pipe.SourceChannel r;
private final Selector selector;
private final ByteBuffer wdummy = ByteBuffer.allocate(1);
private final ByteBuffer rdummy = ByteBuffer.allocate(1);
private final ThreadLocal<ByteBuffer> wdummy = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1));
private final ThreadLocal<ByteBuffer> rdummy = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1));

// Selector.selectNow at every sending message doesn't show enough performance
private final AtomicLong wcursor = new AtomicLong(0);
Expand Down Expand Up @@ -68,7 +68,7 @@ private interface IoOperation<O>
private <O> O maksInterrupt(IoOperation<O> operation) throws IOException
{
// This loop try to protect the current thread from external interruption.
// If it happens, it mangle current context internal state.
// If it happens, it mangles current context internal state.
// So it keep trying until it succeed.
// This must only be called on internal IO (using Pipe)
boolean interrupted = Thread.interrupted();
Expand Down Expand Up @@ -131,8 +131,8 @@ void send()

while (nbytes == 0) {
try {
wdummy.clear();
nbytes = maksInterrupt(() -> w.write(wdummy));
wdummy.get().clear();
nbytes = maksInterrupt(() -> w.write(wdummy.get()));
}
catch (IOException e) {
throw new ZError.IOException(e);
Expand Down Expand Up @@ -198,8 +198,8 @@ void recv()
// On windows, there may be a need to try several times until it succeeds
while (nbytes == 0) {
try {
rdummy.clear();
nbytes = maksInterrupt(() -> r.read(rdummy));
rdummy.get().clear();
nbytes = maksInterrupt(() -> r.read(rdummy.get()));
}
catch (ClosedChannelException e) {
errno.set(ZError.EINTR);
Expand Down
46 changes: 2 additions & 44 deletions src/main/java/zmq/SocketBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ public String toString()
// Mutex for synchronize access to the socket in thread safe mode
private final ReentrantLock threadSafeSync;

// Signaler to be used in the reaping stage
private Signaler reaperSignaler;

protected SocketBase(Ctx parent, int tid, int sid)
{
this(parent, tid, sid, false);
Expand Down Expand Up @@ -135,14 +132,8 @@ protected SocketBase(Ctx parent, int tid, int sid, boolean threadSafe)

this.threadSafe = threadSafe;
this.threadSafeSync = new ReentrantLock();
this.reaperSignaler = null;

if (threadSafe) {
mailbox = new MailboxSafe(parent, this.threadSafeSync, "safe-socket-" + sid);
}
else {
mailbox = new Mailbox(parent, "socket-" + sid, tid);
}
mailbox = new Mailbox(parent, "socket-" + sid, tid);
}

// Concrete algorithms for the x- methods are to be defined by
Expand Down Expand Up @@ -170,14 +161,6 @@ protected void destroy()
catch (IOException ignore) {
}

if (reaperSignaler != null) {
try {
reaperSignaler.close();
}
catch (IOException ignored) {
}
}

stopMonitor();
assert (destroyed.get());
}
Expand Down Expand Up @@ -1083,11 +1066,6 @@ public final void close()
lock();

try {
// Remove all existing signalers for thread safe sockets
if (threadSafe) {
((MailboxSafe) mailbox).clearSignalers();
}

// Mark the socket as dead
active = false;

Expand Down Expand Up @@ -1121,23 +1099,7 @@ final void startReaping(Poller poller)
this.poller = poller;
SelectableChannel fd;

if (!threadSafe) {
fd = ((Mailbox) mailbox).getFd();
}
else {
threadSafeSync.lock();
try {
reaperSignaler = new Signaler(getCtx(), getTid(), getCtx().errno());
fd = reaperSignaler.getFd();
((MailboxSafe) mailbox).addSignaler(reaperSignaler);

// Send a signal to make sure reaper handle existing commands
reaperSignaler.send();
}
finally {
threadSafeSync.unlock();
}
}
fd = ((Mailbox) mailbox).getFd();

handle = this.poller.addHandle(fd, this);
this.poller.setPollIn(handle);
Expand Down Expand Up @@ -1357,10 +1319,6 @@ public final void inEvent()
lock();

try {
// If the socket is thread safe we need to unsignal the reaper signaler
if (threadSafe) {
reaperSignaler.recv();
}
enterInEvent();
processCommands(0, false, null);
}
Expand Down