pubsub: dynamically choose the number of messages for ReceiveBatch #1200
Conversation
To decide how many messages to pull at a time, we aim for the in-memory queue of messages to be a certain size. That gives us a buffer of messages to draw from, ensuring high throughput, without pulling so many messages that the unconsumed ones languish.

We measure the size by time instead of message count. Time is more relevant, because ack deadlines are expressed in time, and it's easier to think about lost work (in the event of a crash) in terms of time lost rather than messages lost.

We keep track of the average time it takes to process a message. Then we can convert a queue size in time to a number of messages.

We compute processing time by measuring the time between when Receive returns and when it is next called. Although this is incorrect in the short term, because multiple goroutines may call Receive at the same time, in the long run it is accurate enough.

We rejected the obvious alternative, measuring time from Receive to Ack, because not every message will be acked. It is perfectly reasonable for a subscriber to nack (or fail to ack) a significant fraction of the messages it receives, but processing time for those unacked messages should still be included in the calculation of how many messages to pull.

This change significantly improves the Receive benchmark -- messages per second is more than quadrupled. But there is more work to do. We should pre-emptively pull messages when the queue size gets low, and we should issue multiple ReceiveBatch calls concurrently.

Besides performance, this change also improves behavior over current master at very low processing rates. Currently we pull a constant 10 messages per ReceiveBatch. If it takes a long time to process one message, then the other 9 will sit in RAM and may expire. With this change, we will pull just one message at a time if need be.

Addresses google#691.
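For concreteness, the conversion the description talks about -- from a desired queue duration to a message count, with a cap -- might look something like the sketch below. All names here (batchSize, desiredQueueDuration, maxBatchSize, avgProcessTime) are illustrative, not the PR's actual identifiers; the 1000 cap echoes the value settled on later in this thread.

```go
// A sketch of converting the desired queue duration into a batch size.
package pubsub

import "time"

const (
	// desiredQueueDuration is how much processing "runway" we want queued.
	desiredQueueDuration = 2 * time.Second
	// maxBatchSize caps a single ReceiveBatch call.
	maxBatchSize = 1000
)

// batchSize returns how many messages to pull, given the observed
// average time to process one message.
func batchSize(avgProcessTime time.Duration) int {
	if avgProcessTime <= 0 {
		return 1 // no history yet: start small
	}
	n := int(desiredQueueDuration / avgProcessTime)
	if n < 1 {
		n = 1 // a slow consumer pulls one message at a time
	}
	if n > maxBatchSize {
		n = maxBatchSize
	}
	return n
}
```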
pubsub/averager.go
Outdated
"time" | ||
) | ||
|
||
// An averager keeps track of an average value over a time interval. |
Why not ewma?
Well, I didn't know about it. (How did you find it?) It looks much more sophisticated than what I have, which is nice. But it says
Current implementations assume an implicit time interval of 1.0 between every sample added. That is, the passage of time is treated as though it's the same as the arrival of samples. If you need time-based decay when samples are not arriving precisely at set intervals, then this package will not support your needs at present.
which means we can't use it.
But you raise the question whether time-based decay is what we want. I thought about implementing it, but decided the complexity wasn't worth it. I'm not sure it's going to matter much whether the contribution of 1-minute-old points is reduced or not. It's more important that their contribution eventually goes to zero, which my implementation achieves rather abruptly.
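For reference, the bucketed scheme being described -- samples grouped into fixed-width time buckets, with whole buckets dropped once they age out of the interval, so old contributions go to zero abruptly -- could be sketched like this. It's a reconstruction from this conversation, not the PR's actual averager.go:

```go
// A bucketed time-windowed average: samples land in the newest bucket,
// and buckets older than the interval are discarded wholesale.
package averager

import "time"

type bucket struct {
	start time.Time
	sum   float64
	count int
}

type averager struct {
	bucketWidth time.Duration
	nBuckets    int
	buckets     []bucket
}

func newAverager(interval time.Duration, nBuckets int) *averager {
	return &averager{bucketWidth: interval / time.Duration(nBuckets), nBuckets: nBuckets}
}

func (a *averager) add(x float64, now time.Time) {
	a.expire(now)
	n := len(a.buckets)
	if n == 0 || now.Sub(a.buckets[n-1].start) >= a.bucketWidth {
		a.buckets = append(a.buckets, bucket{start: now})
		n++
	}
	a.buckets[n-1].sum += x
	a.buckets[n-1].count++
}

func (a *averager) average(now time.Time) float64 {
	a.expire(now)
	var sum float64
	var count int
	for _, b := range a.buckets {
		sum += b.sum
		count += b.count
	}
	if count == 0 {
		return 0
	}
	return sum / float64(count)
}

// expire drops buckets that have aged out of the interval; their samples
// stop contributing all at once.
func (a *averager) expire(now time.Time) {
	cutoff := now.Add(-a.bucketWidth * time.Duration(a.nBuckets))
	for len(a.buckets) > 0 && a.buckets[0].start.Before(cutoff) {
		a.buckets = a.buckets[1:]
	}
}
```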
I found it like this:
https://www.google.com/search?q=golang+moving+average
which means we can't use it.
Is that true? I am convinced by your argument about using the predicted time-to-process of the queue to determine the batch size, but I'm not sure that that implies that we need to use time-based bucketing for the estimate of how long each message takes to process.
I'm not sure that that library will do better than what you have here, but it's worth thinking about. Are there scenarios where exponential decay will do the wrong thing? (probably).
What about an app that gets bursts of messages periodically (e.g., every 15m it gets 1000 messages)? IIUC, your impl will start at a constant (currently 1) for each burst and ramp up (because after 1m it forgets all about previous history). The decaying moving average wouldn't forget anything during the idle 15m. Which is better? (not obvious, TBH).
I see your point. It seemed obvious to me that we wanted the number to decay in proportion to elapsed time, but you're right, there's no strong reason for that (aside from a hand-wavy argument about things generally having some time locality).
ewma will forget in proportion to the number of messages processed, rather than time. Maybe that's fine. If it took you 100ms to process a message three days ago, why wouldn't it take the same time now? On the other hand, system behavior tends to be spiky. You want to ride the spikes when they happen, then quickly forget about them. But I'm really just speculating here.
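To make the per-message decay concrete, an ewma-style update is essentially the following (the smoothing factor here is illustrative; the ewma package chooses its own constants):

```go
// Each processed message moves the estimate a fixed fraction toward the
// new sample, so history fades per message processed, not per unit of
// wall-clock time.
const alpha = 0.1 // illustrative smoothing factor

func update(avg, sample float64) float64 {
	return (1-alpha)*avg + alpha*sample
}
```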
I'm fine doing any of the following:
- Keeping my code.
- Switching to ewma.
- Doing a better job of time-based moving average. I think doing it exactly requires saving every point, but we could combine bucketing with a decay factor.
In any case, we should add this discussion to the issue, or a new issue.
Let's discuss at stand-up tomorrow.
	s.waitc = make(chan struct{})
	s.mu.Unlock()
	// Even though the mutex is unlocked, only one goroutine can be here.
	// The only way here is if s.waitc was nil. This goroutine just set
	// s.waitc to non-nil while holding the lock.
-	msgs, err := s.getNextBatch(ctx)
+	msgs, err := s.getNextBatch(ctx, nMessages)
Test is failing:
drivertest.go:271: pubsub (code=Unknown): replayer: request not found: subscription:"projects/go-cloud-test-216917/subscriptions/TestConformance_TestSendReceiveTwo-subscription-1" max_messages:936
That looks likely to be flaky....
It's actually 9364. The Mac tests run a bit slower. I changed the cap to 1000 for now.
9364? Why? (Why isn't this flaky?) Seems like it's dependent on timing, no?
I got the number 9364 from the actual travis log. I think you dropped a digit when you copied it. The relevant point is that it's > 1000.
It is time-dependent. But with a cap of 1000, it would have to take longer than 1ms between calls to Receive on average, and considering that we do nothing but call Ack, that's extremely unlikely.
I agree, though, that this solution isn't great.
pubsub/averager.go
Outdated
@@ -49,10 +49,10 @@ func newAverager(dur time.Duration, nBuckets int) *averager {
func (a *averager) average() float64 {
Nit: consider making this Average since it's an "external" function? (even though the whole struct isn't). Not sure what typical Go style for this is, but I think it can be useful to be clear about what the expected API of internal objects is even if it's not enforced.
This code could also be moved into the internal/batcher package.
Removed the whole thing.
pubsub/pubsub.go
Outdated
// messages will wait in memory for a long time, possibly timing out (that is,
// their ack deadline will be exceeded). Those messages could have been handled
// by another process receiving from the same subscription.
desiredQueueLength = 2 * time.Second
Queue lengths are not defined in units of time. They are counts, so this should have a different name. Would desiredTimeInQueuePerMessage be more accurate?
Are you okay with desiredQueueDuration?
Sure, as long as the comment explains what it means. I think it's how long we want a message to spend in the queue, although I find this idea a bit strange since I don't have a preference about how long messages stay in the queue so long as it's a lot less than the ack deadline.
although I find this idea a bit strange
Exactly, that's why I don't think that's the right way to think about it. It's more like, we want to keep a few messages around in case Receive speeds up and starts chewing through them faster than it has been. In other words, a buffer. How many messages do we want to keep in the buffer? No, that's the wrong question: how much runway (in time) do we want before we run out of messages?
Yes, if Receive always takes the same time, then a message will spend all that time in the queue. But that's a side effect, the price we pay for having messages available on demand to improve throughput.
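As an illustrative calculation (these figures are made up, not from the PR): at 10ms per message, a 2s runway works out to a target of about 200 queued messages; if processing suddenly speeds up to 5ms per message, that same buffer still leaves roughly a second of runway before the queue drains.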
I see, so to write it in Javanese, it would be more like desiredAmountOfTimeBeforeQueueProbablyRunsOut. Makes sense.
There's a lot going on here (not just on this PR), and we need a path forward.
We don't have to solve all of these at once. Let's start with simple changes that make big improvements and see how far that gets us. I propose:
Thoughts?
Totally agree. Let me fix the name of the constant as @ijt requested, and change the measurement to end at ack. Then I think this is a workable and useful PR.
PTAL.
PTAL.
Separately, it might be worth writing a fake subscription driver. Similarly for a fake Receiver that simulates how long it takes to process messages.
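Such a fake might look roughly like the sketch below. This is only a sketch: the method shapes are simplified stand-ins, not the real driver interface, and all names are made up.

```go
// fakeSub simulates a subscription driver: ReceiveBatch waits a fixed
// latency, then returns however many messages were asked for. Useful for
// benchmarking the batch-sizing logic without a real service.
package fake

import (
	"context"
	"fmt"
	"time"
)

type message struct {
	Body  []byte
	AckID int
}

type fakeSub struct {
	latency time.Duration // simulated round trip per ReceiveBatch
	next    int           // counter for generating message bodies
}

func (s *fakeSub) ReceiveBatch(ctx context.Context, maxMessages int) ([]*message, error) {
	select {
	case <-time.After(s.latency):
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	msgs := make([]*message, 0, maxMessages)
	for i := 0; i < maxMessages; i++ {
		s.next++
		msgs = append(msgs, &message{Body: []byte(fmt.Sprintf("msg-%d", s.next)), AckID: s.next})
	}
	return msgs, nil
}

func (s *fakeSub) SendAcks(ctx context.Context, ackIDs []int) error {
	return nil // acks are dropped; only receive latency is simulated
}
```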
LGTM