diff --git a/src/async_impl/h3_client/mod.rs b/src/async_impl/h3_client/mod.rs index 59e3a7667..3f57fbe98 100644 --- a/src/async_impl/h3_client/mod.rs +++ b/src/async_impl/h3_client/mod.rs @@ -40,8 +40,12 @@ impl H3Client { trace!("did not find connection {:?} in pool so connecting...", key); let dest = pool::domain_as_uri(key.clone()); + + self.pool.connecting(key.clone())?; let (driver, tx) = self.connector.connect(dest).await?; - Ok(self.pool.new_connection(key, driver, tx)) + let client = self.pool.new_connection(key.clone(), driver, tx); + + Ok(client) } async fn send_request( diff --git a/src/async_impl/h3_client/pool.rs b/src/async_impl/h3_client/pool.rs index c31102aab..6fcb8e719 100644 --- a/src/async_impl/h3_client/pool.rs +++ b/src/async_impl/h3_client/pool.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{Receiver, TryRecvError}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -27,12 +27,21 @@ impl Pool { pub fn new(timeout: Option) -> Self { Self { inner: Arc::new(Mutex::new(PoolInner { + connecting: HashSet::new(), idle_conns: HashMap::new(), timeout, })), } } + pub fn connecting(&self, key: Key) -> Result<(), BoxError> { + let mut inner = self.inner.lock().unwrap(); + if !inner.connecting.insert(key.clone()) { + return Err(format!("HTTP/3 connecting already in progress for {:?}", key).into()); + } + return Ok(()); + } + pub fn try_pool(&self, key: &Key) -> Option { let mut inner = self.inner.lock().unwrap(); let timeout = inner.timeout; @@ -77,13 +86,18 @@ impl Pool { let client = PoolClient::new(tx); let conn = PoolConnection::new(client.clone(), close_rx); - inner.insert(key, conn); + inner.insert(key.clone(), conn); + + // We clean up "connecting" here so we don't have to acquire the lock again. + let existed = inner.connecting.remove(&key); + debug_assert!(existed, "key not in connecting set"); client } } struct PoolInner { + connecting: HashSet, idle_conns: HashMap, timeout: Option, }