
refactor(async client): refactor background task #1145

Merged
merged 24 commits on Aug 3, 2023

Conversation

@niklasad1 (Member) commented Jun 21, 2023

Split send and receive to separate tasks to support multiplexing reads and writes

Summary of what was wrong before:

  1. A request from the frontend came into this task.
  2. We try to send it off to the backend, .awaiting that to complete.
  3. But! The backend is applying backpressure, oh no! So it won't accept any new messages until some messages are read from it to clear that queue.
  4. So this send keeps .awaiting, and because reading happened in the same task, no reads will happen until the send completes; essentially we are deadlocked. (A rough sketch of the new two-task layout follows below.)
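
For illustration only, here is a minimal sketch of the shape this PR moves towards: one task owns the writer half, another owns the reader half, and the frontend talks to the send task over a bounded channel. The names, and the mpsc channel standing in for the socket, are placeholders rather than code from this PR.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Stand-in for the WebSocket connection: a "writer" half and a "reader" half.
    let (socket_writer, mut socket_reader) = mpsc::channel::<String>(4);

    // Frontend -> send task (bounded, so the frontend feels backpressure).
    let (to_send_task, mut from_frontend) = mpsc::channel::<String>(4);

    // Send task: owns the writer half and never touches the reader,
    // so a slow write can't stop replies from being read.
    let send_task = tokio::spawn(async move {
        while let Some(msg) = from_frontend.recv().await {
            if socket_writer.send(msg).await.is_err() {
                break;
            }
        }
    });

    // Read task: owns the reader half and keeps draining replies
    // even while the send task is blocked on backpressure.
    let read_task = tokio::spawn(async move {
        while let Some(reply) = socket_reader.recv().await {
            println!("reply: {reply}");
        }
    });

    to_send_task.send("say_hello".to_owned()).await.unwrap();
    drop(to_send_task);
    let _ = send_task.await;
    let _ = read_task.await;
}
```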

Benches

group                                           client-multiplex                        master
-----                                           ----------------                        ------
async/ws_batch_requests/fast_call/10            1.00     42.2±4.86µs 231.5 KElem/sec    1.30     54.6±0.85µs 178.7 KElem/sec
async/ws_batch_requests/fast_call/100           1.00   317.8±20.38µs 307.3 KElem/sec    1.12   356.3±19.86µs 274.1 KElem/sec
async/ws_batch_requests/fast_call/2             1.56     52.9±3.86µs 36.9 KElem/sec     1.00     33.8±5.25µs 57.7 KElem/sec
async/ws_batch_requests/fast_call/5             1.00     37.3±5.52µs 130.8 KElem/sec    1.08     40.5±5.66µs 120.7 KElem/sec
async/ws_batch_requests/fast_call/50            1.11    192.4±4.70µs 253.8 KElem/sec    1.00   173.3±33.70µs 281.7 KElem/sec
async/ws_concurrent_conn_calls/fast_call/128    1.05      2.1±0.05ms        ? ?/sec     1.00  1966.4±11.49µs        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/16     1.00  1091.3±89.12µs        ? ?/sec     1.06  1160.0±134.30µs        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/2      1.00  1092.4±112.38µs        ? ?/sec    1.07  1174.0±86.38µs        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/256    1.06      3.9±0.03ms        ? ?/sec     1.00      3.7±0.04ms        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/32     1.00  1204.2±135.04µs        ? ?/sec    1.00  1209.4±143.03µs        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/4      1.02  1241.3±71.76µs        ? ?/sec     1.00  1212.0±104.79µs        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/64     1.05  1217.5±15.00µs        ? ?/sec     1.00  1159.5±10.37µs        ? ?/sec
async/ws_concurrent_conn_calls/fast_call/8      1.01  1186.8±123.04µs        ? ?/sec    1.00  1178.7±73.72µs        ? ?/sec

Split send and receive to separate tasks to support multiplexing reads/writes
@niklasad1 requested a review from a team as a code owner on June 21, 2023 16:31
@jsdw (Collaborator) left a comment

Code looks good to me overall, and the separate tasks look pretty clean!

If the ThreadSafeRequestManager didn't need to lock over await points, then maybe a standard mutex would be a better choice (a safe way to do this would be to make the mutex an integral part of RequestManager, so that each function only locks internally for as long as it needs and it's impossible to hold a lock over any await points).

I'd be interested to know whether this has any impact on benchmarks in either direction :)
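
For what it's worth, a minimal sketch of that idea, assuming a plain std::sync::Mutex owned by the type; this is illustrative only, not jsonrpsee's actual RequestManager:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Keep the Mutex inside the type so every method locks only for the duration
// of a synchronous call; callers never see the guard, so it can't be held
// across an .await point.
struct RequestManager {
    inner: Mutex<HashMap<u64, String>>,
}

impl RequestManager {
    fn new() -> Self {
        Self { inner: Mutex::new(HashMap::new()) }
    }

    // Lock, mutate, unlock — all within this call.
    fn insert(&self, id: u64, method: String) {
        self.inner.lock().unwrap().insert(id, method);
    }

    // Lock, remove, unlock — the guard is dropped before returning.
    fn complete(&self, id: u64) -> Option<String> {
        self.inner.lock().unwrap().remove(&id)
    }
}
```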

}
}
_ = close_tx.closed() => {
break Ok(())
Contributor

Could some tracing debug logs be useful on the break branches, to know the reason for the send_task exit?

@niklasad1 (Member, Author) commented Jun 27, 2023

I added "Client dropped" logs for when either the send or receive task returns Ok(()), ok?

These channels should be closed once the send/receive tasks are closed; if something terminates with an error, that must occur before this send fails.

However, if there's a bug somewhere, that could be tricky to detect by looking at the logs.
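
For illustration, roughly what logging the exit reason on those break branches could look like; the channel types, message strings, and Result type here are placeholders rather than this PR's actual code:

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical send task: log why it exits, either because the frontend
// channel closed or because the client handle was dropped.
async fn send_task(
    mut from_frontend: mpsc::Receiver<String>,
    mut client_dropped: oneshot::Receiver<()>,
) -> Result<(), String> {
    loop {
        tokio::select! {
            maybe_msg = from_frontend.recv() => {
                match maybe_msg {
                    Some(msg) => tracing::debug!("sending: {}", msg),
                    None => {
                        tracing::debug!("send task exiting: frontend channel closed");
                        break Ok(());
                    }
                }
            }
            _ = &mut client_dropped => {
                tracing::debug!("send task exiting: client dropped");
                break Ok(());
            }
        }
    }
}
```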

@niklasad1 force-pushed the na-client-backpressure-support branch from 0509338 to 05bb6c8 on July 7, 2023 13:17

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
	if self.futs.is_empty() {
		return Poll::Pending;
Collaborator

At this point, the waker is not handed anywhere and so if any other futures are added, they will be ignored.

I think that this is ok though because you just call next() on this in a loop, so it'll get polled at least once per loop anyway.

@niklasad1 (Member, Author)

I tried using a plain FuturesUnordered, but then the select loop gets busy because it returns Poll::Ready, i.e. None, when it's empty, stealing CPU cycles from more important stuff; that's the reason why I added this...
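
As a tiny standalone illustration of that behaviour (futures crate only; not code from this PR), an empty FuturesUnordered completes immediately with None instead of staying Pending:

```rust
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};

fn main() {
    let mut futs: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
    // An empty set yields None right away rather than parking the task,
    // so select!-ing on `futs.next()` in a loop would spin.
    assert!(block_on(futs.next()).is_none());
}
```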

@niklasad1 (Member, Author)

Yeah, if it's empty and futures are added, those will be picked up on the next iteration.

@jsdw (Collaborator) Aug 1, 2023

Ah I see, so when it's empty it'll spam Poll::Ready(None) whenever polled; yeah, that sucks. Your version looks good to me offhand in the current code, though it's a little scary that if used wrong it'll just .await forever, so a small footgun to be wary of for future us! Perhaps it's impossible to misuse given it needs &mut self, but I haven't convinced myself 100% :D

Contributor

I believe this should be fine for now, because we use the read_task to drive this:

read_task
	select (server_receiver, closed, pending_unsub)
		server_receiver -> if Ok(Some(m)) = handle_backend_messages
			pending_unsub.push( to_send_task.send(m) )

One case where this might be problematic is when:

  • server_receiver and closed futures are not ready
  • the pending_unsub is empty and thus will return Poll::Pending, and I believe it will never be woken up for the current loop iteration because there's nothing that will wake the future's cx.waker again
  • then pending_unsub has some elements added to it

But since pending_unsub only has elements added as a result of server_receiver being triggered first, it should be fine here as long as we don't save the current pending_unsub future that returned Pending and instead call next directly in the loop: future::select(rx_or_closed, pending_unsubscribes.next())

Let me know if my reasoning is right 🙏

Indeed there might be dragons if we change something in the read_task (maybe a comment saying not to save the pending_unsub.next() future is enough here).

We may be able to handle this by:

  • if futs.is_empty(), then we could save the cx
  • when adding a new future, if we have Some(cx) we could wake up the waker
  • preserve the same pending_unsub.next() between iterations similar to let rx_item = backend_event.next();

Not quite sure if it's worth going that way (a rough sketch of the idea is below).
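
For reference, a minimal sketch of that waker-saving idea as a thin wrapper over FuturesUnordered; this is an assumed illustration, not necessarily the actual MaybePendingFutures in this PR:

```rust
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

use futures::stream::{FuturesUnordered, Stream, StreamExt};

// A FuturesUnordered that stays Pending when empty and wakes itself
// as soon as a new future is pushed.
struct MaybePendingFutures<F> {
    futs: FuturesUnordered<F>,
    waker: Option<Waker>,
}

impl<F> MaybePendingFutures<F> {
    fn new() -> Self {
        Self { futs: FuturesUnordered::new(), waker: None }
    }

    fn push(&mut self, fut: F) {
        self.futs.push(fut);
        // If poll_next parked earlier because the set was empty,
        // wake the task so the newly added future gets polled.
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

impl<F: std::future::Future> Stream for MaybePendingFutures<F> {
    type Item = F::Output;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.futs.is_empty() {
            // Remember the waker so `push` can re-schedule us later.
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        self.futs.poll_next_unpin(cx)
    }
}
```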

@niklasad1 (Member, Author)

yeah, you are correct.

@niklasad1 (Member, Author)

I did what you suggested; it didn't require much code, but I had to clone the Waker because Context is !Send

Collaborator

Save the Waker and not the context; that's sendable :)

@jsdw (Collaborator) Aug 3, 2023

Ah, that's what you did; I misread the earlier comment and GitHub was showing me old stuff, whoops!

@jsdw (Collaborator) commented Aug 1, 2023

There's a lot to work through here, but offhand it looks good to me now; no more unbounded queues from what I could see, so backpressure looks like it'll work for both sending and reading, and the two queues shouldn't block progress from each other now I think, so nice one!

Couple of nits above, but I'm happy to see this merge, and if it does well enough on burn-ins then it's an improvement from where we were regardless :)

});

let closed = close_tx.closed();
let pending_unsubscribes = MaybePendingFutures::new();
Contributor

I do wonder, is this only used for unsubscribes? Wouldn't the receiver: R also provide pings and plain messages?

@niklasad1 (Member, Author) Aug 2, 2023

No, this is really an edge case where a subscription fails:

i) a subscription call is answered but the client dropped the subscription (we want to unsubscribe then)
ii) the subscription couldn't keep up with the server.

Thus, this is really just a matter of unsubscribing so that no further messages are received for that particular subscription...

@lexnv (Contributor) commented Aug 2, 2023

This looks good to me! I believe the backpressure would work fine, nice job here!
Happy to merge this
