Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Many updates to metadata handling #936

Merged
merged 1 commit into from
Jan 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/main/java/org/zeromq/ZCert.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Arrays;
import java.util.Date;
import java.util.Locale;
import java.util.Map;

import org.zeromq.ZMQ.Curve;
import org.zeromq.ZMQ.Curve.KeyPair;
Expand Down Expand Up @@ -154,8 +155,8 @@ private void add(ZMetadata meta, ZConfig config)
{
String now = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH).format(new Date());
config.addComment(String.format("** Generated on %1$s by ZCert **", now));
for (String key : meta.keySet()) {
config.putValue("metadata/" + key, meta.get(key));
for (Map.Entry<String, String> e : meta.entrySet()) {
config.putValue("metadata/" + e.getKey(), e.getValue());
}
}

Expand Down
92 changes: 92 additions & 0 deletions src/main/java/org/zeromq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.zeromq.proto.ZPicture;
import zmq.Ctx;
import zmq.Msg;
import zmq.Options;
import zmq.SocketBase;
import zmq.ZError;
Expand Down Expand Up @@ -3251,6 +3252,61 @@ public int connectPeer(String addr)
return base.connectPeer(addr);
}

/**
* Queues a message created from data, so it can be sent.
*
* @param msg the {@link Msg} to send. The message is either a single-part message by itself,
* or the last part of a multi-part message.
* @return true when it has been queued on the socket and ØMQ has assumed responsibility for the message.
* This does not indicate that the message has been transmitted to the network.
*/
public boolean sendMsg(Msg msg)
{
return sendMsg(msg, 0);
}

/**
* Queues a multi-part message created from data, so it can be sent.
*
* @param msg the message to send. further message parts are to follow.
* @return true when it has been queued on the socket and ØMQ has assumed responsibility for the message.
* This does not indicate that the message has been transmitted to the network.
*/
public boolean sendMsgMore(Msg msg)
{
return sendMsg(msg, zmq.ZMQ.ZMQ_SNDMORE);
}

/**
* Queues a message created from data, so it can be sent.
*
* @param msg the {@link Msg} to send. The message is either a single-part message by itself,
* or the last part of a multi-part message.
* @param flags a combination (with + or |) of the flags defined below:
* <ul>
* <li>{@link org.zeromq.ZMQ#DONTWAIT DONTWAIT}:
* For socket types ({@link org.zeromq.ZMQ#DEALER DEALER}, {@link org.zeromq.ZMQ#PUSH PUSH})
* that block when there are no available peers (or all peers have full high-water mark),
* specifies that the operation should be performed in non-blocking mode.
* If the message cannot be queued on the socket, the method shall fail with errno set to EAGAIN.</li>
* <li>{@link org.zeromq.ZMQ#SNDMORE SNDMORE}:
* Specifies that the message being sent is a multi-part message,
* and that further message parts are to follow.</li>
* <li>0 : blocking send of a single-part message or the last of a multi-part message</li>
* </ul>
* @return true when it has been queued on the socket and ØMQ has assumed responsibility for the message.
* This does not indicate that the message has been transmitted to the network.
*/
public boolean sendMsg(Msg msg, int flags)
{
if (base.send(msg, flags)) {
return true;
}

mayRaise();
return false;
}

/**
* Queues a message created from data, so it can be sent.
*
Expand Down Expand Up @@ -3517,6 +3573,42 @@ public boolean sendBinaryPicture(String picture, Object... args)
return new ZPicture().sendBinaryPicture(this, picture, args);
}

/**
* Receives a message.
*
* @return the message received; null on error.
*/
public Msg recvMsg()
{
return recvMsg(0);
}

/**
* Receives a message.
* <p>
* @param flags either:
* <ul>
* <li>{@link org.zeromq.ZMQ#DONTWAIT DONTWAIT}:
* Specifies that the operation should be performed in non-blocking mode.
* If there are no messages available on the specified socket,
* the method shall fail with errno set to EAGAIN and return null.</li>
* <li>0 : receive operation blocks until one message is successfully retrieved,
* or stops when timeout set by {@link #setReceiveTimeOut(int)} expires.</li>
* </ul>
* @return the message received; null on error.
*/
public Msg recvMsg(int flags)
{
zmq.Msg msg = base.recv(flags);

if (msg != null) {
return msg;
}

mayRaise();
return null;
}

/**
* Receives a message.
*
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/org/zeromq/ZSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,20 @@ public int send(byte[] b, int flags)
return -1;
}

public int sendMessage(Msg msg, int flags)
{
if (socketBase.send(msg, flags)) {
return msg.size();
}
mayRaise();
return -1;
}

public int sendMessage(Msg msg)
{
return sendMessage(msg, 0);
}

/**
* Send a frame
*
Expand Down Expand Up @@ -236,6 +250,16 @@ public byte[] receive(int flags)
return msg.data();
}

public Msg receiveMessage()
{
return socketBase.recv(0);
}

public Msg receiveMessage(int flags)
{
return socketBase.recv(flags);
}

public String receiveStringUtf8()
{
return receiveStringUtf8(0);
Expand Down
162 changes: 139 additions & 23 deletions src/main/java/org/zeromq/util/ZMetadata.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.zeromq.util;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

import org.zeromq.ZConfig;
Expand All @@ -25,31 +29,159 @@ public ZMetadata(Metadata metadata)
this.metadata = metadata;
}

public final Set<String> keySet()
public Set<String> keySet()
{
return metadata.keySet();
}

public final String get(String key)
public String get(String key)
{
return metadata.get(key);
}

public final void set(String key, String value)
public void set(String key, String value)
{
metadata.set(key, value);
metadata.put(key, value);
}

public final void remove(String key)
public void remove(String key)
{
metadata.remove(key);
}

public final byte[] bytes()
public byte[] bytes()
{
return metadata.bytes();
}

public String toString()
{
return metadata.toString();
}

public int hashCode()
{
return metadata.hashCode();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ZMetadata zMetadata = (ZMetadata) o;
return Objects.equals(metadata, zMetadata.metadata);
}

/**
* Returns a {@link Set} view of the properties contained in this metadata.
* The set is backed by the metadata, so changes to the metadata are
* reflected in the set, and vice-versa. If the metadata is modified
* while an iteration over the set is in progress (except through
* the iterator's own {@code remove} operation, or through the
* {@code setValue} operation on a metadata property returned by the
* iterator) the results of the iteration are undefined. The set
* supports element removal, which removes the corresponding
* mapping from the metadata, via the {@code Iterator.remove},
* {@code Set.remove}, {@code removeAll}, {@code retainAll} and
* {@code clear} operations. It does not support the
* {@code add} or {@code addAll} operations.
*
* @return a set view of the properties contained in this metadata
*/
public Set<Entry<String, String>> entrySet()
{
return metadata.entrySet();
}

/**
* Returns a {@link Collection} view of the values contained in this metadata.
* The collection is backed by the metadata, so changes to the map are
* reflected in the collection, and vice-versa. If the metadata is
* modified while an iteration over the collection is in progress
* (except through the iterator's own {@code remove} operation),
* the results of the iteration are undefined. The collection
* supports element removal, which removes the corresponding
* property from the metadata, via the {@code Iterator.remove},
* {@code Collection.remove}, {@code removeAll},
* {@code retainAll} and {@code clear} operations. It does not
* support the {@code add} or {@code addAll} operations.
*
* @return a collection view of the values contained in this map
*/
public Collection<String> values()
{
return metadata.values();
}

public void set(Metadata zapProperties)
{
metadata.set(zapProperties);
}

/**
* Returns {@code true} if this map contains no key-value mappings.
*
* @return {@code true} if this map contains no key-value mappings
*/
public boolean isEmpty()
{
return metadata.isEmpty();
}

/**
* Returns {@code true} if this metada contains the property requested
*
* @param property property the name of the property to be tested.
* @return {@code true} if this metada contains the property
*/
public boolean containsKey(String property)
{
return metadata.containsKey(property);
}

/**
* Removes all the properties.
* The map will be empty after this call returns.
*/
public void clear()
{
metadata.clear();
}

/**
* Returns the number of properties. If it contains more properties
* than {@code Integer.MAX_VALUE} elements, returns {@code Integer.MAX_VALUE}.
*
* @return the number of properties
*/
public int size()
{
return metadata.size();
}

/**
* Serialize metadata to an output stream, using the specifications of the ZMTP protocol
* <pre>
* property = name value
* name = OCTET 1*255name-char
* name-char = ALPHA | DIGIT | "-" | "_" | "." | "+"
* value = 4OCTET *OCTET ; Size in network byte order
* </pre>
*
* @param stream the output stream
* @throws IOException if an I/O error occurs.
* @throws IllegalStateException if one of the properties name size is bigger than 255
*/
public void write(OutputStream stream) throws IOException
{
metadata.write(stream);
}

public static ZMetadata read(String meta)
{
if (meta == null || meta.length() == 0) {
Expand All @@ -62,8 +194,7 @@ public static ZMetadata read(String meta)
return new ZMetadata(data);
}
catch (CharacterCodingException e) {
e.printStackTrace();
return null;
throw new IllegalArgumentException("Not a parsable metadata string");
}
}

Expand All @@ -79,19 +210,4 @@ public static ZMetadata read(ZConfig conf)
}
return metadata;
}

public String toString()
{
return metadata.toString();
}

public int hashCode()
{
return metadata.hashCode();
}

public boolean equals(Object obj)
{
return metadata.equals(obj);
}
}