Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ctx can now be provided a thread factory #942

Merged
merged 1 commit into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/org/zeromq/ZContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.zeromq.ZMQ.Socket;

import zmq.util.Draft;
import zmq.util.function.BiFunction;

/**
* ZContext provides a high-level ZeroMQ context management class
Expand Down Expand Up @@ -371,6 +372,7 @@ public void setSndHWM(int sndhwm)
* Set the handler invoked when a {@link zmq.poll.Poller} abruptly terminates due to an uncaught exception.<p>
* It default to the value of {@link Thread#getDefaultUncaughtExceptionHandler()}
* @param handler The object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler)
{
Expand All @@ -390,6 +392,7 @@ public UncaughtExceptionHandler getUncaughtExceptionHandler()
* be logged.<p>
* Default to {@link Throwable#printStackTrace()}
* @param handler The object to use as this thread's handler for recoverable exceptions notifications.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setNotificationExceptionHandler(UncaughtExceptionHandler handler)
{
Expand All @@ -404,6 +407,27 @@ public UncaughtExceptionHandler getNotificationExceptionHandler()
return context.getNotificationExceptionHandler();
}

/**
* Used to define a custom thread factory. It can be used to create thread that will be bounded to a CPU for
* performance or tweaks the created thread. It the UncaughtExceptionHandler is not set, the created thread UncaughtExceptionHandler
* will not be changed, so the factory can also be used to set it.
*
* @param threadFactory the thread factory used by {@link zmq.poll.Poller}
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setThreadFactor(BiFunction<Runnable, String, Thread> threadFactory)
{
context.setThreadFactor(threadFactory);
}

/**
* @return the current thread factory
*/
public BiFunction<Runnable, String, Thread> getThreadFactory()
{
return context.getThreadFactory();
}

/**
* @return the main
*/
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/zeromq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import zmq.msg.MsgAllocator;
import zmq.util.Draft;
import zmq.util.Z85;
import zmq.util.function.BiFunction;

/**
* <p>The ØMQ lightweight messaging kernel is a library which extends the standard socket interfaces
Expand Down Expand Up @@ -569,6 +570,7 @@ public int getIOThreads()

/**
* Set the size of the 0MQ thread pool to handle I/O operations.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public boolean setIOThreads(int ioThreads)
{
Expand All @@ -585,6 +587,7 @@ public int getMaxSockets()

/**
* Sets the maximum number of sockets allowed on the context
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public boolean setMaxSockets(int maxSockets)
{
Expand Down Expand Up @@ -629,6 +632,7 @@ public boolean setIPv6(boolean ipv6)
* Set the handler invoked when a {@link zmq.poll.Poller} abruptly terminates due to an uncaught exception.<p>
* It default to the value of {@link Thread#getDefaultUncaughtExceptionHandler()}
* @param handler The object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler)
{
Expand All @@ -648,6 +652,7 @@ public UncaughtExceptionHandler getUncaughtExceptionHandler()
* be logged.<p>
* Default to {@link Throwable#printStackTrace()}
* @param handler The object to use as this thread's handler for recoverable exceptions notifications.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setNotificationExceptionHandler(UncaughtExceptionHandler handler)
{
Expand All @@ -662,6 +667,27 @@ public UncaughtExceptionHandler getNotificationExceptionHandler()
return ctx.getNotificationExceptionHandler();
}

/**
* Used to define a custom thread factory. It can be used to create thread that will be bounded to a CPU for
* performance or tweaks the created thread. It the UncaughtExceptionHandler is not set, the created thread UncaughtExceptionHandler
* will not be changed, so the factory can also be used to set it.
*
* @param threadFactory the thread factory used by {@link zmq.poll.Poller}
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setThreadFactor(BiFunction<Runnable, String, Thread> threadFactory)
{
ctx.setThreadFactory(threadFactory);
}

/**
* @return the current thread factory
*/
public BiFunction<Runnable, String, Thread> getThreadFactory()
{
return ctx.getThreadFactory();
}

/**
* This is an explicit "destructor". It can be called to ensure the corresponding 0MQ
* Context has been disposed of.
Expand Down
82 changes: 69 additions & 13 deletions src/main/java/zmq/Ctx.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
import zmq.socket.Sockets;
import zmq.util.Errno;
import zmq.util.MultiMap;

//Context object encapsulates all the global state associated with
// the library.

import zmq.util.function.BiFunction;

/**
* Context object encapsulates all the global state associated with
* the library.<br/>
* It creates a reaper thread and some IO threads as defined by {@link ZMQ#ZMQ_IO_THREADS}. The thread are created
* using a thread factory that defined the UncaughtExceptionHandler as defined by {@link Ctx#setUncaughtExceptionHandler(UncaughtExceptionHandler)}
* and defined the thread as a daemon. If a custom thread factory is defined with {@link Ctx#setThreadFactory(BiFunction)},
* all that steps must be handled manually.
*/
public class Ctx
{
private static final int WAIT_FOREVER = -1;
Expand Down Expand Up @@ -132,6 +138,9 @@ private enum Side
// Number of I/O threads to launch.
private int ioThreadCount;

// The thread factory used by the poller
private BiFunction<Runnable, String, Thread> threadFactory;

// Does context wait (possibly forever) on termination?
private boolean blocky;

Expand Down Expand Up @@ -185,6 +194,7 @@ public Ctx()
slots = null;
maxSockets = ZMQ.ZMQ_MAX_SOCKETS_DFLT;
ioThreadCount = ZMQ.ZMQ_IO_THREADS_DFLT;
threadFactory = this::createThread;

ipv6 = false;
blocky = true;
Expand Down Expand Up @@ -346,16 +356,23 @@ final void shutdown()
}
}

/**
* Set the handler invoked when a {@link zmq.poll.Poller} abruptly terminates due to an uncaught exception.<p>
* It default to the value of {@link Thread#getDefaultUncaughtExceptionHandler()}
* @param handler The object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
*/
public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler)
private void chechStarted()
{
if (!starting.get()) {
throw new IllegalStateException("Already started");
}
}

/**
* Set the handler invoked when a {@link zmq.poll.Poller} abruptly terminates due to an uncaught exception.<br/>
* It defaults to the value of {@link Thread#getDefaultUncaughtExceptionHandler()}
* @param handler The object to use as this thread's uncaught exception handler. If null then this thread has no
* explicit handler and will use the one defined for the {@link ThreadGroup}.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler)
{
chechStarted();
exhandler = handler;
}

Expand All @@ -372,12 +389,11 @@ public UncaughtExceptionHandler getUncaughtExceptionHandler()
* be logged.<p>
* Default to {@link Throwable#printStackTrace()}
* @param handler The object to use as this thread's handler for recoverable exceptions notifications.
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setNotificationExceptionHandler(UncaughtExceptionHandler handler)
{
if (!starting.get()) {
throw new IllegalStateException("Already started");
}
chechStarted();
exnotification = handler;
}

Expand All @@ -389,9 +405,40 @@ public UncaughtExceptionHandler getNotificationExceptionHandler()
return exnotification;
}

/**
* Used to define a custom thread factory. It can be used to create thread that will be bounded to a CPU for
* performance or tweaks the created thread. It the UncaughtExceptionHandler is not set, the created thread UncaughtExceptionHandler
* will not be changed, so the factory can also be used to set it.
*
* @param threadFactory the thread factory used by {@link zmq.poll.Poller}
* @throws IllegalStateException If context was already initialized by the creation of a socket
*/
public void setThreadFactory(BiFunction<Runnable, String, Thread> threadFactory)
{
chechStarted();
this.threadFactory = threadFactory;
}

/**
* @return the current thread factory
*/
public BiFunction<Runnable, String, Thread> getThreadFactory()
{
return threadFactory;
}

/**
* Set an option
* @param option the option to set
* @param optval the option value
* @return true is the option is allowed for a context and the value is valid for the option
* @throws IllegalStateException If context was already initialized by the creation of a socket, and the
* option can't be changed.
*/
public boolean set(int option, int optval)
{
if (option == ZMQ.ZMQ_MAX_SOCKETS && optval >= 1) {
chechStarted();
optSync.lock();
try {
maxSockets = optval;
Expand All @@ -401,6 +448,7 @@ public boolean set(int option, int optval)
}
}
else if (option == ZMQ.ZMQ_IO_THREADS && optval >= 0) {
chechStarted();
optSync.lock();
try {
ioThreadCount = optval;
Expand Down Expand Up @@ -871,4 +919,12 @@ private void cleanForwarded()
forwardHolder.map.remove(handle);
}
}

private Thread createThread(Runnable target, String name)
{
Thread t = new Thread(target, name);
t.setDaemon(true);
t.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
return t;
}
}
3 changes: 1 addition & 2 deletions src/main/java/zmq/poll/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ public String toString()

public Poller(Ctx ctx, String name)
{
super(name);
super(name, ctx.getThreadFactory());
this.ctx = ctx;
worker.setUncaughtExceptionHandler(ctx.getUncaughtExceptionHandler());
exnotification = ctx.getNotificationExceptionHandler();
fdTable = new HashSet<>();
selector = ctx.createSelector();
Expand Down
15 changes: 9 additions & 6 deletions src/main/java/zmq/poll/PollerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import zmq.util.Clock;
import zmq.util.MultiMap;
import zmq.util.function.BiFunction;

abstract class PollerBase implements Runnable
{
Expand Down Expand Up @@ -55,11 +56,11 @@ public String toString()
}
}

// Load of the poller. Currently the number of file descriptors
// Load of the poller. Currently, the number of file descriptors
// registered.
private final AtomicInteger load;
private final AtomicInteger load = new AtomicInteger(0);

private final MultiMap<Long, TimerInfo> timers;
private final MultiMap<Long, TimerInfo> timers = new MultiMap<>();

// the thread where all events will be dispatched. So, the actual IO or Reaper threads.
protected final Thread worker;
Expand All @@ -70,11 +71,13 @@ public String toString()
protected PollerBase(String name)
{
worker = createWorker(name);

load = new AtomicInteger(0);
timers = new MultiMap<>();
}

protected PollerBase(String name, BiFunction<Runnable, String, Thread> threadFactory)
{
worker = threadFactory.apply(this, name);
}

Thread createWorker(String name)
{
Thread worker = new Thread(this, name);
Expand Down
8 changes: 1 addition & 7 deletions src/test/java/zmq/poll/PollerBaseTested.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@ class PollerBaseTested extends PollerBase

PollerBaseTested()
{
super("test");
}

@Override
Thread createWorker(String name)
{
return Thread.currentThread();
super("test", (s, r) -> Thread.currentThread());
}

void clock(long clock)
Expand Down
47 changes: 47 additions & 0 deletions src/test/java/zmq/poll/PollerTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package zmq.poll;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -37,4 +39,49 @@ public Selector createSelector()
selectorRef.get().close();
catched.await();
}

@Test(timeout = 1000)
public void catchFailure() throws IOException, InterruptedException
{
CountDownLatch catched = new CountDownLatch(1);
Ctx ctx = new Ctx();
ctx.setUncaughtExceptionHandler((t, e) ->
{
if (e instanceof OutOfMemoryError) {
catched.countDown();
}
});
Poller poller = new Poller(ctx, "test");
Pipe signaler = Pipe.open();
try (Pipe.SourceChannel source = signaler.source();
Pipe.SinkChannel sink = signaler.sink()
) {
source.configureBlocking(false);
Poller.Handle handle = poller.addHandle(signaler.source(), new IPollEvents()
{
@Override
public void inEvent()
{
throw new OutOfMemoryError();
}
});
poller.setPollIn(handle);
poller.start();
sink.write(ByteBuffer.allocate(1));
catched.await();
}
}

@Test(timeout = 1000)
public void threadFactory() throws InterruptedException
{
CountDownLatch catched = new CountDownLatch(1);
Ctx ctx = new Ctx();
ctx.setThreadFactory((r, s) -> {
catched.countDown();
return new Thread(r, s);
});
new Poller(ctx, "test");
catched.await();
}
}