[Fix][producer] Fail all messages that are pending requests when closing like java #1059

Merged: 9 commits, Aug 30, 2023

Conversation

graysonzeng
Contributor

@graysonzeng graysonzeng commented Jul 18, 2023

Fixes #1042; related: #1057

Motivation

The producer does not fail the messages that are still in the pending queue when it is closed.

Modifications

Add failPendingMessages() to fail the pending requests when closing the producer.
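
The idea can be illustrated with a minimal, self-contained sketch. This is not the code from this PR: `toyProducer`, `pendingRequest`, and `ErrProducerClosed` are hypothetical names, and the sketch models only an in-memory pending queue with no network I/O. It assumes that "failing" a request means invoking its callback with an error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrProducerClosed is a hypothetical sentinel error used only in this sketch.
var ErrProducerClosed = errors.New("producer closed")

// pendingRequest is a hypothetical stand-in for a queued send request.
type pendingRequest struct {
	payload  string
	callback func(payload string, err error)
}

// toyProducer models only the pending queue; it never talks to a broker.
type toyProducer struct {
	mu      sync.Mutex
	pending []pendingRequest
	closed  bool
}

// Send queues a request. After Close, the request is failed immediately.
func (p *toyProducer) Send(payload string, cb func(string, error)) {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		cb(payload, ErrProducerClosed)
		return
	}
	p.pending = append(p.pending, pendingRequest{payload, cb})
	p.mu.Unlock()
}

// failPendingMessages drains the queue and fails every request with err.
func (p *toyProducer) failPendingMessages(err error) {
	p.mu.Lock()
	drained := p.pending
	p.pending = nil
	p.mu.Unlock()
	// Callbacks run outside the lock so they may safely call back into the producer.
	for _, r := range drained {
		r.callback(r.payload, err)
	}
}

// Close marks the producer closed and fails whatever is still pending.
func (p *toyProducer) Close() {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()
	p.failPendingMessages(ErrProducerClosed)
}

func main() {
	p := &toyProducer{}
	report := func(payload string, err error) { fmt.Printf("%s: %v\n", payload, err) }
	p.Send("m1", report)
	p.Send("m2", report)
	p.Close() // both callbacks are invoked with ErrProducerClosed
}
```

Draining the slice under the lock and invoking the callbacks afterwards is a design choice of this sketch, so that a callback which sends again cannot deadlock.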

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

@Gleiphir2769
Contributor

Hi @graysonzeng. I think dataChan should be cleared in internalFlush, because internalFlush should guarantee that all the messages in dataChan are flushed too.

I submitted a fix PR in #1058, and I think internalClose can just invoke internalFlush to clear dataChan.

@gunli
Contributor

gunli commented Jul 19, 2023

I submitted a fix PR in #1058, and I think internalClose can just invoke internalFlush to clear dataChan.

I think that will be better. We can update this PR after @Gleiphir2769's PR is merged. @graysonzeng

@graysonzeng
Contributor Author

graysonzeng commented Jul 19, 2023

Hi @graysonzeng. I think dataChan should be cleared in internalFlush, because internalFlush should guarantee that all the messages in dataChan are flushed too.

I submitted a fix PR in #1058, and I think internalClose can just invoke internalFlush to clear dataChan.

I agree. I will update this PR once #1058 is merged.

@RobertIndie
Member

Hi @graysonzeng #1058 has been merged.

@graysonzeng
Contributor Author

update completed @RobertIndie

@RobertIndie
Member

Seems that in the Java client, the producer will fail all messages that are in the pending queue when closing:
https://github.com/apache/pulsar/blob/2bede012c73c301b73c079aaeb122cbb472728c1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1079
https://github.com/apache/pulsar/blob/2bede012c73c301b73c079aaeb122cbb472728c1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1098

cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
    cnx.removeProducer(producerId);
    if (exception == null || !cnx.ctx().channel().isActive()) {
        // Either we've received the success response for the close producer command from the broker, or the
        // connection did break in the meantime. In any case, the producer is gone.
        log.info("[{}] [{}] Closed Producer", topic, producerName);
        closeAndClearPendingMessages();  // <- we call failPendingMessages in closeAndClearPendingMessages
        closeFuture.complete(null);
    } else {
        closeFuture.completeExceptionally(exception);
    }

    return null;
});

@gunli
Contributor

gunli commented Jul 21, 2023

Seems that in the Java client, the producer will fail all messages that are in the pending queue when closing:

From the user's perspective, I would like to have the submitted data sent out as much as possible when closing the client. The cost is that I have to wait until the timeout. And when the user decides to close the client, I think that, as producer users, we would rather wait than handle the failed messages manually after the client is closed.

@RobertIndie
Member

From the user's perspective, I would like to have the submitted data sent out as much as possible when closing the client. The cost is that I have to wait until the timeout.

The user can call flush and then call close. IMO, close means that we are going to terminate the producer and release all the resources. If there are any ongoing requests, we need to fail them.
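
A caller-side helper for the "flush, then close" pattern could look like the sketch below. The `flushCloser` interface is an assumption: it is meant to mirror the `Flush() error` and `Close()` methods of the Go client's producer, but it is declared locally so the example stays self-contained. `recordingProducer` and `errFlushTimeout` are hypothetical test doubles.

```go
package main

import (
	"errors"
	"fmt"
)

// errFlushTimeout is a hypothetical error used to simulate a failed flush.
var errFlushTimeout = errors.New("flush timed out")

// flushCloser is the subset of producer behaviour this helper relies on.
// Assumption: it mirrors the Flush() error and Close() methods of the Go client's producer.
type flushCloser interface {
	Flush() error
	Close()
}

// flushThenClose tries to deliver buffered messages first, then closes.
// Close still runs when Flush fails, so resources are always released.
func flushThenClose(p flushCloser) error {
	err := p.Flush()
	p.Close()
	return err
}

// recordingProducer is a hypothetical fake that records the order of calls.
type recordingProducer struct {
	calls    []string
	flushErr error
}

func (r *recordingProducer) Flush() error {
	r.calls = append(r.calls, "flush")
	return r.flushErr
}

func (r *recordingProducer) Close() { r.calls = append(r.calls, "close") }

func main() {
	ok := &recordingProducer{}
	fmt.Println(flushThenClose(ok), ok.calls)

	bad := &recordingProducer{flushErr: errFlushTimeout}
	fmt.Println(flushThenClose(bad), bad.calls)
}
```

With this split, close keeps a single responsibility, and a caller who wants delivery guarantees opts in explicitly and sees the flush error.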

@gunli
Contributor

gunli commented Jul 21, 2023

The user can call flush and then call close. IMO, close means that we are going to terminate the producer and release all the resources. If there are any ongoing requests, we need to fail them.

Hmm, this implies that users must call flush() before closing, and if they do so but still have unsent data, it looks like the flush() implementation is not perfect. On the other hand, if users don't call flush(), they will have to bear the consequences themselves; in that case, it seems better to encapsulate flush() in close() to reduce the burden on users.

It seems a bit contradictory.

And, as a user, when the time comes to call close(), I won't care about the results of the async callbacks of the failed requests unless close() returns the failed requests to me as its return value. In this case, failing the pending requests seems to be of little meaning.

@graysonzeng
Contributor Author

graysonzeng commented Jul 21, 2023

https://github.com/apache/kafka/blob/4ea9394e7ebf71e2dcf96e9b96191dac253f4930/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1294C15-L1294C22

In Kafka, when the user actively closes the producer, they can specify how long to wait for all messages in the queue to be cleared. In pulsar-go, I think it is a good choice to use flush to send the pending data in the queue before closing.

If you think it's better to let the messages fail directly, I can change it. @RobertIndie

@RobertIndie
Member

and if they do so but still have unsent data, it looks like the flush() implementation is not perfect.

flush should guarantee that messages that have at least been pushed into dataChan are flushed to the broker. That's the issue @Gleiphir2769 has fixed: #1057.

if users don't call flush(), they will have to bear the consequences themselves

Yes. That's right. The user can choose to fail or flush the messages.

IMO, the close should have only one responsibility: just close and release all the resources of the producer.
I still think it's more important to keep the behavior consistent with the Java client. If we want to change this behavior, then we need to change it in the Java client first. And this needs a discussion on the mailing list. Besides, I still think it's better to fail the messages directly.

@gunli
Contributor

gunli commented Jul 21, 2023

And, as a user, when the time comes to call close(), I won't care about the results of the async callbacks of the failed requests unless close() returns the failed requests to me as its return value. In this case, failing the pending requests seems to be of little meaning.

OK. But I think we should consider the concerns above on behalf of the users: return the failed requests to the users rather than just calling the async callbacks, so that they can decide what to do with these failed requests.

@RobertIndie
Member

OK. But I think we should consider the concerns above on behalf of the users: return the failed requests to the users rather than just calling the async callbacks, so that they can decide what to do with these failed requests.

@gunli Thanks. You're right. I have created an issue to track this improvement: #1063
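
To make the proposal concrete, here is a rough sketch of a close operation that hands the unsent messages back to the caller. Nothing here is an existing client API: `collectingProducer`, `failedMessage`, and `CloseAndCollect` are hypothetical names invented for illustration, and the actual design is left to the tracking issue.

```go
package main

import "fmt"

// failedMessage pairs an unsent payload with the reason it was not sent (hypothetical type).
type failedMessage struct {
	Payload string
	Reason  string
}

// collectingProducer is a toy producer that only buffers payloads in memory.
type collectingProducer struct {
	pending []string
}

// Send buffers a payload; this toy never delivers anything.
func (p *collectingProducer) Send(payload string) {
	p.pending = append(p.pending, payload)
}

// CloseAndCollect is a hypothetical method: it empties the pending queue and
// returns the dropped messages instead of only firing per-message callbacks.
func (p *collectingProducer) CloseAndCollect() []failedMessage {
	failed := make([]failedMessage, 0, len(p.pending))
	for _, payload := range p.pending {
		failed = append(failed, failedMessage{Payload: payload, Reason: "producer closed"})
	}
	p.pending = nil
	return failed
}

func main() {
	p := &collectingProducer{}
	p.Send("order-1")
	p.Send("order-2")
	for _, f := range p.CloseAndCollect() {
		// The caller decides what to do: retry elsewhere, persist, or drop.
		fmt.Printf("not sent: %s (%s)\n", f.Payload, f.Reason)
	}
}
```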

@graysonzeng graysonzeng changed the title [Fix][producer] Ensure all data in dataChan will be processed when internalClose() was called [Fix][producer] Fail all messages that are pending Requests when closing like java Jul 21, 2023
@graysonzeng graysonzeng changed the title [Fix][producer] Fail all messages that are pending Requests when closing like java [Fix][producer] Fail all messages that are pending requests when closing like java Jul 21, 2023
Member

@RobertIndie RobertIndie left a comment

We should fail the pending requests in the pending queue in the producer instead of the connection. Just like failTimeoutMessages did:

func (p *partitionProducer) failTimeoutMessages() {

Member

@RobertIndie RobertIndie left a comment

Overall looks good to me. Could you add a test?

@graysonzeng
Contributor Author

graysonzeng commented Jul 27, 2023

Hi @RobertIndie. How can I reliably keep some messages in pendQueue?

@RobertIndie
Member

Hi @RobertIndie. How can I reliably keep some messages in pendQueue?

You can create a producer with a relatively large BatchingMaxPublishDelay and BatchingMaxMessages so that messages reliably stay in the pending queue.
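
Why large limits help can be seen in a toy model of the two batching triggers. This is not the client's batching code: `toyBatcher` is a hypothetical type, and the sketch assumes a batch is flushed as soon as either the message-count limit or the publish-delay limit is reached. Time is passed in explicitly as an elapsed duration so the example is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// toyBatcher buffers messages until one of two thresholds is reached.
// maxMessages and maxDelay play the roles of BatchingMaxMessages and
// BatchingMaxPublishDelay in this simplified model.
type toyBatcher struct {
	maxMessages int
	maxDelay    time.Duration
	firstAdded  time.Duration // elapsed time at which the current batch started
	buffered    []string
}

// Add buffers msg at the given elapsed time and reports whether the batch was flushed.
func (b *toyBatcher) Add(msg string, elapsed time.Duration) bool {
	if len(b.buffered) == 0 {
		b.firstAdded = elapsed
	}
	b.buffered = append(b.buffered, msg)
	if len(b.buffered) >= b.maxMessages || elapsed-b.firstAdded >= b.maxDelay {
		b.buffered = nil // the batch leaves the buffer
		return true
	}
	return false
}

func main() {
	small := &toyBatcher{maxMessages: 2, maxDelay: 10 * time.Millisecond}
	large := &toyBatcher{maxMessages: 100000, maxDelay: time.Hour}
	for i := 0; i < 3; i++ {
		elapsed := time.Duration(i) * time.Millisecond
		msg := fmt.Sprintf("m%d", i)
		fmt.Println(msg, "small flushed:", small.Add(msg, elapsed), "large flushed:", large.Add(msg, elapsed))
	}
	fmt.Println("still buffered with large limits:", len(large.buffered))
}
```

In a real test, the same effect would come from setting both options to values the test can never reach before it calls close.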

@graysonzeng
Contributor Author

PTAL @RobertIndie

Member

@tisonkun tisonkun left a comment

Thank you!

@tisonkun tisonkun merged commit 4109351 into apache:master Aug 30, 2023
6 checks passed
RobertIndie pushed a commit that referenced this pull request Sep 7, 2023
…ng (#1059)

This aligns the manner with Java client.

(cherry picked from commit 4109351)
Development

Successfully merging this pull request may close these issues.

[Bug][Producer]Forget to consume the sendRequests before closing the producer
5 participants