Add Option to Log Recovered Record to DeadLetterPublishingRecoverer #2862

Closed
garyrussell opened this issue Oct 26, 2023 · 4 comments · Fixed by #2869

Comments

@garyrussell
Contributor

@garyrussell garyrussell added this to the 3.1.0 milestone Oct 26, 2023
@garyrussell garyrussell changed the title Add Option to Log Recovered Record to DeadLetterPublishingRecoverer Add Option to Log Recovered Record to ErrorHandlingDeserializer Oct 26, 2023
@garyrussell garyrussell changed the title Add Option to Log Recovered Record to ErrorHandlingDeserializer Add Option to Log Recovered Record to DeadLetterPublishingRecoverer Oct 26, 2023
@Wzy19930507
Contributor

Wzy19930507 commented Oct 29, 2023

Hi @garyrussell, may I pick this up?

If my understanding is correct, the code can be added after DeadLetterPublishingRecoverer.accept()#L505:

if (this.skipSameTopicFatalExceptions
        && tp.topic().equals(record.topic())
        && !getClassifier().classify(exception)) {
    this.logger.error("Recovery of " + KafkaUtils.format(record)
            + " skipped because not retryable exception " + exception.toString()
            + " and the destination resolver routed back to the same topic");
    return;
}
if (consumer != null && this.verifyPartition) {

if (logRecordEnable) {
    logger.info("Recovered Record " + KafkaUtils.format(record) + " , exception " + exception);
}
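
The proposal above amounts to an opt-in flag that is checked once the skip conditions have passed. A minimal, self-contained sketch of that pattern follows. The class `RecoveryLoggingSketch`, its in-memory `logged` and `published` lists, and the setter name are hypothetical stand-ins for illustration, not the spring-kafka implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a recoverer; not the spring-kafka class. */
final class RecoveryLoggingSketch {

    /** Messages that a real implementation would send to its logger. */
    private final List<String> logged = new ArrayList<>();

    /** Records "published" to the dead-letter destination. */
    private final List<String> published = new ArrayList<>();

    /** Opt-in flag; off by default so existing behavior is unchanged. */
    private boolean logRecoveryRecord;

    void setLogRecoveryRecord(boolean logRecoveryRecord) {
        this.logRecoveryRecord = logRecoveryRecord;
    }

    /** Optionally logs the failed record and its exception, then publishes it. */
    void accept(String record, Exception exception) {
        if (this.logRecoveryRecord) {
            this.logged.add("Recovered record " + record + ", exception " + exception);
        }
        this.published.add(record); // stands in for sending to the dead-letter topic
    }

    List<String> logged() {
        return this.logged;
    }

    List<String> published() {
        return this.published;
    }

    public static void main(String[] args) {
        RecoveryLoggingSketch recoverer = new RecoveryLoggingSketch();
        recoverer.accept("topic-0@41", new IllegalStateException("boom")); // not logged
        recoverer.setLogRecoveryRecord(true);
        recoverer.accept("topic-0@42", new IllegalStateException("boom")); // logged
        System.out.println(recoverer.logged());
    }
}
```

Keeping the flag off by default means existing users see no new log output unless they ask for it.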

@garyrussell
Contributor Author

garyrussell commented Oct 29, 2023

Sure, but I think it actually belongs in the DefaultErrorHandler after successful recovery.

@Wzy19930507
Contributor

Wzy19930507 commented Oct 30, 2023

Thank you for the reminder.

There are four methods in DefaultErrorHandler that would need to log:

  • handleOne
  • handleRemaining
  • handleBatchAndReturnRemaining
  • handleBatch

Note: I have a few questions that I hope you can give me a hint on.

  1. Edge case in handleRemaining: when commitRecovered in SeekUtils is true and the ack mode is MANUAL_IMMEDIATE, recovery can succeed but consumer.commitSync() can then fail with a CommitFailedException, so the log statement is never reached.
    if (commitRecovered) {
        if (container.getContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
            ConsumerRecord<?, ?> record = records.get(0);
            Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
            if (container.getContainerProperties().isSyncCommits()) {
                consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
            }

    Should I catch CommitFailedException and log the recovered record in handleRemaining?


  2. In handleBatchAndReturnRemaining and handleBatch:
    public <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
            ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
            Runnable invokeListener) {

    Should this log only the recovered record, or the whole ConsumerRecords data?
    If only the recovered record, handleBatch would need to change from doHandle to handle.

  3. Same as Q2, in handleRemaining:
    public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {

    Should this log records.get(0) or all of the records?
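
One way to handle the edge case in the first question is to catch the commit failure, log the recovered record, and rethrow. The sketch below is self-contained and rests on stated assumptions: `CommitFailed` is a hypothetical stand-in for Kafka's CommitFailedException, the `Runnable` stands in for the consumer.commitSync(...) call, and the `List<String>` stands in for a logger. It is not how DefaultErrorHandler is implemented.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitAfterRecoverySketch {

    /** Hypothetical stand-in for Kafka's CommitFailedException. */
    static final class CommitFailed extends RuntimeException {
        CommitFailed(String message) {
            super(message);
        }
    }

    /**
     * Logs the recovered record whether or not the commit succeeds,
     * then lets a commit failure propagate to the caller unchanged.
     */
    static void commitRecovered(String record, Runnable commit, List<String> log) {
        try {
            commit.run(); // stands in for consumer.commitSync(...)
            log.add("Recovered record " + record);
        }
        catch (CommitFailed ex) {
            log.add("Recovered record " + record + ", but the offset commit failed: " + ex.getMessage());
            throw ex;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        commitRecovered("topic-0@41", () -> { }, log);
        try {
            commitRecovered("topic-0@42", () -> { throw new CommitFailed("rebalanced"); }, log);
        }
        catch (CommitFailed expected) {
            // the failure still reaches the caller
        }
        log.forEach(System.out::println);
    }
}
```

Rethrowing keeps the existing failure semantics intact. The extra branch only guarantees that the recovery is visible in the log.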

@garyrussell
Contributor Author

I see, yes, complicated; let's go back to logging in the DLPR only.

Wzy19930507 pushed a commit to Wzy19930507/spring-kafka that referenced this issue Oct 31, 2023

Resolves spring-projects#2862

* add option logRecoveryRecord in `annotation-error-handling.adoc`

Fix Javadoc for CommonErrorHandler

Delete unused code in SerializationUtils
garyrussell added a commit that referenced this issue Oct 31, 2023

* GH-2862: Add Option to Log Recovery to DLPR

Resolves #2862

* add option logRecoveryRecord in `annotation-error-handling.adoc`

Fix Javadoc for CommonErrorHandler

Delete unused code in SerializationUtils

* Fix since.

---------

Co-authored-by: Zhiyang.Wang1 <zhiyang.wang@caocaoglobal.com>
Co-authored-by: Gary Russell <grussell@vmware.com>
@garyrussell garyrussell removed this from the 3.1.0 milestone Oct 31, 2023