Skip to content

Commit

Permalink
Create StreamMock for testing Streams
Browse files Browse the repository at this point in the history
Inroduce a new mock type to tests streams and eventually
sinks. Only includes next() and wait() for now. Fixes #4106
  • Loading branch information
Joe Thomas committed Aug 6, 2023
1 parent e5e8855 commit 169ede8
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod stream_mock;

mod macros;
pub mod task;
Expand Down
163 changes: 163 additions & 0 deletions tokio-test/src/stream_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#![cfg(not(loom))]

//! A mock stream implementing [`Stream`].
//!
//! # Overview
//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
//! intervals between items.
//!
//! # Usage
//! To use the `StreamMock`, you need to create a builder using[`StreamMockBuilder`]. The builder
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
//!
//! # Example
//! ```rust
//!
//! use futures_util::StreamExt;
//! use std::time::Duration;
//! use tokio_test::stream_mock::StreamMockBuilder;
//!
//! async fn test_stream_mock_wait() {
//! let mut stream_mock = StreamMockBuilder::new()
//! .next(1)
//! .wait(Duration::from_millis(300))
//! .next(2)
//! .build();
//!
//! assert_eq!(stream_mock.next().await, Some(1));
//! let start = std::time::Instant::now();
//! assert_eq!(stream_mock.next().await, Some(2));
//! let elapsed = start.elapsed();
//! assert!(elapsed >= Duration::from_millis(300));
//! assert_eq!(stream_mock.next().await, None);
//! }
//! ```

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use futures_core::Future;
use futures_core::{ready, Stream};
use tokio::time::{sleep_until, Instant, Sleep};

#[derive(Debug, Clone)]
enum Action<T: Unpin> {
Next(T),
Wait(Duration),
}

/// A builder for [`StreamMock`]
#[derive(Debug, Clone)]
pub struct StreamMockBuilder<T: Unpin> {
actions: VecDeque<Action<T>>,
}

impl<T: Unpin> StreamMockBuilder<T> {
/// Create a new empty [`StreamMockBuilder`]
pub fn new() -> Self {
StreamMockBuilder::default()
}

/// Queue an item to be returned by the stream
pub fn next(mut self, value: T) -> Self {
self.actions.push_back(Action::Next(value));
self
}

// Queue an item to be consumed by the sink,
// commented out until Sink is implemented.
//
// pub fn consume(mut self, value: T) -> Self {
// self.actions.push_back(Action::Consume(value));
// self
// }

/// Queue the stream to wait for a duration
pub fn wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Wait(duration));
self
}

/// Build the [`StreamMock`]
pub fn build(self) -> StreamMock<T> {
StreamMock {
actions: self.actions,
sleep: None,
}
}
}

impl<T: Unpin> Default for StreamMockBuilder<T> {
fn default() -> Self {
StreamMockBuilder {
actions: VecDeque::new(),
}
}
}

/// A mock stream implementing [`Stream`]
///
/// See [`StreamMockBuilder`] for more information.
#[derive(Debug)]
pub struct StreamMock<T: Unpin> {
actions: VecDeque<Action<T>>,
sleep: Option<Pin<Box<Sleep>>>,
}

impl<T: Unpin> StreamMock<T> {
fn next_action(&mut self) -> Option<Action<T>> {
self.actions.pop_front()
}
}

impl<T: Unpin> Stream for StreamMock<T> {
type Item = T;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// Try polling the sleep future first
if let Some(ref mut sleep) = self.sleep {
ready!(Pin::new(sleep).poll(cx));
// Since we're ready, discard the sleep future
self.sleep.take();
}

match self.next_action() {
Some(action) => match action {
Action::Next(item) => Poll::Ready(Some(item)),
Action::Wait(duration) => {
// Set up a sleep future and schedule this future to be polled again for it.
self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
cx.waker().wake_by_ref();

Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

impl<T: Unpin> Drop for StreamMock<T> {
fn drop(&mut self) {
let undropped_count = self
.actions
.iter()
.filter(|action| match action {
Action::Next(_) => true,
Action::Wait(_) => false,
})
.count();

assert!(
undropped_count == 0,
"StreamMock was dropped before all actions were consumed, {} actions were not consumed",
undropped_count
);
}
}
43 changes: 43 additions & 0 deletions tokio-test/tests/stream_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use futures_util::StreamExt;
use std::time::Duration;
use tokio_test::stream_mock::StreamMockBuilder;

#[tokio::test]
async fn test_stream_mock_empty() {
let mut stream_mock = StreamMockBuilder::<u32>::new().build();

assert_eq!(stream_mock.next().await, None);
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
async fn test_stream_mock_items() {
let mut stream_mock = StreamMockBuilder::new().next(1).next(2).build();

assert_eq!(stream_mock.next().await, Some(1));
assert_eq!(stream_mock.next().await, Some(2));
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
async fn test_stream_mock_wait() {
let mut stream_mock = StreamMockBuilder::new()
.next(1)
.wait(Duration::from_millis(300))
.next(2)
.build();

assert_eq!(stream_mock.next().await, Some(1));
let start = std::time::Instant::now();
assert_eq!(stream_mock.next().await, Some(2));
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(300));
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
#[should_panic(expected = "StreamMock was dropped before all actions were consumed")]
async fn test_stream_mock_drop_without_consuming_all() {
let stream_mock = StreamMockBuilder::new().next(1).next(2).build();
drop(stream_mock);
}

0 comments on commit 169ede8

Please sign in to comment.