Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
izeye committed Sep 17, 2023
1 parent 9f450a7 commit ef4b709
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected KeyValue correlationId(JmsProcessObservationContext context) {
if (message.getJMSCorrelationID() == null) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, context.getCarrier().getJMSCorrelationID());
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, message.getJMSCorrelationID());
}
catch (JMSException exc) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
Expand Down Expand Up @@ -130,7 +130,7 @@ protected KeyValue messageId(JmsProcessObservationContext context) {
if (message.getJMSMessageID() == null) {
return MESSAGE_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, context.getCarrier().getJMSMessageID());
return KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, message.getJMSMessageID());
}
catch (JMSException exc) {
return MESSAGE_ID_UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,29 @@ protected KeyValue correlationId(JmsPublishObservationContext context) {
if (message == null || message.getJMSCorrelationID() == null) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, context.getCarrier().getJMSCorrelationID());
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, message.getJMSCorrelationID());
}
catch (JMSException exc) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
}

protected KeyValue destinationName(JmsPublishObservationContext context) {
if (context.getCarrier() == null) {
Message message = context.getCarrier();
if (message == null) {
return DESTINATION_NAME_UNKNOWN;
}
try {
Destination jmsDestination = context.getCarrier().getJMSDestination();
Destination jmsDestination = message.getJMSDestination();
if (jmsDestination instanceof Queue) {
Queue queue = (Queue) jmsDestination;
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, queue.getQueueName());
}
else if (jmsDestination instanceof Topic) {
if (jmsDestination instanceof Topic) {
Topic topic = (Topic) jmsDestination;
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, topic.getTopicName());
}
else {
return DESTINATION_NAME_UNKNOWN;
}
return DESTINATION_NAME_UNKNOWN;
}
catch (JMSException e) {
return DESTINATION_NAME_UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ private JmsInstrumentation() {
}

/**
* Instrument the {@link Session} given as argument for observability and records
* Instrument the {@link Session} given as argument for observability and record
* observations using the provided Observation registry.
* @param session the target session to proxy for instrumentation.
* @param session the target session to proxy for instrumentation
* @param registry the Observation registry to use
* @return the instrumented session that should be used to record observations.
* @return the instrumented session that should be used to record observations
*/
public static Session instrumentSession(Session session, ObservationRegistry registry) {
SessionInvocationHandler handler = new SessionInvocationHandler(session, registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public enum LowCardinalityKeyNames implements KeyName {

/**
* Name of the exception thrown during the operation, or
* {@value KeyValue#NONE_VALUE}} if no exception happened.
* {@value KeyValue#NONE_VALUE} if no exception happened.
*/
EXCEPTION {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

/**
* {@link ObservationConvention} interface for
* {@link JmsObservationDocumentation#JMS_MESSAGE_PUBLISH JMS message publications}
* operations.
* {@link JmsObservationDocumentation#JMS_MESSAGE_PUBLISH JMS message process} operations.
*
* @author Brian Clozel
* @since 1.12.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/**
* {@link ObservationConvention} interface for
* {@link JmsObservationDocumentation#JMS_MESSAGE_PUBLISH JMS message publications}
* {@link JmsObservationDocumentation#JMS_MESSAGE_PUBLISH JMS message publication}
* operations.
*
* @author Brian Clozel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MessageConsumerInvocationHandler implements InvocationHandler {

private final ObservationRegistry registry;

public MessageConsumerInvocationHandler(MessageConsumer target, ObservationRegistry registry) {
MessageConsumerInvocationHandler(MessageConsumer target, ObservationRegistry registry) {
this.target = target;
this.registry = registry;
}
Expand All @@ -69,7 +69,7 @@ static class ObservedMessageListener implements MessageListener {

private final ObservationRegistry registry;

public ObservedMessageListener(MessageListener delegate, ObservationRegistry registry) {
ObservedMessageListener(MessageListener delegate, ObservationRegistry registry) {
this.delegate = delegate;
this.registry = registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ class MessageProducerInvocationHandler implements InvocationHandler {

private final ObservationRegistry registry;

public MessageProducerInvocationHandler(MessageProducer target, ObservationRegistry registry) {
MessageProducerInvocationHandler(MessageProducer target, ObservationRegistry registry) {
this.target = target;
this.registry = registry;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

if ("send".equals(method.getName()) && args[0] != null) {
Message message = findMessageArgument(args);
Observation observation = JmsObservationDocumentation.JMS_MESSAGE_PUBLISH.observation(null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class SessionInvocationHandler implements InvocationHandler {

/**
* Create an invocation handler to be used for proxying a {@link Session}.
* @param session the proxied session.
* @param registry the observation registry used for recording observations.
* @param session the proxied session
* @param registry the observation registry used for recording observations
*/
SessionInvocationHandler(Session session, ObservationRegistry registry) {
this.target = session;
Expand All @@ -60,7 +60,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return Proxy.newProxyInstance(this.target.getClass().getClassLoader(),
new Class[] { MessageProducer.class }, producerHandler);
}
else if (result instanceof MessageConsumer) {
if (result instanceof MessageConsumer) {
MessageConsumer consumer = (MessageConsumer) result;
MessageConsumerInvocationHandler consumerHandler = new MessageConsumerInvocationHandler(consumer,
this.registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*
* @author Brian Clozel
*/
public class JmsInstrumentationTests {
class JmsInstrumentationTests {

@RegisterExtension
EmbeddedActiveMQExtension server = new EmbeddedActiveMQExtension();
Expand All @@ -62,14 +62,13 @@ void setupServer() throws JMSException {
@ParameterizedTest(name = "{index} {0}")
@MethodSource("messageSenders")
void shouldInstrumentSendOperations(String methodName, SessionConsumer sessionConsumer) throws Exception {
Session session = createInstrumentedSession();
// jmsConnection.close();
sessionConsumer.accept(session);
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.publish")
.that()
.hasContextualNameEqualTo("test.send publish");
session.close();
try (Session session = createInstrumentedSession()) {
sessionConsumer.accept(session);
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.publish")
.that()
.hasContextualNameEqualTo("test.send publish");
}
}

static Stream<Arguments> messageSenders() {
Expand Down Expand Up @@ -110,55 +109,55 @@ static Stream<Arguments> messageSenders() {

@Test
void shouldInstrumentSendOperationWhenException() throws Exception {
Session session = createInstrumentedSession();
Topic topic = session.createTopic("test.send");
MessageProducer producer = session.createProducer(topic);
TextMessage message = session.createTextMessage("test content");
jmsConnection.close();
assertThatThrownBy(() -> producer.send(message)).isInstanceOf(jakarta.jms.IllegalStateException.class);
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.publish")
.that()
.hasContextualNameEqualTo("test.send publish")
.hasLowCardinalityKeyValue("exception", "IllegalStateException");
session.close();
try (Session session = createInstrumentedSession()) {
Topic topic = session.createTopic("test.send");
MessageProducer producer = session.createProducer(topic);
TextMessage message = session.createTextMessage("test content");
jmsConnection.close();
assertThatThrownBy(() -> producer.send(message)).isInstanceOf(jakarta.jms.IllegalStateException.class);
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.publish")
.that()
.hasContextualNameEqualTo("test.send publish")
.hasLowCardinalityKeyValue("exception", "IllegalStateException");
}
}

@Test
void shouldInstrumentMessageListener() throws Exception {
Session session = createInstrumentedSession();
Topic topic = session.createTopic("test.send");
CountDownLatch latch = new CountDownLatch(1);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> latch.countDown());
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("test send"));
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.process")
.that()
.hasContextualNameEqualTo("test.send process");
session.close();
try (Session session = createInstrumentedSession()) {
Topic topic = session.createTopic("test.send");
CountDownLatch latch = new CountDownLatch(1);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> latch.countDown());
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("test send"));
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.process")
.that()
.hasContextualNameEqualTo("test.send process");
}
}

@Test
void shouldInstrumentMessageListenerWhenException() throws Exception {
Session session = createInstrumentedSession();
Topic topic = session.createTopic("test.send");
CountDownLatch latch = new CountDownLatch(1);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> {
latch.countDown();
throw new java.lang.IllegalStateException("test error");
});
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("test send"));
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.process")
.that()
.hasLowCardinalityKeyValue("exception", "IllegalStateException");
session.close();
try (Session session = createInstrumentedSession()) {
Topic topic = session.createTopic("test.send");
CountDownLatch latch = new CountDownLatch(1);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> {
latch.countDown();
throw new java.lang.IllegalStateException("test error");
});
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("test send"));
assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
TestObservationRegistryAssert.assertThat(registry)
.hasObservationWithNameEqualTo("jms.message.process")
.that()
.hasLowCardinalityKeyValue("exception", "IllegalStateException");
}
}

private Session createInstrumentedSession() throws JMSException {
Expand Down

0 comments on commit ef4b709

Please sign in to comment.