Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create StreamMock for testing Streams #5915

Merged
merged 5 commits into from
Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod stream_mock;

mod macros;
pub mod task;
Expand Down
163 changes: 163 additions & 0 deletions tokio-test/src/stream_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#![cfg(not(loom))]

//! A mock stream implementing [`Stream`].
//!
//! # Overview
//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
//! intervals between items.
//!
//! # Usage
//! To use the `StreamMock`, you need to create a builder using[`StreamMockBuilder`]. The builder
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
//!
//! # Example
//! ```rust
//!
//! use futures_util::StreamExt;
//! use std::time::Duration;
//! use tokio_test::stream_mock::StreamMockBuilder;
//!
//! async fn test_stream_mock_wait() {
//! let mut stream_mock = StreamMockBuilder::new()
//! .next(1)
//! .wait(Duration::from_millis(300))
//! .next(2)
//! .build();
//!
//! assert_eq!(stream_mock.next().await, Some(1));
//! let start = std::time::Instant::now();
//! assert_eq!(stream_mock.next().await, Some(2));
//! let elapsed = start.elapsed();
//! assert!(elapsed >= Duration::from_millis(300));
//! assert_eq!(stream_mock.next().await, None);
//! }
//! ```

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use futures_core::Future;
shade marked this conversation as resolved.
Show resolved Hide resolved
use futures_core::{ready, Stream};
use tokio::time::{sleep_until, Instant, Sleep};

#[derive(Debug, Clone)]
enum Action<T: Unpin> {
Next(T),
Wait(Duration),
}

/// A builder for [`StreamMock`]
#[derive(Debug, Clone)]
pub struct StreamMockBuilder<T: Unpin> {
actions: VecDeque<Action<T>>,
}

impl<T: Unpin> StreamMockBuilder<T> {
/// Create a new empty [`StreamMockBuilder`]
pub fn new() -> Self {
StreamMockBuilder::default()
}

/// Queue an item to be returned by the stream
pub fn next(mut self, value: T) -> Self {
self.actions.push_back(Action::Next(value));
self
}

// Queue an item to be consumed by the sink,
// commented out until Sink is implemented.
//
// pub fn consume(mut self, value: T) -> Self {
// self.actions.push_back(Action::Consume(value));
// self
// }

/// Queue the stream to wait for a duration
pub fn wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Wait(duration));
self
}

/// Build the [`StreamMock`]
pub fn build(self) -> StreamMock<T> {
StreamMock {
actions: self.actions,
sleep: None,
}
}
}

impl<T: Unpin> Default for StreamMockBuilder<T> {
fn default() -> Self {
StreamMockBuilder {
actions: VecDeque::new(),
}
}
}

/// A mock stream implementing [`Stream`]
///
/// See [`StreamMockBuilder`] for more information.
#[derive(Debug)]
pub struct StreamMock<T: Unpin> {
actions: VecDeque<Action<T>>,
sleep: Option<Pin<Box<Sleep>>>,
}

impl<T: Unpin> StreamMock<T> {
fn next_action(&mut self) -> Option<Action<T>> {
self.actions.pop_front()
}
}

impl<T: Unpin> Stream for StreamMock<T> {
type Item = T;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// Try polling the sleep future first
if let Some(ref mut sleep) = self.sleep {
ready!(Pin::new(sleep).poll(cx));
// Since we're ready, discard the sleep future
self.sleep.take();
}

match self.next_action() {
Some(action) => match action {
Action::Next(item) => Poll::Ready(Some(item)),
Action::Wait(duration) => {
// Set up a sleep future and schedule this future to be polled again for it.
self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
cx.waker().wake_by_ref();

Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

impl<T: Unpin> Drop for StreamMock<T> {
fn drop(&mut self) {
shade marked this conversation as resolved.
Show resolved Hide resolved
let undropped_count = self
.actions
.iter()
.filter(|action| match action {
Action::Next(_) => true,
Action::Wait(_) => false,
})
.count();

assert!(
undropped_count == 0,
"StreamMock was dropped before all actions were consumed, {} actions were not consumed",
undropped_count
);
}
}
43 changes: 43 additions & 0 deletions tokio-test/tests/stream_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use futures_util::StreamExt;
use std::time::Duration;
use tokio_test::stream_mock::StreamMockBuilder;

#[tokio::test]
async fn test_stream_mock_empty() {
let mut stream_mock = StreamMockBuilder::<u32>::new().build();

assert_eq!(stream_mock.next().await, None);
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
async fn test_stream_mock_items() {
let mut stream_mock = StreamMockBuilder::new().next(1).next(2).build();

assert_eq!(stream_mock.next().await, Some(1));
assert_eq!(stream_mock.next().await, Some(2));
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
async fn test_stream_mock_wait() {
let mut stream_mock = StreamMockBuilder::new()
.next(1)
.wait(Duration::from_millis(300))
.next(2)
.build();

assert_eq!(stream_mock.next().await, Some(1));
let start = std::time::Instant::now();
assert_eq!(stream_mock.next().await, Some(2));
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(300));
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
#[should_panic(expected = "StreamMock was dropped before all actions were consumed")]
async fn test_stream_mock_drop_without_consuming_all() {
let stream_mock = StreamMockBuilder::new().next(1).next(2).build();
drop(stream_mock);
}