
Comparing changes

base repository: spring-projects/spring-kafka
base: v3.2.7
head repository: spring-projects/spring-kafka
compare: v3.2.8
  • 11 commits
  • 7 files changed
  • 5 contributors

Commits on Feb 18, 2025

  1. [artifactory-release] Next development version

    spring-builds committed Feb 18, 2025

    Commit: f4dbf9a

Commits on Mar 5, 2025

  1. GH-3779: Fix KafkaTemplate from hiding error when starting observation

    Fixes: #3779
    
    This fixes a problem where exceptions thrown from `observation.start()` were hidden: `KafkaTemplate` would throw a new exception while registering an observation error on an observation that had not been successfully started, masking the original one.
    
    Signed-off-by: Christian Fredriksson <christian.fredriksson.2@volvocars.com>
    
    (cherry picked from commit bdd1fd3)
    cfredri4 authored and spring-builds committed Mar 5, 2025

    Commit: 5a317dd
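
The corrected ordering shows up in the KafkaTemplate hunk further down this page: observation.start() moves inside the try block, and the send runs within an opened scope. Below is a minimal, self-contained sketch of that pattern against the Micrometer Observation API; the class name, observation name, and println are illustrative stand-ins, and the stop-in-finally is a simplification rather than the actual KafkaTemplate logic.

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

public final class ObserveSendSketch {

	public static void main(String[] args) {
		ObservationRegistry registry = ObservationRegistry.create(); // illustrative; spring-kafka wires its own
		Observation observation = Observation.createNotStarted("demo.send", registry);
		try {
			// start() now sits inside the try: if it throws, the original
			// exception propagates instead of being masked by error handling
			// on an observation that never started.
			observation.start();
			try (Observation.Scope ignored = observation.openScope()) {
				System.out.println("send the record here"); // stand-in for doSend(...)
			}
		}
		finally {
			observation.stop();
		}
	}
}

With no handlers registered the observation is effectively a no-op; the point is the ordering: start inside the guarded block, scope around the work, stop afterwards.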

Commits on Mar 6, 2025

  1. Handle null group id in listener observation (#3778)

    This change fixes an NPE when the group id is null and observation is enabled.
    
    Fixes: #3778
    
    Signed-off-by: cfredri4 <christian.fredriksson.2@volvocars.com>
    
    (cherry picked from commit ccf4666)
    cfredri4 authored and sobychacko committed Mar 6, 2025
    Commit: 829bf5e
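
The KafkaListenerObservation diff below builds the mandatory tags unconditionally and appends the consumer-group tag only when a group id is actually present. Here is a condensed, runnable sketch of that null-safe pattern; the class and method names are made up, while the tag keys are borrowed from the tests on this page.

import io.micrometer.common.KeyValue;
import io.micrometer.common.KeyValues;

import org.springframework.util.StringUtils;

public final class NullSafeTagsSketch {

	// Unconditional tags first; the consumer-group tag is appended only when
	// a group id is present, so a null group id can no longer trigger an NPE.
	static KeyValues listenerTags(String listenerId, String groupId) {
		KeyValues keyValues = KeyValues.of(
				"spring.kafka.listener.id", listenerId,
				"messaging.system", "kafka");
		if (StringUtils.hasText(groupId)) {
			keyValues = keyValues.and(KeyValue.of("messaging.kafka.consumer.group", groupId));
		}
		return keyValues;
	}

	public static void main(String[] args) {
		System.out.println(listenerTags("obs1", null));   // no consumer-group tag
		System.out.println(listenerTags("obs1", "grp1")); // consumer-group tag included
	}
}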

Commits on Mar 14, 2025

  1. GH-3786: Remove duplicated trace header

    Fixes: #3786
    
    When tracing is enabled, the KafkaRecordSenderContext was adding a new
    trace header without removing existing ones, resulting in multiple
    headers in the same record. This commit fixes the issue by updating
    KafkaRecordSenderContext to remove existing trace headers before adding
    new ones. (A standalone sketch of the remove-then-add pattern follows
    this commit list.)
    
    # Conflicts:
    #	spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
    sobychacko authored and artembilan committed Mar 14, 2025
    Commit: 2cca235
  2. Add missing import into ObservationTests

    artembilan committed Mar 14, 2025
    Commit: 30a0ceb
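
The crux of the GH-3786 fix above is that Kafka's Headers.add() appends rather than replaces, so KafkaRecordSenderContext now removes any existing value for the key before adding the new one (see its diff below). The following standalone sketch shows the remove-then-add pattern with a made-up topic and header values.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

public final class ReplaceHeaderSketch {

	public static void main(String[] args) {
		ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "value");
		record.headers().add("traceparent", "old".getBytes(StandardCharsets.UTF_8));

		// add() would append a second traceparent header; removing the key
		// first guarantees at most one header per key on the record.
		Headers headers = record.headers();
		headers.remove("traceparent");
		headers.add("traceparent", "new".getBytes(StandardCharsets.UTF_8));

		headers.headers("traceparent").forEach(h ->
				System.out.println(new String(h.value(), StandardCharsets.UTF_8))); // prints "new" once
	}
}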

Commits on Mar 15, 2025

  1. Bump org.springframework:spring-framework-bom from 6.1.17 to 6.1.18

    Bumps [org.springframework:spring-framework-bom](https://github.com/spring-projects/spring-framework) from 6.1.17 to 6.1.18.
    - [Release notes](https://github.com/spring-projects/spring-framework/releases)
    - [Commits](spring-projects/spring-framework@v6.1.17...v6.1.18)
    
    ---
    updated-dependencies:
    - dependency-name: org.springframework:spring-framework-bom
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    dependabot[bot] authored Mar 15, 2025
    Commit: 208f0b1
  2. Bump io.projectreactor:reactor-bom from 2023.0.15 to 2023.0.16

    Bumps [io.projectreactor:reactor-bom](https://github.com/reactor/reactor) from 2023.0.15 to 2023.0.16.
    - [Release notes](https://github.com/reactor/reactor/releases)
    - [Commits](reactor/reactor@2023.0.15...2023.0.16)
    
    ---
    updated-dependencies:
    - dependency-name: io.projectreactor:reactor-bom
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    dependabot[bot] authored Mar 15, 2025
    Commit: dc368cd
  3. Bump io.micrometer:micrometer-tracing-bom from 1.3.9 to 1.3.10

    Bumps [io.micrometer:micrometer-tracing-bom](https://github.com/micrometer-metrics/tracing) from 1.3.9 to 1.3.10.
    - [Release notes](https://github.com/micrometer-metrics/tracing/releases)
    - [Commits](micrometer-metrics/tracing@v1.3.9...v1.3.10)
    
    ---
    updated-dependencies:
    - dependency-name: io.micrometer:micrometer-tracing-bom
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    dependabot[bot] authored Mar 15, 2025
    Commit: ca541ec
  4. Bump org.springframework.data:spring-data-bom from 2024.0.9 to 2024.0.10

    Bumps [org.springframework.data:spring-data-bom](https://github.com/spring-projects/spring-data-bom) from 2024.0.9 to 2024.0.10.
    - [Release notes](https://github.com/spring-projects/spring-data-bom/releases)
    - [Commits](spring-projects/spring-data-bom@2024.0.9...2024.0.10)
    
    ---
    updated-dependencies:
    - dependency-name: org.springframework.data:spring-data-bom
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    dependabot[bot] authored Mar 15, 2025
    Commit: 86cfa65
  5. Bump io.micrometer:micrometer-bom from 1.13.11 to 1.13.12

    Bumps [io.micrometer:micrometer-bom](https://github.com/micrometer-metrics/micrometer) from 1.13.11 to 1.13.12.
    - [Release notes](https://github.com/micrometer-metrics/micrometer/releases)
    - [Commits](micrometer-metrics/micrometer@v1.13.11...v1.13.12)
    
    ---
    updated-dependencies:
    - dependency-name: io.micrometer:micrometer-bom
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    dependabot[bot] authored Mar 15, 2025
    Commit: 6249c01

Commits on Mar 17, 2025

  1. [artifactory-release] Release version 3.2.8

    spring-builds committed Mar 17, 2025
    Commit: 1edeeb4
10 changes: 5 additions & 5 deletions build.gradle
@@ -62,15 +62,15 @@ ext {
 	kotlinCoroutinesVersion = '1.8.1'
 	log4jVersion = '2.23.1'
 	micrometerDocsVersion = '1.0.4'
-	micrometerVersion = '1.13.11'
-	micrometerTracingVersion = '1.3.9'
+	micrometerVersion = '1.13.12'
+	micrometerTracingVersion = '1.3.10'
 	mockitoVersion = '5.10.0'
-	reactorVersion = '2023.0.15'
+	reactorVersion = '2023.0.16'
 	scalaVersion = '2.13'
 	springBootVersion = '3.2.12' // docs module
-	springDataVersion = '2024.0.9'
+	springDataVersion = '2024.0.10'
 	springRetryVersion = '2.0.11'
-	springVersion = '6.1.17'
+	springVersion = '6.1.18'
 	zookeeperVersion = '3.8.4'

 	idPrefix = 'kafka'
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
-version=3.2.7
+version=3.2.8
 org.gradle.jvmargs=-Xmx1536M -Dfile.encoding=UTF-8
 org.gradle.caching=true
 org.gradle.parallel=true
spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
@@ -786,8 +786,8 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
 				this.observationConvention, DefaultKafkaTemplateObservationConvention.INSTANCE,
 				() -> new KafkaRecordSenderContext(producerRecord, this.beanName, this::clusterId),
 				this.observationRegistry);
-		observation.start();
 		try {
+			observation.start();
+			try (Observation.Scope ignored = observation.openScope()) {
 			return doSend(producerRecord, observation);
 		}
spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java
@@ -32,6 +32,7 @@
  * @author Gary Russell
  * @author Christian Mergenthaler
  * @author Wang Zhiyang
+ * @author Christian Fredriksson
  *
  * @since 3.0
  *
@@ -224,33 +225,45 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi
 			new DefaultKafkaListenerObservationConvention();

 	@Override
+	@NonNull
 	public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
-
-		return KeyValues.of(
+		String groupId = context.getGroupId();
+		KeyValues keyValues = KeyValues.of(
 				ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()),
 				ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"),
 				ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"),
 				ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()),
-				ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"),
-				ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId())
+				ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic")
 		);
+
+		if (StringUtils.hasText(groupId)) {
+			keyValues = keyValues
+					.and(ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(groupId));
+		}
+
+		return keyValues;
 	}

 	@Override
+	@NonNull
 	public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) {
 		String clientId = context.getClientId();
+		String consumerId = getConsumerId(context.getGroupId(), clientId);
 		KeyValues keyValues = KeyValues.of(
 				ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()),
-				ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()),
-				ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context, clientId))
+				ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset())
 		);

 		if (StringUtils.hasText(clientId)) {
 			keyValues = keyValues
 					.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(clientId));
 		}

+		if (StringUtils.hasText(consumerId)) {
+			keyValues = keyValues
+					.and(ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(consumerId));
+		}
+
 		return keyValues;
 	}

@@ -259,11 +272,14 @@ public String getContextualName(KafkaRecordReceiverContext context) {
 		return context.getSource() + " receive";
 	}

-	private static String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) {
-		if (StringUtils.hasText(clientId)) {
-			return context.getGroupId() + " - " + clientId;
+	private static @Nullable String getConsumerId(@Nullable String groupId, @Nullable String clientId) {
+		if (StringUtils.hasText(groupId)) {
+			if (StringUtils.hasText(clientId)) {
+				return groupId + " - " + clientId;
+			}
+			return groupId;
 		}
-		return context.getGroupId();
+		return clientId;
 	}

 }
spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2022-2024 the original author or authors.
+ * Copyright 2022-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,13 +21,15 @@

 import io.micrometer.observation.transport.SenderContext;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;

 /**
  * {@link SenderContext} for {@link ProducerRecord}s.
  *
  * @author Gary Russell
  * @author Christian Mergenthaler
  * @author Wang Zhiyang
+ * @author Soby Chacko
  *
  * @since 3.0
  *
@@ -39,8 +41,12 @@ public class KafkaRecordSenderContext extends SenderContext<ProducerRecord<?, ?>
 	private final ProducerRecord<?, ?> record;

 	public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
-		super((carrier, key, value) -> record.headers().add(key,
-				value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
+		super((carrier, key, value) -> {
+			Headers headers = record.headers();
+			headers.remove(key);
+			headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
+		});

 		setCarrier(record);
 		this.beanName = beanName;
 		this.record = record;
spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/KafkaListenerObservationTests.java (new file)
@@ -0,0 +1,42 @@
/*
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.micrometer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;

/**
* @author Christian Fredriksson
*/
public class KafkaListenerObservationTests {

@Test
void lowCardinalityKeyValues() {
ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 1, 2, "key", "value");
KafkaRecordReceiverContext context = new KafkaRecordReceiverContext(record, "listener", () -> null);
DefaultKafkaListenerObservationConvention.INSTANCE.getLowCardinalityKeyValues(context);
}

@Test
void highCardinalityKeyValues() {
ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 1, 2, "key", "value");
KafkaRecordReceiverContext context = new KafkaRecordReceiverContext(record, "listener", () -> null);
DefaultKafkaListenerObservationConvention.INSTANCE.getHighCardinalityKeyValues(context);
}
}
spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
@@ -16,15 +16,18 @@

 package org.springframework.kafka.support.micrometer;

+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.StreamSupport;

 import io.micrometer.common.KeyValues;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -53,6 +56,7 @@
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.Test;

 import org.springframework.beans.factory.annotation.Autowired;
@@ -72,6 +76,7 @@
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.support.ProducerListener;
 import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
 import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -97,9 +102,9 @@
  * @since 3.0
  */
 @SpringJUnitConfig
-@EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
+@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
 		ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
-		ObservationTests.OBSERVATION_ERROR }, partitions = 1)
+		ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
 @DirtiesContext
 public class ObservationTests {

@@ -113,18 +118,21 @@ public class ObservationTests {

 	public final static String OBSERVATION_ERROR = "observation.error";

+	public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
+
 	@Test
 	void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
 			@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
 			@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
 			@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin,
 			@Autowired @Qualifier("customTemplate") KafkaTemplate<Integer, String> customTemplate,
 			@Autowired Config config)
-			throws InterruptedException, ExecutionException, TimeoutException {
+				throws InterruptedException, ExecutionException, TimeoutException {

 		AtomicReference<SimpleSpan> spanFromCallback = new AtomicReference<>();

 		template.setProducerInterceptor(new ProducerInterceptor<>() {

 			@Override
 			public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
 				tracer.currentSpanCustomizer().tag("key", "value");
@@ -309,10 +317,10 @@ private void assertThatTemplateHasTimerWithNameAndTags(MeterRegistryAssert meter

 		meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.template",
 				KeyValues.of("spring.kafka.template.name", "template",
-						"messaging.operation", "publish",
-						"messaging.system", "kafka",
-						"messaging.destination.kind", "topic",
-						"messaging.destination.name", destName)
+								"messaging.operation", "publish",
+								"messaging.system", "kafka",
+								"messaging.destination.kind", "topic",
+								"messaging.destination.name", destName)
 					.and(keyValues));
 	}

@@ -321,12 +329,12 @@ private void assertThatListenerHasTimerWithNameAndTags(MeterRegistryAssert meter

 		meterRegistryAssert.hasTimerWithNameAndTags("spring.kafka.listener",
 				KeyValues.of(
-						"messaging.kafka.consumer.group", consumerGroup,
-						"messaging.operation", "receive",
-						"messaging.source.kind", "topic",
-						"messaging.source.name", destName,
-						"messaging.system", "kafka",
-						"spring.kafka.listener.id", listenerId)
+								"messaging.kafka.consumer.group", consumerGroup,
+								"messaging.operation", "receive",
+								"messaging.source.kind", "topic",
+								"messaging.source.name", destName,
+								"messaging.system", "kafka",
+								"spring.kafka.listener.id", listenerId)
 					.and(keyValues));
 	}

@@ -369,7 +377,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
 	void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
 			@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
 			@Autowired KafkaListenerEndpointRegistry endpointRegistry)
-			throws ExecutionException, InterruptedException, TimeoutException {
+				throws ExecutionException, InterruptedException, TimeoutException {

 		errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
 		assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
@@ -394,6 +402,63 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
 		assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
 	}

+	@Test
+	void verifyKafkaRecordSenderContextTraceParentHandling() {
+		String initialTraceParent = "traceparent-from-previous";
+		String updatedTraceParent = "traceparent-current";
+		ProducerRecord<Integer, String> record = new ProducerRecord<>("test-topic", "test-value");
+		record.headers().add("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
+
+		// Create the context and update the traceparent
+		KafkaRecordSenderContext context = new KafkaRecordSenderContext(
+				record,
+				"test-bean",
+				() -> "test-cluster"
+		);
+		context.getSetter().set(record, "traceparent", updatedTraceParent);
+
+		Iterable<Header> traceparentHeaders = record.headers().headers("traceparent");
+
+		List<String> headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false)
+				.map(header -> new String(header.value(), StandardCharsets.UTF_8))
+				.toList();
+
+		// Verify there's only one traceparent header and it contains the updated value
+		assertThat(headerValues).containsExactly(updatedTraceParent);
+	}
+
+	@Test
+	void verifyTraceParentHeader(@Autowired KafkaTemplate<Integer, String> template,
+			@Autowired SimpleTracer tracer) throws Exception {
+		CompletableFuture<ProducerRecord<Integer, String>> producerRecordFuture = new CompletableFuture<>();
+		template.setProducerListener(new ProducerListener<>() {
+
+			@Override
+			public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMetadata recordMetadata) {
+				producerRecordFuture.complete(producerRecord);
+			}
+		});
+		String initialTraceParent = "traceparent-from-previous";
+		Header header = new RecordHeader("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
+		ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(
+				OBSERVATION_TRACEPARENT_DUPLICATE,
+				null, null, null,
+				"test-value",
+				List.of(header)
+		);
+
+		template.send(producerRecord).get(10, TimeUnit.SECONDS);
+		ProducerRecord<Integer, String> recordResult = producerRecordFuture.get(10, TimeUnit.SECONDS);
+
+		Iterable<Header> traceparentHeaders = recordResult.headers().headers("traceparent");
+		assertThat(traceparentHeaders).hasSize(1);
+
+		String traceparentValue = new String(traceparentHeaders.iterator().next().value(), StandardCharsets.UTF_8);
+		assertThat(traceparentValue).isEqualTo("traceparent-from-propagator");
+
+		tracer.getSpans().clear();
+	}
+
 	@Configuration
 	@EnableKafka
 	public static class Config {
@@ -523,6 +588,9 @@ public List<String> fields() {
 			public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
 				setter.set(carrier, "foo", "some foo value");
 				setter.set(carrier, "bar", "some bar value");
+
+				// Add a traceparent header to simulate W3C trace context
+				setter.set(carrier, "traceparent", "traceparent-from-propagator");
 			}

 			// This is called on the consumer side when the message is consumed
@@ -531,7 +599,9 @@ public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> sett
 			public <C> Span.Builder extract(C carrier, Getter<C> getter) {
 				String foo = getter.get(carrier, "foo");
 				String bar = getter.get(carrier, "bar");
-				return tracer.spanBuilder().tag("foo", foo).tag("bar", bar);
+				return tracer.spanBuilder()
+						.tag("foo", foo)
+						.tag("bar", bar);
 			}
 		};
 	}