Skip to content

Commit

Permalink
Add Observability instrumentation for Jakarta JMS
Browse files Browse the repository at this point in the history
This commit adds a new `JmsInstrumentation` class that instruments
an instances of a `jakarta.jms.Session` with the Observation API.
This proxies the `MessageProducer` and `MessageConsumer` instances
created by the session and creates dedicated observations:

* `send*` method calls on `MessageProducer` will create `"jms.message.publish"`
  observations.
* when configuring a `MessageListener` on `MessageConsumer` instances returned
by the session, `"jms.message.process"` observations are created when messages
are received and processed by the callback.

Here is how an existing JMS Session instance can be instrumented for observability:

```
Session original = ...
ObservationRegistry registry = ...
Session session = JmsInstrumentation.instrumentSession(original, registry);

Topic topic = session.createTopic("micrometer.test.topic");
MessageProducer producer = session.createProducer(topic);
// this operation will create a "jms.message.publish" observation
producer.send(session.createMessage("test message content"));

MessageConsumer consumer = session.createConsumer(topic);
// when a message is processed by the listener,
// a "jms.message.process" observation is created
consumer.setMessageListener(message -> consumeMessage(message));
```

This change does not instrument `receive` methods on the
`MessageConsumer` as there is little value here. The resulting metric
would only measure the time it takes to receive the message (i.e. not
process it) and there would be no actionable trace, as those methods
return the received `Message` and its processing will not happen in
tracing scope.

Closes gh-4007
  • Loading branch information
bclozel committed Aug 5, 2023
1 parent 46f3809 commit e0661f3
Show file tree
Hide file tree
Showing 19 changed files with 1,601 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ def VERSIONS = [
'io.prometheus:simpleclient_common:latest.release',
'io.prometheus:simpleclient_pushgateway:latest.release',
'io.rest-assured:rest-assured:latest.release',
'jakarta.jms:jakarta.jms-api:3.0.+',
'javax.cache:cache-api:latest.release',
'javax.inject:javax.inject:1',
'javax.servlet:javax.servlet-api:latest.release',
'javax.xml.bind:jaxb-api:2.3.+',
'io.micrometer:context-propagation:1.1.+',
'org.jetbrains.kotlinx:kotlinx-coroutines-core:latest.release',
'net.sf.ehcache:ehcache:latest.release',
'org.apache.activemq:artemis-junit-5:latest.release',
'org.apache.activemq:artemis-jakarta-client:latest.release',
'org.apache.felix:org.apache.felix.framework:7.0.5',
'org.apache.felix:org.apache.felix.scr:2.2.2',
'org.apache.httpcomponents:httpasyncclient:latest.release',
Expand Down
3 changes: 3 additions & 0 deletions micrometer-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jar {
org.apache.kafka.*;resolution:=dynamic,\
com.codahale.metrics.*;resolution:=dynamic;version="${@}",\
com.google.common.cache.*;resolution:=dynamic;version="${@}",\
jakarta.jms.*;resolution:=dynamic;version="${@}",\
jakarta.servlet.*;resolution:=dynamic;version="${@}",\
javax.servlet.*;resolution:=dynamic;version="${@}",\
io.micrometer.context.*;resolution:=dynamic,\
Expand Down Expand Up @@ -100,6 +101,8 @@ dependencies {
optionalApi 'org.glassfish.jersey.core:jersey-server'
optionalApi 'io.grpc:grpc-api'
optionalApi 'io.netty:netty-transport'
// jakarta JMS
optionalApi 'jakarta.jms:jakarta.jms-api'

// apache httpcomponents monitoring
optionalApi 'org.apache.httpcomponents:httpclient'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2023 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.core.instrument.binder.jms;

import io.micrometer.common.KeyValue;
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation.LowCardinalityKeyNames;
import jakarta.jms.*;

import static io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation.*;

/**
* Default implementation for {@link JmsProcessObservationConvention}.
*
* @author Brian Clozel
* @since 1.12.0
*/
public class DefaultJmsProcessObservationConvention implements JmsProcessObservationConvention {

private static final KeyValue DESTINATION_TEMPORARY = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY,
"true");

private static final KeyValue DESTINATION_DURABLE = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY,
"false");

private static final KeyValue EXCEPTION_NONE = KeyValue.of(LowCardinalityKeyNames.EXCEPTION, KeyValue.NONE_VALUE);

private static final KeyValue OPERATION_PROCESS = KeyValue.of(LowCardinalityKeyNames.OPERATION, "process");

private static final KeyValue DESTINATION_NAME_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME,
"unknown");

private static final KeyValue MESSAGE_CONVERSATION_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID,
"unknown");

private static final KeyValue MESSAGE_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, "unknown");

@Override
public String getName() {
return "jms.message.process";
}

@Override
public String getContextualName(JmsProcessObservationContext context) {
return destinationName(context).getValue() + " process";
}

@Override
public KeyValues getLowCardinalityKeyValues(JmsProcessObservationContext context) {
return KeyValues.of(exception(context), OPERATION_PROCESS, temporaryDestination(context));
}

private KeyValue exception(JmsProcessObservationContext context) {
Throwable error = context.getError();
if (error != null) {
String simpleName = error.getClass().getSimpleName();
return KeyValue.of(LowCardinalityKeyNames.EXCEPTION,
!simpleName.isEmpty() ? simpleName : error.getClass().getName());
}
return EXCEPTION_NONE;
}

protected KeyValue temporaryDestination(JmsProcessObservationContext context) {
try {
Message message = context.getCarrier();
Destination destination = message.getJMSDestination();
if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic) {
return DESTINATION_TEMPORARY;
}
return DESTINATION_DURABLE;
}
catch (JMSException exc) {
return DESTINATION_DURABLE;
}
}

@Override
public KeyValues getHighCardinalityKeyValues(JmsProcessObservationContext context) {
return KeyValues.of(correlationId(context), destinationName(context), messageId(context));
}

protected KeyValue correlationId(JmsProcessObservationContext context) {
try {
Message message = context.getCarrier();
if (message.getJMSCorrelationID() == null) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, context.getCarrier().getJMSCorrelationID());
}
catch (JMSException exc) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
}

protected KeyValue destinationName(JmsProcessObservationContext context) {
try {
Destination jmsDestination = context.getCarrier().getJMSDestination();
if (jmsDestination instanceof Queue) {
Queue queue = (Queue) jmsDestination;
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, queue.getQueueName());
}
if (jmsDestination instanceof Topic) {
Topic topic = (Topic) jmsDestination;
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, topic.getTopicName());
}
return DESTINATION_NAME_UNKNOWN;
}
catch (JMSException e) {
return DESTINATION_NAME_UNKNOWN;
}
}

protected KeyValue messageId(JmsProcessObservationContext context) {
try {
Message message = context.getCarrier();
if (message.getJMSMessageID() == null) {
return MESSAGE_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, context.getCarrier().getJMSMessageID());
}
catch (JMSException exc) {
return MESSAGE_ID_UNKNOWN;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2023 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.core.instrument.binder.jms;

import io.micrometer.common.KeyValue;
import io.micrometer.common.KeyValues;
import jakarta.jms.*;

import static io.micrometer.core.instrument.binder.jms.JmsObservationDocumentation.*;

/**
* Default implementation for {@link JmsPublishObservationConvention}.
*
* @author Brian Clozel
* @since 1.12.0
*/
public class DefaultJmsPublishObservationConvention implements JmsPublishObservationConvention {

private static final KeyValue DESTINATION_TEMPORARY = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY,
"true");

private static final KeyValue DESTINATION_DURABLE = KeyValue.of(LowCardinalityKeyNames.DESTINATION_TEMPORARY,
"false");

private static final KeyValue EXCEPTION_NONE = KeyValue.of(LowCardinalityKeyNames.EXCEPTION, KeyValue.NONE_VALUE);

private static final KeyValue OPERATION_PUBLISH = KeyValue.of(LowCardinalityKeyNames.OPERATION, "publish");

private static final KeyValue MESSAGE_CONVERSATION_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID,
"unknown");

private static final KeyValue DESTINATION_NAME_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME,
"unknown");

private static final KeyValue MESSAGE_ID_UNKNOWN = KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, "unknown");

@Override
public String getName() {
return "jms.message.publish";
}

@Override
public String getContextualName(JmsPublishObservationContext context) {
return destinationName(context).getValue() + " publish";
}

@Override
public KeyValues getLowCardinalityKeyValues(JmsPublishObservationContext context) {
return KeyValues.of(exception(context), OPERATION_PUBLISH, temporaryDestination(context));
}

private KeyValue exception(JmsPublishObservationContext context) {
Throwable error = context.getError();
if (error != null) {
String simpleName = error.getClass().getSimpleName();
return KeyValue.of(LowCardinalityKeyNames.EXCEPTION,
!simpleName.isEmpty() ? simpleName : error.getClass().getName());
}
return EXCEPTION_NONE;
}

protected KeyValue temporaryDestination(JmsPublishObservationContext context) {
Message message = context.getCarrier();
try {
if (message != null) {
Destination destination = message.getJMSDestination();
if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic) {
return DESTINATION_TEMPORARY;
}
}
return DESTINATION_DURABLE;
}
catch (JMSException exc) {
return DESTINATION_DURABLE;
}
}

@Override
public KeyValues getHighCardinalityKeyValues(JmsPublishObservationContext context) {
return KeyValues.of(correlationId(context), destinationName(context), messageId(context));
}

protected KeyValue correlationId(JmsPublishObservationContext context) {
try {
Message message = context.getCarrier();
if (message == null || message.getJMSCorrelationID() == null) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.CONVERSATION_ID, context.getCarrier().getJMSCorrelationID());
}
catch (JMSException exc) {
return MESSAGE_CONVERSATION_ID_UNKNOWN;
}
}

protected KeyValue destinationName(JmsPublishObservationContext context) {
if (context.getCarrier() == null) {
return DESTINATION_NAME_UNKNOWN;
}
try {
Destination jmsDestination = context.getCarrier().getJMSDestination();
if (jmsDestination instanceof Queue) {
Queue queue = (Queue) jmsDestination;
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, queue.getQueueName());
}
else if (jmsDestination instanceof Topic) {
Topic topic = (Topic) jmsDestination;
return KeyValue.of(HighCardinalityKeyNames.DESTINATION_NAME, topic.getTopicName());
}
else {
return DESTINATION_NAME_UNKNOWN;
}
}
catch (JMSException e) {
return DESTINATION_NAME_UNKNOWN;
}
}

protected KeyValue messageId(JmsPublishObservationContext context) {
try {
Message message = context.getCarrier();
if (message == null || message.getJMSMessageID() == null) {
return MESSAGE_ID_UNKNOWN;
}
return KeyValue.of(HighCardinalityKeyNames.MESSAGE_ID, message.getJMSMessageID());
}
catch (JMSException exc) {
return MESSAGE_ID_UNKNOWN;
}
}

}

0 comments on commit e0661f3

Please sign in to comment.