Skip to content

Commit

Permalink
Implement opt-in for enabling WASI to block the current thread (#8190)
Browse files Browse the repository at this point in the history
* Implement opt-in for enabling WASI to block the current thread

Currently all of Wasmtime's implementation of WASI is built on Tokio,
but some operations are currently not asynchronous such as opening a
file or reading a directory. Internally these use `spawn_blocking` to
satisfy the requirements of async users of WASI to avoid blocking the
current thread. This use of `spawn_blocking`, however, ends up causing
mostly just performance overhead in the use case of the CLI, for
example, where async wasm is not used. That then leads to this commit,
implementing an opt-in mechanism to be able to block the current thread.

A `WasiCtx` now has a flag indicating whether it's ok to block the
current thread and that's carried to various filesystem operations that
use `spawn_blocking`. The call to `spawn_blocking` is now conditional
and skipped if this flag is set.

Additionally the invocation point in the CLI for wasm modules is wrapped
in a Tokio runtime to avoid entering/exiting Tokio in the "leaves" when
wasm calls the host, as happens today. This hits a better fast path in
Tokio that appears to be more efficient.

Semantically this should not result in any change for CLI programs
except in one case: file writes. By default writes on `output-stream` in
WASI are asynchronous meaning that only one write can be in flight at a
time. That being said all current users are immediately blocking waiting
for this write to finish anyway, so this commit won't end up changing
much. It's already the case that file reads are always blocking, for
example. If necessary in the future though this can be further
special-cased at the preview1 layer.

* Thread around allow_blocking_current_thread less

Pass around `&File` instead.
  • Loading branch information
alexcrichton committed Mar 20, 2024
1 parent 5e05171 commit f4bee25
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 64 deletions.
44 changes: 43 additions & 1 deletion crates/wasi/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct WasiCtxBuilder {
wall_clock: Box<dyn HostWallClock + Send>,
monotonic_clock: Box<dyn HostMonotonicClock + Send>,
allowed_network_uses: AllowedNetworkUses,
allow_blocking_current_thread: bool,
built: bool,
}

Expand Down Expand Up @@ -80,6 +81,7 @@ impl WasiCtxBuilder {
wall_clock: wall_clock(),
monotonic_clock: monotonic_clock(),
allowed_network_uses: AllowedNetworkUses::default(),
allow_blocking_current_thread: false,
built: false,
}
}
Expand Down Expand Up @@ -115,6 +117,37 @@ impl WasiCtxBuilder {
self.inherit_stdin().inherit_stdout().inherit_stderr()
}

/// Configures whether or not blocking operations made through this
/// `WasiCtx` are allowed to block the current thread.
///
/// WASI is currently implemented on top of the Rust
/// [Tokio](https://tokio.rs/) library. While most WASI APIs are
/// non-blocking some are instead blocking from the perspective of
/// WebAssembly. For example opening a file is a blocking operation with
/// respect to WebAssembly but it's implemented as an asynchronous operation
/// on the host. This is currently done with Tokio's
/// [`spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).
///
/// When WebAssembly is used in a synchronous context, for example when
/// [`Config::async_support`] is disabled, then this asynchronous operation
/// is quickly turned back into a synchronous operation with a `block_on` in
/// Rust. This switching back-and-forth between a blocking a non-blocking
/// context can have overhead, and this option exists to help alleviate this
/// overhead.
///
/// This option indicates that for WASI functions that are blocking from the
/// perspective of WebAssembly it's ok to block the native thread as well.
/// This means that this back-and-forth between async and sync won't happen
/// and instead blocking operations are performed on-thread (such as opening
/// a file). This can improve the performance of WASI operations when async
/// support is disabled.
///
/// [`Config::async_support`]: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.async_support
pub fn allow_blocking_current_thread(&mut self, enable: bool) -> &mut Self {
self.allow_blocking_current_thread = enable;
self
}

pub fn envs(&mut self, env: &[(impl AsRef<str>, impl AsRef<str>)]) -> &mut Self {
self.env.extend(
env.iter()
Expand Down Expand Up @@ -162,7 +195,13 @@ impl WasiCtxBuilder {
open_mode |= OpenMode::WRITE;
}
self.preopens.push((
Dir::new(dir, perms, file_perms, open_mode),
Dir::new(
dir,
perms,
file_perms,
open_mode,
self.allow_blocking_current_thread,
),
path.as_ref().to_owned(),
));
self
Expand Down Expand Up @@ -263,6 +302,7 @@ impl WasiCtxBuilder {
wall_clock,
monotonic_clock,
allowed_network_uses,
allow_blocking_current_thread,
built: _,
} = mem::replace(self, Self::new());
self.built = true;
Expand All @@ -281,6 +321,7 @@ impl WasiCtxBuilder {
wall_clock,
monotonic_clock,
allowed_network_uses,
allow_blocking_current_thread,
}
}

Expand Down Expand Up @@ -310,6 +351,7 @@ pub struct WasiCtx {
pub(crate) stderr: Box<dyn StdoutStream>,
pub(crate) socket_addr_check: SocketAddrCheck,
pub(crate) allowed_network_uses: AllowedNetworkUses,
pub(crate) allow_blocking_current_thread: bool,
}

pub struct AllowedNetworkUses {
Expand Down
136 changes: 94 additions & 42 deletions crates/wasi/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ bitflags::bitflags! {
}
}

#[derive(Clone)]
pub struct File {
/// The operating system File this struct is mediating access to.
///
Expand All @@ -92,14 +93,22 @@ pub struct File {
/// doesn't presently provide a cross-platform equivelant of reading the
/// oflags back out using fcntl.
pub open_mode: OpenMode,

allow_blocking_current_thread: bool,
}

impl File {
pub fn new(file: cap_std::fs::File, perms: FilePerms, open_mode: OpenMode) -> Self {
pub fn new(
file: cap_std::fs::File,
perms: FilePerms,
open_mode: OpenMode,
allow_blocking_current_thread: bool,
) -> Self {
Self {
file: Arc::new(file),
perms,
open_mode,
allow_blocking_current_thread,
}
}

Expand All @@ -110,11 +119,31 @@ impl File {
F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
R: Send + 'static,
{
let f = self.file.clone();
spawn_blocking(move || body(&f)).await
match self._spawn_blocking(body) {
SpawnBlocking::Done(result) => result,
SpawnBlocking::Spawned(task) => task.await,
}
}

fn _spawn_blocking<F, R>(&self, body: F) -> SpawnBlocking<R>
where
F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
R: Send + 'static,
{
if self.allow_blocking_current_thread {
SpawnBlocking::Done(body(&self.file))
} else {
let f = self.file.clone();
SpawnBlocking::Spawned(spawn_blocking(move || body(&f)))
}
}
}

enum SpawnBlocking<T> {
Done(T),
Spawned(AbortOnDropJoinHandle<T>),
}

bitflags::bitflags! {
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct DirPerms: usize {
Expand Down Expand Up @@ -146,6 +175,8 @@ pub struct Dir {
/// doesn't presently provide a cross-platform equivelant of reading the
/// oflags back out using fcntl.
pub open_mode: OpenMode,

allow_blocking_current_thread: bool,
}

impl Dir {
Expand All @@ -154,12 +185,14 @@ impl Dir {
perms: DirPerms,
file_perms: FilePerms,
open_mode: OpenMode,
allow_blocking_current_thread: bool,
) -> Self {
Dir {
dir: Arc::new(dir),
perms,
file_perms,
open_mode,
allow_blocking_current_thread,
}
}

Expand All @@ -170,30 +203,38 @@ impl Dir {
F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
R: Send + 'static,
{
let d = self.dir.clone();
spawn_blocking(move || body(&d)).await
if self.allow_blocking_current_thread {
body(&self.dir)
} else {
let d = self.dir.clone();
spawn_blocking(move || body(&d)).await
}
}
}

pub struct FileInputStream {
file: Arc<cap_std::fs::File>,
file: File,
position: u64,
}
impl FileInputStream {
pub fn new(file: Arc<cap_std::fs::File>, position: u64) -> Self {
Self { file, position }
pub fn new(file: &File, position: u64) -> Self {
Self {
file: file.clone(),
position,
}
}

pub async fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
use system_interface::fs::FileIoExt;
let f = Arc::clone(&self.file);
let p = self.position;
let (r, mut buf) = spawn_blocking(move || {
let mut buf = BytesMut::zeroed(size);
let r = f.read_at(&mut buf, p);
(r, buf)
})
.await;
let (r, mut buf) = self
.file
.spawn_blocking(move |f| {
let mut buf = BytesMut::zeroed(size);
let r = f.read_at(&mut buf, p);
(r, buf)
})
.await;
let n = read_result(r)?;
buf.truncate(n);
self.position += n as u64;
Expand Down Expand Up @@ -222,7 +263,7 @@ pub(crate) enum FileOutputMode {
}

pub(crate) struct FileOutputStream {
file: Arc<cap_std::fs::File>,
file: File,
mode: FileOutputMode,
state: OutputState,
}
Expand All @@ -238,16 +279,17 @@ enum OutputState {
}

impl FileOutputStream {
pub fn write_at(file: Arc<cap_std::fs::File>, position: u64) -> Self {
pub fn write_at(file: &File, position: u64) -> Self {
Self {
file,
file: file.clone(),
mode: FileOutputMode::Position(position),
state: OutputState::Ready,
}
}
pub fn append(file: Arc<cap_std::fs::File>) -> Self {

pub fn append(file: &File) -> Self {
Self {
file,
file: file.clone(),
mode: FileOutputMode::Append,
state: OutputState::Ready,
}
Expand Down Expand Up @@ -275,33 +317,43 @@ impl HostOutputStream for FileOutputStream {
return Ok(());
}

let f = Arc::clone(&self.file);
let m = self.mode;
let task = spawn_blocking(move || match m {
FileOutputMode::Position(mut p) => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.write_at(buf.as_ref(), p)?;
// afterwards buf contains [nwritten, len):
let _ = buf.split_to(nwritten);
p += nwritten as u64;
total += nwritten;
let result = self.file._spawn_blocking(move |f| {
match m {
FileOutputMode::Position(mut p) => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.write_at(buf.as_ref(), p)?;
// afterwards buf contains [nwritten, len):
let _ = buf.split_to(nwritten);
p += nwritten as u64;
total += nwritten;
}
Ok(total)
}
Ok(total)
}
FileOutputMode::Append => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.append(buf.as_ref())?;
let _ = buf.split_to(nwritten);
total += nwritten;
FileOutputMode::Append => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.append(buf.as_ref())?;
let _ = buf.split_to(nwritten);
total += nwritten;
}
Ok(total)
}
Ok(total)
}
});
self.state = OutputState::Waiting(task);
self.state = match result {
SpawnBlocking::Done(Ok(nwritten)) => {
if let FileOutputMode::Position(ref mut p) = &mut self.mode {
*p += nwritten as u64;
}
OutputState::Ready
}
SpawnBlocking::Done(Err(e)) => OutputState::Error(e),
SpawnBlocking::Spawned(task) => OutputState::Waiting(task),
};
Ok(())
}
fn flush(&mut self) -> Result<(), StreamError> {
Expand Down
24 changes: 11 additions & 13 deletions crates/wasi/src/host/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ impl<T: WasiView> HostDescriptor for T {
use system_interface::fs::{FdFlags, GetSetFdFlags};
use types::{DescriptorFlags, OpenFlags};

let allow_blocking_current_thread = self.ctx().allow_blocking_current_thread;
let table = self.table();
let d = table.get(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
Expand Down Expand Up @@ -609,11 +610,15 @@ impl<T: WasiView> HostDescriptor for T {
d.perms,
d.file_perms,
open_mode,
allow_blocking_current_thread,
)))?),

OpenResult::File(file) => {
Ok(table.push(Descriptor::File(File::new(file, d.file_perms, open_mode)))?)
}
OpenResult::File(file) => Ok(table.push(Descriptor::File(File::new(
file,
d.file_perms,
open_mode,
allow_blocking_current_thread,
)))?),

OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()),
}
Expand Down Expand Up @@ -730,11 +735,9 @@ impl<T: WasiView> HostDescriptor for T {
if !f.perms.contains(FilePerms::READ) {
Err(types::ErrorCode::BadDescriptor)?;
}
// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let reader = FileInputStream::new(clone, offset);
let reader = FileInputStream::new(f, offset);

// Insert the stream view into the table. Trap if the table is full.
let index = self.table().push(InputStream::File(reader))?;
Expand All @@ -754,11 +757,8 @@ impl<T: WasiView> HostDescriptor for T {
Err(types::ErrorCode::BadDescriptor)?;
}

// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let writer = FileOutputStream::write_at(clone, offset);
let writer = FileOutputStream::write_at(f, offset);
let writer: OutputStream = Box::new(writer);

// Insert the stream view into the table. Trap if the table is full.
Expand All @@ -777,11 +777,9 @@ impl<T: WasiView> HostDescriptor for T {
if !f.perms.contains(FilePerms::WRITE) {
Err(types::ErrorCode::BadDescriptor)?;
}
// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let appender = FileOutputStream::append(clone);
let appender = FileOutputStream::append(f);
let appender: OutputStream = Box::new(appender);

// Insert the stream view into the table. Trap if the table is full.
Expand Down

0 comments on commit f4bee25

Please sign in to comment.