Skip to content

Commit

Permalink
fix: dont poll stream again if done (#2245)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Mar 9, 2023
1 parent ce26dfc commit 49dccf9
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions ethers-providers/src/stream/tx_stream.rs
Expand Up @@ -56,6 +56,8 @@ pub struct TransactionStream<'a, P, St> {
pub(crate) provider: &'a Provider<P>,
/// A stream of transaction hashes.
pub(crate) stream: St,
/// Marks if the stream is done
stream_done: bool,
/// max allowed futures to execute at once.
pub(crate) max_concurrent: usize,
}
Expand All @@ -68,6 +70,7 @@ impl<'a, P: JsonRpcClient, St> TransactionStream<'a, P, St> {
buffered: Default::default(),
provider,
stream,
stream_done: false,
max_concurrent,
}
}
Expand Down Expand Up @@ -102,21 +105,22 @@ where
}
}

let mut stream_done = false;
loop {
match Stream::poll_next(Pin::new(&mut this.stream), cx) {
Poll::Ready(Some(tx)) => {
if this.pending.len() < this.max_concurrent {
this.push_tx(tx);
} else {
this.buffered.push_back(tx);
if !this.stream_done {
loop {
match Stream::poll_next(Pin::new(&mut this.stream), cx) {
Poll::Ready(Some(tx)) => {
if this.pending.len() < this.max_concurrent {
this.push_tx(tx);
} else {
this.buffered.push_back(tx);
}
}
Poll::Ready(None) => {
this.stream_done = true;
break
}
_ => break,
}
Poll::Ready(None) => {
stream_done = true;
break
}
_ => break,
}
}

Expand All @@ -125,7 +129,7 @@ where
return tx
}

if stream_done && this.pending.is_empty() {
if this.stream_done && this.pending.is_empty() {
// all done
return Poll::Ready(None)
}
Expand Down

0 comments on commit 49dccf9

Please sign in to comment.