Add PgListener::next_buffered(), to support batch processing of notifications #3560

Merged
merged 5 commits into from
Nov 27, 2024

chanks
Contributor

@chanks chanks commented Oct 13, 2024

Hello! I wanted to support batch processing of notifications when there are a lot of them, so it made sense to grab whatever notifications are readily available. try_recv() is unsuitable for that: if the buffer is empty, I don't want to wait for a new NOTIFY before progressing with the batch. So it seemed to make sense to add a new non-async method that is essentially the first buffer check of try_recv(), and then call that in a loop to build up a batch until it returns None (or a batch limit is reached).
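
The batching loop described above could look roughly like the following sketch. `MockListener` and `Notification` are hypothetical stand-ins for `PgListener` and `PgNotification`, used so the example runs without a database; it assumes the new method is non-async and returns an `Option`, as described in this PR.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `PgNotification`.
#[derive(Debug, Clone, PartialEq)]
struct Notification {
    channel: String,
    payload: String,
}

/// Hypothetical stand-in for `PgListener`; it only models the in-memory buffer.
struct MockListener {
    buffer: VecDeque<Notification>,
}

impl MockListener {
    /// Assumed shape of the new method: pop one already-buffered
    /// notification without reading from the socket or awaiting.
    fn next_buffered(&mut self) -> Option<Notification> {
        self.buffer.pop_front()
    }
}

/// Extend `batch` with buffered notifications until the buffer is empty
/// or `batch` holds `limit` items. The caller owns `batch`, so the same
/// Vec can be cleared and reused between batches.
fn fill_batch(listener: &mut MockListener, batch: &mut Vec<Notification>, limit: usize) {
    while batch.len() < limit {
        match listener.next_buffered() {
            Some(n) => batch.push(n),
            None => break,
        }
    }
}
```

In real code the first element of a batch would presumably come from awaiting try_recv(), with a loop like this picking up whatever else is already buffered.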

I updated try_recv() to call the new function, just to make things as DRY as possible, but it isn't a huge improvement. Happy to revert that part if you prefer.

Also happy to bikeshed the new method's name/interface. A couple options I considered were:

// Similar to `read_to_end()` and `read_to_string()` in the Read trait 
pub fn fill_notification_buffer(&mut self, buffer: &mut Vec<PgNotification>) -> usize {
  // Add all notifications in the buffer to the vec and return the number of notifications we added.
  // UnboundedReceiver doesn't support receiving multiple messages at the same time, though,
  // so there doesn't seem to be any performance benefit the way there would be with the Read trait.
}
// Just implement the loop behavior for the caller. This isn't anything they couldn't implement
// themselves on top of try_recv_buffered(), though, and it doesn't let them reuse a buffer vec.
pub async fn try_recv_multiple(&mut self, limit: usize) -> Result<Vec<PgNotification>, Error> {
  // Wait for the first notification; this is the only await point.
  let mut vec =
    if let Some(first) = self.try_recv().await? {
      vec![first]
    } else {
      return Ok(vec![]);
    };
  // Then take whatever is already buffered, up to `limit`, without waiting.
  loop {
    if vec.len() >= limit {
      return Ok(vec);
    }
    if let Some(n) = self.try_recv_buffered() {
      vec.push(n);
    } else {
      return Ok(vec);
    }
  }
}

Anyway, I ran the tests with cargo test -p sqlx-postgres --all-features. Please let me know if you think there's anything else worth testing or running.

Thanks!

@chanks
Contributor Author

chanks commented Oct 13, 2024

Changed the function name in response to feedback, and got the tests passing. Frustratingly, try_recv() doesn't seem to be enough to fill the buffer with more than one notification, but a simple SQL statement like 'SELECT 1' is. Not sure why that is.

@abonander
Collaborator

try_recv() only drains the buffer, it does not add to it. If it reads a notification, it returns it directly.

The buffer exists because the server may send notifications at almost any time, including while executing a query.
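
The behavior described here can be illustrated with a toy model. Everything in it (`MockConn`, `Message`, the `execute` method) is invented for this sketch and is not SQLx's actual implementation; in particular, the real try_recv() awaits the socket instead of returning None when nothing is available.

```rust
use std::collections::VecDeque;

/// Messages a (mock) server can send on the connection.
#[derive(Debug, PartialEq)]
enum Message {
    Notification(String),
    QueryDone,
}

/// Hypothetical, simplified model of a listener connection.
struct MockConn {
    /// Messages "on the wire" that have not been read yet.
    socket: VecDeque<Message>,
    /// Notifications set aside while the connection was busy with a query.
    buffer: VecDeque<String>,
}

impl MockConn {
    /// Run a query: read until the query completes, buffering any
    /// notifications that arrive in the meantime.
    fn execute(&mut self) {
        while let Some(msg) = self.socket.pop_front() {
            match msg {
                Message::Notification(p) => self.buffer.push_back(p),
                Message::QueryDone => break,
            }
        }
    }

    /// Modeled `try_recv()`: drain the buffer first; otherwise read one
    /// notification from the socket and return it directly, never buffering.
    fn try_recv(&mut self) -> Option<String> {
        if let Some(p) = self.buffer.pop_front() {
            return Some(p);
        }
        while let Some(msg) = self.socket.pop_front() {
            if let Message::Notification(p) = msg {
                return Some(p);
            }
        }
        None
    }
}
```

In this model, calling try_recv() with two notifications on the socket returns the first and leaves the buffer empty, whereas running a query first moves both into the buffer. That matches the observation that a statement like 'SELECT 1' fills the buffer while try_recv() does not.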

I don't have much issue with changing try_recv() to read all available messages into the buffer before popping the first one, but strictly speaking the exact behavior of the method has never been guaranteed. And in fact, the documentation only says:

Receives the next notification available from any of the subscribed channels.

Now that I think about it, I'm actually wary of changing this behavior because, if the server is constantly emitting notifications, the method could potentially never return and just keep filling the buffer forever. We'd have to stop reading messages at some arbitrary or configurable limit.
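
As a rough illustration of such a limit, the sketch below moves messages into a buffer but stops after a fixed count even if the source never runs dry. `read_available` and its parameters are hypothetical names for this example, and the iterator stands in for the socket.

```rust
use std::collections::VecDeque;

/// Move messages from `incoming` into `buffer`, stopping after `max`
/// messages even if more are available. Returns how many were moved.
fn read_available<I>(incoming: &mut I, buffer: &mut VecDeque<String>, max: usize) -> usize
where
    I: Iterator<Item = String>,
{
    let mut moved = 0;
    while moved < max {
        match incoming.next() {
            Some(msg) => {
                buffer.push_back(msg);
                moved += 1;
            }
            // The source has nothing more right now.
            None => break,
        }
    }
    moved
}
```

With an endless source such as `(0u64..).map(|i| format!("n{i}"))`, the call still returns after `max` messages, which is the property the cap is meant to guarantee.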

I might end up re-thinking the PgListener API entirely, because there are a number of parts I'm not super happy with.

@abonander
Collaborator

Now that I think about this more, if you actually want it to try reading from the socket and just not wait, you would just use .try_recv().now_or_never(): https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.now_or_never

I would hesitate to recommend it right now only because if the connection is broken, it'll try to reconnect but because it doesn't wait, it'll just open a socket and then immediately close it. We'd need to spawn the Pool::acquire() call as a task and store the JoinHandle.
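
To make the trade-off concrete, here is a std-only sketch of what polling a future exactly once amounts to, which is roughly what now_or_never() does. `poll_once` and `NoopWake` are names invented for this example; they are not part of `futures` or SQLx.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing: nobody will poll the future a second time.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll `fut` a single time. Returns `Some(output)` if it completed
/// immediately, otherwise `None`. The future is dropped either way, so
/// any work it had in progress is abandoned.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Because a pending future is dropped after that single poll, anything it had started, such as a reconnect, is cancelled partway through. That is the broken-connection concern raised above.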

@chanks
Contributor Author

chanks commented Oct 17, 2024

Thanks @abonander. I didn't know about now_or_never(), but it seems like that approach relies a bit more on implementation details than I'd like (relying on the method remaining cancel-safe and no other await being introduced, in addition to the broken-connection issue you mention).

Having try_recv() read up to a hard-coded number of notifications (say, 100?) seems like a pretty straightforward and safe change to me, and it could be made configurable whenever the PgListener interface is rethought. I'm happy to make that change and update the PR if you're open to it.

@abonander changed the title from "Add PgListener::try_recv_buffered(), to support batch processing of notifications" to "Add PgListener::next_buffered(), to support batch processing of notifications" on Nov 27, 2024
@abonander
Collaborator

Before long, I want to make a significant refactor to the driver code that runs the connection state machine in its own task. Then, it will always eagerly read notifications and send them over a channel to be processed.
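
The shape of that design can be sketched with the standard library alone. Here a thread stands in for the connection task and `std::sync::mpsc` for the channel; `spawn_reader` and `drain_ready` are hypothetical names, and nothing here reflects how the planned refactor is actually implemented.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Spawn a thread standing in for the connection task: it "reads" each
/// payload and forwards it over an unbounded channel right away.
fn spawn_reader(payloads: Vec<String>) -> (JoinHandle<()>, Receiver<String>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for p in payloads {
            // Stop if the consumer has gone away.
            if tx.send(p).is_err() {
                break;
            }
        }
    });
    (handle, rx)
}

/// Take up to `limit` notifications that are already queued, without blocking.
fn drain_ready(rx: &Receiver<String>, limit: usize) -> Vec<String> {
    rx.try_iter().take(limit).collect()
}
```

With the reader always running, batch processing reduces to a non-blocking drain of the channel, which is the same job next_buffered() does against the listener's internal buffer today.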

@abonander merged commit 4f10962 into launchbadge:main on Nov 27, 2024