
[Messaging] Reduce allocation for Send by using ReadableBuffer #34591

Merged
merged 9 commits into from
May 3, 2023

Conversation

anuchandy
Member

@anuchandy anuchandy commented Apr 21, 2023

When sending a batch, the EH and SB Sender allocates a byte array upfront with a length equal to the entity's MaxMessageSize. For a Queue entity, the MaxMessageSize can range from 1 MB to 100 MB. This is an allocation overhead; e.g., the library allocates 50 MB (if the entity's MaxMessageSize is 50 MB) to send a batch of size 2 KB. The allocation behavior in the T1 and T2 libraries is the same.

This PR addresses this overhead by taking a different approach:

  1. The ProtonJ CompositeReadableBuffer can hold a collection of byte[].
  2. Messaging libraries can add the encoded byte representation of each message in the batch to a CompositeReadableBuffer.
  3. An overload of the ProtonJ send API exists that takes a ReadableBuffer.
  4. Given that each message in the batch has to be in the AMQP 1.0 spec Data Binary format, we can precompute the encoded size, saving us from making an additional byte[] copy per message.
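The core idea of steps 1–3 can be sketched in plain Java. This is an illustrative stand-in for (not the actual API of) ProtonJ's `CompositeReadableBuffer`: each message's encoded bytes are kept as a separate chunk, so no MaxMessageSize-sized array is ever allocated.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: holds encoded messages as separate chunks,
// the way CompositeReadableBuffer does, instead of copying them into
// one byte[] sized at the entity's MaxMessageSize.
public final class CompositeBufferSketch {
    private final List<byte[]> chunks = new ArrayList<>();
    private int totalBytes;

    // Append a chunk without copying it; only the reference is stored.
    public CompositeBufferSketch append(byte[] encodedMessage) {
        chunks.add(encodedMessage);
        totalBytes += encodedMessage.length;
        return this;
    }

    // Total readable bytes across all chunks.
    public int remaining() {
        return totalBytes;
    }

    public static void main(String[] args) {
        CompositeBufferSketch buffer = new CompositeBufferSketch();
        buffer.append(new byte[] {0x00, 0x53, 0x75}) // descriptor bytes
              .append(new byte[1024]);               // message payload
        System.out.println(buffer.remaining()); // prints 1027
    }
}
```

The allocated memory is then proportional to the actual batch size rather than to the entity's configured maximum message size.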

allocation-pattern

Shown below is the memory allocation pattern before and after the new approach. The Queue used has a maximum message size of 25 MB.

OldVsNew

The code used for the measurement is below:

import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public final class MeasureBatchSendAllocation {
    public static void main(String[] args) throws InterruptedException {

        final String conStr = System.getenv("CON_STR");
        final String qname = System.getenv("Q_NAME"); // Max-Message-Size:25MB

        ServiceBusSenderClient sender = new ServiceBusClientBuilder()
            .connectionString(conStr)
            .sender()
            .queueName(qname)
            .buildClient();

        final int runs = 1000;
        for (int run = 0; run < runs; run++) {
            final int iterations = 1000;
            final List<ServiceBusMessage> testMessages = testMessages();
            final CountDownLatch latch = new CountDownLatch(iterations);
            final AtomicInteger errorCount = new AtomicInteger();

            System.out.println("Sending iterations: Start.. (" + run + ")");
            final Scheduler scheduler = Schedulers.newBoundedElastic(20, 100000, "send-worker");
            for (int i = 0; i < iterations; i++) {
                scheduler.schedule(() -> {
                    try {
                        send(sender, testMessages);
                    } catch (RuntimeException __) {
                        errorCount.incrementAndGet();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            System.out.println("Sending iterations: Scheduled.. Now waiting (" + run + ")");
            latch.await();
            System.out.println("Sending iterations: End...(" + run + ") Error-Count:" + errorCount.get());
        }
        sender.close();
    }

    private static void send(ServiceBusSenderClient sender, List<ServiceBusMessage> testMessages) {
        ServiceBusMessageBatch currentBatch = sender.createMessageBatch();
        for (ServiceBusMessage message : testMessages) {
            if (currentBatch.tryAddMessage(message)) {
                continue;
            }
            sender.sendMessages(currentBatch);
            currentBatch = sender.createMessageBatch();
            if (!currentBatch.tryAddMessage(message)) {
                System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s. Message: %s%n",
                    currentBatch.getMaxSizeInBytes(), message.getBody().toString());
            }
        }
        sender.sendMessages(currentBatch);
    }

    private static List<ServiceBusMessage> testMessages() {
        final String longMessage = "Depend on Service Bus when you need highly reliable cloud messaging service between applications"
            + " and services even when they’re offline. Available in every Azure region, this fully managed service eliminates"
            + " the burdens of server management and licensing. Get more flexibility when brokering messaging between client and"
            + " server with asynchronous operations along with structured first-in, first-out (FIFO) messaging and publish/subscribe"
            + " capabilities. Fully enterprise messaging service jms support.";
        List<ServiceBusMessage> testMessages = Arrays.asList(
            new ServiceBusMessage(BinaryData.fromString(longMessage)),
            new ServiceBusMessage(BinaryData.fromString("Red")),
            new ServiceBusMessage(BinaryData.fromString("Blue")),
            new ServiceBusMessage(BinaryData.fromString("Orange")));

        testMessages.get(0).setTimeToLive(Duration.ofSeconds(1));
        return testMessages;
    }
}

Notes on precomputing the byte[] size for the encode API


When encoding for the AMQP 1.0 spec "amqp:data:binary" format -

  1. the Qpid type org.apache.qpid.proton.codec.messaging.FastPathDataType writes the descriptor codes.
  2. the Qpid type org.apache.qpid.proton.codec.BinaryType writes the binary.

The 'FastPathDataType' writes the descriptor codes as below:

       .put(EncodingCodes.DESCRIBED_TYPE_INDICATOR); <- 0x00 (1 byte)
       .put(EncodingCodes.SMALLULONG);               <- 0x53 (1 byte)
       .put(DESCRIPTOR_CODE);                        <- 0x75 (1 byte)

After the descriptor codes, the 'BinaryType' writes the binary (byte[]) as below:

 a. For binary of size <= 255:
       .put(EncodingCodes.VBIN8);                    <- 0xa0 (1 byte)
       .put((byte) binary.getLength());              <- binary length (1 byte)
       .put(binary.getArray(), ...);

 b. For binary (byte[]) of size > 255:
       .put(EncodingCodes.VBIN32);                   <- 0xb0 (1 byte)
       .put(binary.getLength());                     <- binary length (integer, 4 bytes)
       .put(binary.getArray(), ...);

Knowing the spec, we can precompute the size of the byte array to pass to the encode API.
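As a sketch of that precomputation (class and method names here are illustrative, not library code): 3 descriptor bytes, then either a 2-byte VBIN8 header or a 5-byte VBIN32 header, then the payload. Encoding a payload with exactly that layout confirms the formula.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: precompute the encoded size of one AMQP
// "amqp:data:binary" section from the payload length alone.
public final class DataSectionSize {
    // 3 descriptor bytes (0x00 0x53 0x75), then either
    // VBIN8 (1 code byte + 1 length byte) or VBIN32 (1 code byte + 4 length bytes).
    static int encodedSize(int payloadLength) {
        int binaryHeader = payloadLength <= 255 ? 2 : 5;
        return 3 + binaryHeader + payloadLength;
    }

    // Encode a payload exactly per the layout above, to check the formula.
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(encodedSize(payload.length));
        buf.put((byte) 0x00);  // DESCRIBED_TYPE_INDICATOR
        buf.put((byte) 0x53);  // SMALLULONG
        buf.put((byte) 0x75);  // data-section descriptor code
        if (payload.length <= 255) {
            buf.put((byte) 0xa0);           // VBIN8
            buf.put((byte) payload.length); // 1-byte length
        } else {
            buf.put((byte) 0xb0);           // VBIN32
            buf.putInt(payload.length);     // 4-byte length
        }
        buf.put(payload);
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(encodedSize(100));  // prints 105 (3 + 2 + 100)
        System.out.println(encodedSize(1024)); // prints 1032 (3 + 5 + 1024)
    }
}
```

With the size known upfront, the destination buffer can be allocated exactly once per message, avoiding the extra byte[] copy per message mentioned above.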

Reported issue

@github-actions github-actions bot added the Azure.Core.AMQP azure-core-amqp label Apr 21, 2023
@anuchandy anuchandy changed the title [Messaging] Using RedableBuffer to send [Messaging] Reduce allocation for Send by using RedableBuffer Apr 21, 2023
@azure-sdk
Collaborator

API change check

API changes are not detected in this pull request.

@anuchandy anuchandy marked this pull request as ready for review April 28, 2023 20:31
@anuchandy anuchandy requested a review from conniey as a code owner April 28, 2023 20:31
Member

@conniey conniey left a comment


Thanks!

  • CHANGELOG entry

  • Does this work on our integration tests for both languages?

  • Since this affects both SB and EH... Do we happen to have some data on how much this saves when moving one from the other? I think it would be great to have so we feel confident that this improves things. :)

@anuchandy
Member Author

/azp run java - servicebus - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@anuchandy
Member Author

/azp run java - eventhubs - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

Member Author

@anuchandy anuchandy left a comment


Measuring batch send allocation

I ran a couple of memory measurements, and the new changes reduced the memory use and total GC count, which gives good confidence in the new approach. The larger the entity's configured maximum message size, the more visible the savings.

For the memory measurements, we

  • Set -Xmx3072m; 3 to 5 GB is a typical container allocation.
  • Sent a batch with a total size of 1 KB, containing four messages; the app used 20 threads to invoke the send API concurrently after warmup.
  • Used JFR to capture 10 minutes of profiling, which is good enough to see the allocation pattern.
  • Measured for max message sizes of 25 MB and 1 MB.

See the details below -

Max-message-size:25mb

With the new approach, the memory consumption for batch send stayed, on average, below ~650 MB. With the old, it's around 1.3 GB. The GC count is also lower with the new approach.

| Metrics | New | Old |
| --- | --- | --- |
| Average memory | ~650 MB | ~1.3 GB |
| Total memory | ~150 GB | ~256 GB |
| GC count | 7317 | 9859 |

New approach

New_Apporach_MaxMessageSize_25mb

New_GC_Apporach_MaxMessageSize_25mb

Old Approach

Old_Apporach_MaxMessageSize_25mb

Old_GC_Apporach_MaxMessageSize_25mb

Max-message-size:1mb

1MB is the minimum configurable size for the Service Bus entity. For Event Hubs, 1 MB is the default, and Event Hubs does not allow users to override this default max message size.

With the new approach, the memory consumption for batch send is ~1.7 GB at its peak. With the old, it's around 2.1 GB. The GC count is also lower with the new approach.

| Metrics | New | Old |
| --- | --- | --- |
| Peak memory | ~1.7 GB | ~2.1 GB |
| Total memory | ~8 GB | ~16 GB |
| GC count | 981 | 1774 |

New approach

New_Apporach_MaxMessageSize_1mb

New_GC_Apporach_MaxMessageSize_1mb

Old Approach

Old_Apporach_MaxMessageSize_1mb

Old_GC_Apporach_MaxMessageSize_1mb


Overall, the new approach is showing good results for batch send.

The code for memory measurement.

Measuring non-batch send allocation

The non-batch send (i.e., single byte[]) code path stays the same as the old one, so it is not impacted. See this discussion.

@anuchandy
Member Author

/azp run java - servicebus - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@anuchandy
Member Author

/azp run java - eventhubs - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).
