Adds Pulsar sender #273

Merged: 5 commits, Feb 12, 2025

Conversation

CodePrometheus
Contributor

@CodePrometheus CodePrometheus commented Jan 28, 2025

Ref openzipkin/zipkin#3788

  • Add Pulsar sender
  • Add tests for all changes
  • Add corresponding documentation
Benchmark                                      (messageMaxBytes)   Mode  Cnt    Score   Error  Units
PulsarSenderBenchmarks.report                              65536  thrpt   15  944.957 ± 2.515  ops/s
PulsarSenderBenchmarks.report:messages                     65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped              65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                        65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                             500000  thrpt   15  943.387 ± 3.743  ops/s
PulsarSenderBenchmarks.report:messages                    500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped             500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                       500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                            5242880  thrpt   15  942.780 ± 8.561  ops/s
PulsarSenderBenchmarks.report:messages                   5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped            5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                      5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                           16777216  thrpt   15  942.816 ± 4.404  ops/s
PulsarSenderBenchmarks.report:messages                  16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped           16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                     16777216  thrpt   15      ≈ 0          ops/s

@codefromthecrypt
Member

@CodePrometheus PS: if you need anything from me to complete this task, please mention me directly. I don't watch all repo notifications for hobby-time projects.

Member

@codefromthecrypt codefromthecrypt left a comment

I think this is very good and will defer to @reta for follow-up. My main hesitation is about the sender creating topics, which I think is too much responsibility. The other is the import dependency on slf4j, which may be OK today but could be a headache tomorrow.


import static org.testcontainers.utility.DockerImageName.parse;

public class PulsarSenderBenchmarks extends SenderBenchmarks {
Member

Please add the output of a run of this benchmark to the description, in triple backticks.

</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
Member

good. sender name matches the artifact here!

}
});
} catch (Exception e) {
throw new RuntimeException("Pulsar producer send failed." + e.getMessage(), e);
Member

createIfNeeded(message);
}

void createIfNeeded(byte[] message) {
Member

Try to refactor this a bit like KafkaSender: make a function get() that can throw as needed, then handle the exceptions in one place at the call site.
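One way to read this suggestion is sketched below. It is a minimal illustration, not the code in this PR or in KafkaSender: `SketchProducer`, `SketchProducerFactory`, and `LazySender` are hypothetical stand-ins so the example runs without a broker or the Pulsar client. `get()` lazily creates the producer and lets any failure propagate, and `send()` is the single place where exceptions are handled.

```java
/** Hypothetical stand-in for a broker producer; not the Pulsar client API. */
interface SketchProducer extends AutoCloseable {
  void send(byte[] message) throws Exception;

  @Override void close();
}

/** Hypothetical factory so the sketch runs without a broker. */
interface SketchProducerFactory {
  SketchProducer create() throws Exception;
}

final class LazySender {
  private final SketchProducerFactory factory;
  // volatile is required for double-checked locking to be safe
  private volatile SketchProducer producer;

  LazySender(SketchProducerFactory factory) {
    this.factory = factory;
  }

  /** Creates the producer at most once; any failure propagates to the caller. */
  SketchProducer get() throws Exception {
    SketchProducer result = producer;
    if (result == null) {
      synchronized (this) {
        result = producer;
        if (result == null) {
          producer = result = factory.create();
        }
      }
    }
    return result;
  }

  /** The only place exceptions are handled: creation and send failures alike. */
  void send(byte[] message) {
    try {
      get().send(message);
    } catch (Exception e) {
      throw new RuntimeException("Pulsar producer send failed: " + e.getMessage(), e);
    }
  }
}
```

Because `send()` always goes through `get()`, every call sends its message, and a failed creation is retried on the next call rather than cached.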

I would recommend not logging at all, mainly because of slf4j version lock-in. If Pulsar changes its slf4j version, we'd have a revlock here. This is one reason why other senders either don't log or use JUL.
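For reference, logging through JUL (`java.util.logging`) needs nothing beyond the JDK. The sketch below is illustrative only: the class name `JulLoggingSketch`, the method `logSendFailure`, and the choice of the FINE level are assumptions, not something this PR contains.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class JulLoggingSketch {
  // JUL ships with the JDK, so no logging library version is shared with the broker client.
  static final Logger LOG = Logger.getLogger(JulLoggingSketch.class.getName());

  /** Logs a failure at FINE, which is below JUL's default INFO threshold. */
  static void logSendFailure(Throwable error) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "Pulsar producer send failed", error);
    }
  }
}
```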

Finally, consider whether we should implicitly create a topic at all, as that adds failure cases and more code. I don't think we implicitly create topics anywhere else, but I could be mistaken. If the Docker image we use should have a default topic of zipkin, we could add it to the image for convenience or configure the image to auto-create topics.

Contributor Author

Well, I tried to make the message-sending method clearer. For Pulsar, the topic is created automatically by default.

@reta
Contributor

reta commented Feb 9, 2025

> I think this is very good and will defer to @reta for follow-up.

Great job @CodePrometheus, a few comments but pretty much LGTM! Thank you!

@CodePrometheus
Contributor Author

@codefromthecrypt @reta Thanks for your time! I tried to fix and answer the questions raised.

Benchmark results on my local machine are as follows:

Benchmark                                      (messageMaxBytes)   Mode  Cnt    Score   Error  Units
PulsarSenderBenchmarks.report                              65536  thrpt   15  944.957 ± 2.515  ops/s
PulsarSenderBenchmarks.report:messages                     65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped              65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                        65536  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                             500000  thrpt   15  943.387 ± 3.743  ops/s
PulsarSenderBenchmarks.report:messages                    500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped             500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                       500000  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                            5242880  thrpt   15  942.780 ± 8.561  ops/s
PulsarSenderBenchmarks.report:messages                   5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped            5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                      5242880  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report                           16777216  thrpt   15  942.816 ± 4.404  ops/s
PulsarSenderBenchmarks.report:messages                  16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:messagesDropped           16777216  thrpt   15      ≈ 0          ops/s
PulsarSenderBenchmarks.report:spans                     16777216  thrpt   15      ≈ 0          ops/s

@codefromthecrypt codefromthecrypt merged commit cacf9e9 into openzipkin:master Feb 12, 2025
4 checks passed
@codefromthecrypt
Member

online until end of tomorrow UTC+8, so hopping in. Nice one!

@CodePrometheus CodePrometheus deleted the add-pulsar-sender branch February 12, 2025 02:38
void sendMessage(byte[] message) {
if (client == null) {
synchronized (this) {
if (client == null) {
Member

Doesn't this method (sendMessage) do nothing once the client has been created (client != null)? That would mean every call to sendMessage after the first does not actually send the message. Am I missing something?

Member

I added the following test to ITPulsarSender which fails on the second assertion with "Timed out waiting to read message."

@Test void send_multiple_JSON_messages() throws Exception {
  try (PulsarSender sender = pulsar.newSenderBuilder(testName)
    .encoding(Encoding.JSON)
    .build()) {
    send(sender, CLIENT_SPAN, CLIENT_SPAN);
    send(sender, CLIENT_SPAN);

    assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
      CLIENT_SPAN, CLIENT_SPAN);
    assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
      CLIENT_SPAN);
  }
}
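The failure mode can also be reproduced without a broker. The sketch below assumes the defect had the shape the comment above describes, with the send placed inside the lazy-initialization branch; `SendOnceSketch` and its methods are hypothetical and record messages in a list instead of calling a real producer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SendOnceSketch {
  /** Records "sent" messages in place of a real producer. */
  final List<String> sent = new CopyOnWriteArrayList<>();
  // Stands in for the lazily created client.
  private volatile Object client;

  /** Assumed shape of the defect: the send lives inside the init branch. */
  void sendBuggy(String message) {
    if (client == null) {
      synchronized (this) {
        if (client == null) {
          client = new Object();
          sent.add(message); // only reached on the first call
        }
      }
    }
  }

  /** Fixed shape: initialize at most once, then send on every call. */
  void sendFixed(String message) {
    if (client == null) {
      synchronized (this) {
        if (client == null) {
          client = new Object();
        }
      }
    }
    sent.add(message);
  }
}
```

Calling `sendBuggy` twice records one message, while calling `sendFixed` twice records both, which matches the timeout seen on the second assertion in the test above.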

Contributor

Yeah, you are right @shakuzen, this probably got lost in the rounds of review.

Contributor Author

OMG, this was a big mistake of mine. Thank you for pointing it out, I'll fix it as soon as possible.

Member

love to see this collaboration! thanks @shakuzen!

4 participants