Skip to content

Commit

Permalink
Added a way to specify a custom monitor, that will not communicate th…
Browse files Browse the repository at this point in the history
…rought

a ZMQ socket.
  • Loading branch information
fbacchella committed Mar 10, 2023
1 parent 113c6e4 commit 2ccad7b
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 146 deletions.
152 changes: 84 additions & 68 deletions src/main/java/zmq/SocketBase.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package zmq;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import zmq.io.IOThread;
import zmq.io.SessionBase;
import zmq.io.net.Address;
Expand All @@ -14,16 +25,6 @@
import zmq.util.Clock;
import zmq.util.MultiMap;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public abstract class SocketBase extends Own implements IPollEvents, Pipe.IPipeEvents
{
private static class EndpointPipe
Expand All @@ -45,6 +46,29 @@ public String toString()
}
}

/**
* The old consumer that forward events through a socket
*/
private static class SocketEventHandler implements ZMQ.EventConsummer
{
private final SocketBase monitorSocket;

public SocketEventHandler(SocketBase monitorSocket)
{
this.monitorSocket = monitorSocket;
}

public void consume(ZMQ.Event ev)
{
ev.write(monitorSocket);
}

public void close()
{
monitorSocket.close();
}
}

// Map of open endpoints.
private final MultiMap<String, EndpointPipe> endpoints;

Expand Down Expand Up @@ -87,16 +111,13 @@ public String toString()
// File descriptor if applicable
private SocketChannel fileDesc;

// Monitor socket
private SocketBase monitorSocket;

// Bitmask of events being monitored
private int monitorEvents;

// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
protected String connectRid;

private final ReentrantLock monitorSync = new ReentrantLock(false);
private final AtomicReference<ZMQ.EventConsummer> monitor;

// Indicate if the socket is thread safe
private final boolean threadSafe;
Expand All @@ -119,8 +140,8 @@ protected SocketBase(Ctx parent, int tid, int sid, boolean threadSafe)
lastTsc = 0;
ticks = 0;
rcvmore = false;
monitorSocket = null;
monitorEvents = 0;
monitor = new AtomicReference(null);

options.socketId = sid;
options.ipv6 = parent.get(ZMQ.ZMQ_IPV6) != 0;
Expand Down Expand Up @@ -153,8 +174,7 @@ boolean isActive()
@Override
protected void destroy()
{
try {
monitorSync.lock();
synchronized (monitor) {
try {
mailbox.close();
}
Expand All @@ -164,9 +184,6 @@ protected void destroy()
stopMonitor();
assert (destroyed.get());
}
finally {
monitorSync.unlock();
}
}

// Returns the mailbox associated with this socket.
Expand Down Expand Up @@ -1191,15 +1208,10 @@ protected final void processStop()
// We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though!
try {
monitorSync.lock();
synchronized (monitor) {
stopMonitor();
ctxTerminated.set(true);
}
finally {
monitorSync.unlock();
}

}

@Override
Expand Down Expand Up @@ -1410,27 +1422,21 @@ private void extractFlags(Msg msg)
* @param events an event mask to monitor.
* @return true if creation succeeded.
* @throws IllegalStateException if a previous monitor was already
* registered.
* registered and consumer is not null.
*/
public final boolean monitor(final String addr, int events)
{
try {
monitorSync.lock();

boolean rc;
synchronized (monitor) {
// To be tested before trying anything
if (ctxTerminated.get()) {
errno.set(ZError.ETERM);
return false;
}

// Support deregistering monitoring endpoints as well
// Support unregistering monitoring endpoints as well
if (addr == null) {
stopMonitor();
return true;
}
if (monitorSocket != null) {
throw new IllegalStateException("Monitor registred twice");
}

SimpleURI uri = SimpleURI.create(addr);

Expand All @@ -1444,35 +1450,51 @@ public final boolean monitor(final String addr, int events)
errno.set(ZError.EPROTONOSUPPORT);
return false;
}

// Register events to monitor
monitorEvents = events;

monitorSocket = getCtx().createSocket(ZMQ.ZMQ_PAIR);
SocketBase monitorSocket = getCtx().createSocket(ZMQ.ZMQ_PAIR);
if (monitorSocket == null) {
return false;
}

// Never block context termination on pending event messages
try {
monitorSocket.setSocketOpt(ZMQ.ZMQ_LINGER, 0);
monitorSocket.setSocketOpt(ZMQ.ZMQ_LINGER, 0);
boolean rc = monitorSocket.bind(addr);
if (rc) {
return monitor(new SocketEventHandler(monitorSocket), events);
}
catch (IllegalArgumentException e) {
stopMonitor();
throw e;
else {
return false;
}
}
}

// Spawn the monitor socket endpoint
rc = monitorSocket.bind(addr);
if (!rc) {
/**
* Register a custom event consumer.
* @param consumer or null for unregister.
* @param events an event mask to monitor.
* @return true if creation succeeded.
* @throws IllegalStateException if a previous monitor was already
* registered and consumer is not null.
*/
public final boolean monitor(ZMQ.EventConsummer consumer, int events)
{
synchronized (monitor) {
if (ctxTerminated.get()) {
errno.set(ZError.ETERM);
return false;
}
// Support unregistering monitoring endpoints as well
if (consumer == null) {
stopMonitor();
}
return rc;
}
finally {
monitorSync.unlock();
else {
if (monitor.get() != null) {
throw new IllegalStateException("Monitor registered twice");
}
monitor.set(consumer);
// Register events to monitor
monitorEvents = events;
}
return true;
}

}

public final void eventHandshaken(String addr, int zmtpVersion)
Expand Down Expand Up @@ -1552,40 +1574,34 @@ public final void eventHandshakeSucceeded(String addr, int errno)

private void event(String addr, Object arg, int event)
{
try {
monitorSync.lock();
if ((monitorEvents & event) == 0 || monitorSocket == null) {
synchronized (monitor) {
if ((monitorEvents & event) == 0 || monitor.get() == null) {
return;
}

monitorEvent(new ZMQ.Event(event, addr, arg));
}
finally {
monitorSync.unlock();
}
}

// Send a monitor event
protected final void monitorEvent(ZMQ.Event event)
{
if (monitorSocket == null) {
if (monitor.get() == null) {
return;
}

event.write(monitorSocket);
monitor.get().consume(event);
}

private void stopMonitor()
{
// this is a private method which is only called from
// contexts where the mutex has been locked before

if (monitorSocket != null) {
if (monitor.get() != null) {
if ((monitorEvents & ZMQ.ZMQ_EVENT_MONITOR_STOPPED) != 0) {
monitorEvent(new ZMQ.Event(ZMQ.ZMQ_EVENT_MONITOR_STOPPED, "", 0));
}
monitorSocket.close();
monitorSocket = null;
monitor.get().close();
monitor.set(null);
monitorEvents = 0;
}
}
Expand Down
28 changes: 24 additions & 4 deletions src/main/java/zmq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ public class ZMQ
}
}

/**
* A interface used to consume events in monitor
*/
public interface EventConsummer
{
public void consume(Event ev);

/**
* An optional method to close the monitor if needed
*/
default void close()
{
// Default do nothing
}
}

public static class Event
{
private static final int VALUE_INTEGER = 1;
Expand Down Expand Up @@ -298,6 +314,12 @@ private Event(int event, String addr, Object arg, int flag)
}

public boolean write(SocketBase s)
{
Msg msg = new Msg(serialize(s.getCtx()));
return s.send(msg, 0);
}

private ByteBuffer serialize(Ctx ctx)
{
int size = 4 + 1 + addr.length() + 1; // event + len(addr) + addr + flag
if (flag == VALUE_INTEGER || flag == VALUE_CHANNEL) {
Expand All @@ -313,13 +335,11 @@ public boolean write(SocketBase s)
buffer.putInt((Integer) arg);
}
else if (flag == VALUE_CHANNEL) {
int channeldId = s.getCtx().forwardChannel((SelectableChannel) arg);
int channeldId = ctx.forwardChannel((SelectableChannel) arg);
buffer.putInt(channeldId);
}
buffer.flip();

Msg msg = new Msg(buffer);
return s.send(msg, 0);
return buffer;
}

/**
Expand Down

0 comments on commit 2ccad7b

Please sign in to comment.