-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Incorrect retry topic suffix #2654
Comments
The problem is here: Line 272 in f351a7a
Why this.backOffValues.size() > 1 ?Is 1 attempt with fixed delay is not fixed delay according to this method? Or it's feature that I didn't realize? |
Confirmed that this is a bug (when As a work around, see https://docs.spring.io/spring-kafka/docs/current/reference/html/#custom-naming-strategies Here's an example: @SpringBootApplication
public class Kgh2654Application extends RetryTopicConfigurationSupport {
public static void main(String[] args) {
SpringApplication.run(Kgh2654Application.class, args);
}
@Bean
NewTopic topic() {
return TopicBuilder.name("kgh2654").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("kgh2654", "foo");
};
}
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
@Bean
TaskScheduler sched() {
return new ThreadPoolTaskScheduler();
}
}
@Component
class Listener {
@RetryableTopic(attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(id = "kgh2654", topics = "kgh2654")
void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
System.out.println(in + " from " + topic);
throw new RuntimeException("test");
}
@DltHandler
void handle(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
System.out.println(in + " from " + topic);
}
}
class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint() || properties.isDltTopic()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return topic + "-retry";
}
};
}
}
} |
Got it, thanks! |
This is what worked for us in spring-kafka 2.8.8 👇 First we created a CustomRetryTopicNamesProviderFactory that implements RetryTopicNamesProviderFactory import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(final DestinationTopic.Properties properties) {
if (properties.isMainEndpoint() || properties.isDltTopic()) {
return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties);
} else {
return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(final String topic) {
return topic + "-retry";
}
};
}
}
} and then we created a bean for class RetryTopicNamesProviderFactory like this package com.bitpanda.legaltenancy.infrastructure.kafka.factories;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
@Configuration
public class RetryTopicComponentFactory {
@Bean
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
} Works like a charm 🥳 |
2.9.x and later require extending |
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
Resolves spring-projects#2654 * fix incorrect retry topic suffix when attempts is 2 * optimization method `isDelayWithReusedTopic` * add retry topic integration tests when reuse retry topic fix java doc in RetryTopic * fix javadoc in `@RetryTopic` * fix javadoc in `RetryTopicConfigurationProvider`
* GH-2654: Incorrect retry topic suffix Resolves #2654 * fix incorrect retry topic suffix when attempts is 2 * optimization method `isDelayWithReusedTopic` * add retry topic integration tests when reuse retry topic fix java doc in RetryTopic * fix javadoc in `@RetryTopic` * fix javadoc in `RetryTopicConfigurationProvider` * fix javadoc in `RetryableTopic` * add a note to the `whats-new.adoc` * Doc polishing. --------- Co-authored-by: Zhiyang.Wang1 <zhiyang.wang@caocaoglobal.com> Co-authored-by: Gary Russell <grussell@vmware.com>
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.8.2
Describe the bug
Incorrect retry topic suffix (
retry-5000
) when attempts is equal to 2 in@RetryableTopic
andfixedDelayStrategy=SINGLE_TOPIC
To Reproduce
Expected behavior
Retry topic suffix =
-retry
The text was updated successfully, but these errors were encountered: