Skip to content

Commit

Permalink
fix(client): support batched notifications (#1327)
Browse files Browse the repository at this point in the history
  • Loading branch information
lavigneer authored and niklasad1 committed Apr 8, 2024
1 parent 8c73cc6 commit fe654e4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
20 changes: 20 additions & 0 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,26 @@ async fn notification_handler_works() {
}
}

#[tokio::test]
async fn batched_notification_handler_works() {
let server = WebSocketTestServer::with_hardcoded_notification(
"127.0.0.1:0".parse().unwrap(),
server_batched_notification("test", "batched server originated notification works".into()),
)
.with_default_timeout()
.await
.unwrap();

let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut nh: Subscription<String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();
let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!("batched server originated notification works".to_owned(), response);
}
}

#[tokio::test]
async fn notification_without_polling_doesnt_make_client_unuseable() {
init_logger();
Expand Down
36 changes: 20 additions & 16 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,32 +767,36 @@ fn handle_backend_messages<R: TransportReceiverT>(
let mut batch = Vec::with_capacity(raw_responses.len());

let mut range = None;
let mut got_notif = false;

for r in raw_responses {
let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) else {
if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
let id = response.id.try_parse_inner_as_number()?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });

let r = range.get_or_insert(id..id);

if id < r.start {
r.start = id;
}

if id > r.end {
r.end = id;
}
} else if let Ok(notif) = serde_json::from_str::<Notification<_>>(r.get()) {
got_notif = true;
process_notification(&mut manager.lock(), notif);
} else {
return Err(unparse_error(raw));
};

let id = response.id.try_parse_inner_as_number()?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });

let r = range.get_or_insert(id..id);

if id < r.start {
r.start = id;
}

if id > r.end {
r.end = id;
}
}

if let Some(mut range) = range {
// the range is exclusive so need to add one.
range.end += 1;
process_batch_response(&mut manager.lock(), batch, range)?;
} else {
} else if !got_notif {
return Err(EmptyBatchRequest.into());
}
} else {
Expand Down
5 changes: 5 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ pub fn server_notification(method: &str, params: Value) -> String {
format!(r#"{{"jsonrpc":"2.0","method":"{}", "params":{} }}"#, method, serde_json::to_string(&params).unwrap())
}

/// Batched server originated notification
pub fn server_batched_notification(method: &str, params: Value) -> String {
format!(r#"[{{"jsonrpc":"2.0","method":"{}", "params":{} }}]"#, method, serde_json::to_string(&params).unwrap())
}

pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String> {
let client = hyper::Client::new();
http_post(client, body, uri).await
Expand Down

0 comments on commit fe654e4

Please sign in to comment.