Skip to content

Commit

Permalink
add DefaultTimeoutService to set default timeouts per route
Browse files Browse the repository at this point in the history
This also makes turborepo respect the client's preferred timeout,
rather than defaulting to 100ms across the board.
  • Loading branch information
Alexander Lyon authored and Alexander Lyon committed Feb 20, 2024
1 parent 9046991 commit e365d7c
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 8 deletions.
16 changes: 12 additions & 4 deletions crates/turborepo-lib/src/daemon/connector.rs
Expand Up @@ -416,11 +416,15 @@ mod test {
select,
sync::{oneshot::Sender, Mutex},
};
use tower::ServiceBuilder;
use tracing::info;
use turbopath::AbsoluteSystemPathBuf;

use super::*;
use crate::daemon::proto;
use crate::daemon::{
default_timeout_layer::DefaultTimeoutLayer,
proto::{self, turbod_client::TurbodClient},
};

#[cfg(not(target_os = "windows"))]
const NODE_EXE: &str = "node";
Expand Down Expand Up @@ -650,10 +654,14 @@ mod test {
}
};

let server_fut = tonic::transport::Server::builder()
.add_service(proto::turbod_server::TurbodServer::new(DummyServer {
let service = ServiceBuilder::new().layer(DefaultTimeoutLayer).service(
proto::turbod_server::TurbodServer::new(DummyServer {
shutdown: Mutex::new(Some(shutdown_tx)),
}))
}),
);

let server_fut = tonic::transport::Server::builder()
.add_service(service)
.serve_with_incoming(stream);

let tmp_dir = tempfile::tempdir().unwrap();
Expand Down
81 changes: 81 additions & 0 deletions crates/turborepo-lib/src/daemon/default_timeout_layer.rs
@@ -0,0 +1,81 @@
//! default timeout layer
//!
//! This module provides some basic middleware that aims to
//! improve the flexibility of the daemon server by doing
//! two things:
//!
//! a) remove the server-wide timeout of 100ms in favour of
//! a less aggressive 30s. the way tonic works is the
//! lowest timeout (server vs request-specific) is always
//! used meaning clients' timeout requests were ignored
//! if set to >100ms
//! b) add a middleware to reinstate the timeout, if the
//! client does not specify it, defaulting to 100ms for
//! 'non-blocking' calls (requests in the hot path for
//! a run of turbo), and falling back to the server
//! limit for blocking ones (useful in cases like the
//! LSP)
//!
//! With this in place, clients can specify the timeout
//! they want (as long as it is less than 30s), and the
//! server has sane defaults.

use std::time::Duration;

use tonic::{codegen::http::Request, server::NamedService, transport::Body};
use tower::{Layer, Service};

/// Middleware that wraps an inner service `S`.
///
/// Its `Service` implementation adds a default `grpc-timeout` header of
/// 100ms to requests whose path does not end in `Blocking` and that do not
/// already carry that header, then forwards the request to `inner`.
#[derive(Debug, Clone)]
pub struct DefaultTimeoutService<S> {
    /// The wrapped service every request is forwarded to.
    inner: S,
}

/// Pass-through `Service` implementation that stamps a default
/// `grpc-timeout` header onto requests before handing them to the wrapped
/// service. Response, error and future types are those of the inner service.
impl<S> Service<Request<Body>> for DefaultTimeoutService<S>
where
    S: Service<Request<Body>>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    /// Readiness is delegated unchanged to the wrapped service.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Forwards `req` to the wrapped service.
    ///
    /// Requests whose URI path does not end in `Blocking` and that have no
    /// `grpc-timeout` header get one set to 100ms, encoded in microseconds
    /// with the `u` unit suffix. An existing header is never overwritten.
    fn call(&mut self, mut req: Request<Body>) -> Self::Future {
        let is_blocking = req.uri().path().ends_with("Blocking");

        if !is_blocking {
            let headers = req.headers_mut();
            // Only fill in the default; a client-supplied timeout wins.
            if !headers.contains_key("grpc-timeout") {
                let default_timeout = Duration::from_millis(100);
                let encoded = format!("{}u", default_timeout.as_micros());
                let value = tonic::codegen::http::HeaderValue::from_str(&encoded)
                    .expect("numbers are always valid ascii");
                headers.insert("grpc-timeout", value);
            }
        }

        self.inner.call(req)
    }
}

/// Stateless layer marker; its `Layer` implementation wraps a service in a
/// `DefaultTimeoutService`.
#[derive(Debug, Clone)]
pub struct DefaultTimeoutLayer;

/// Lets `DefaultTimeoutLayer` be used wherever a `Layer` is accepted.
impl<S> Layer<S> for DefaultTimeoutLayer {
    type Service = DefaultTimeoutService<S>;

    /// Wraps `service` in a `DefaultTimeoutService`; the layer itself holds
    /// no state.
    fn layer(&self, service: S) -> Self::Service {
        DefaultTimeoutService { inner: service }
    }
}

/// The wrapper reports the wrapped service's `NAME` as its own.
impl<T> NamedService for DefaultTimeoutService<T>
where
    T: NamedService,
{
    const NAME: &'static str = <T as NamedService>::NAME;
}

// Gate the test module so it is only compiled for test builds.
#[cfg(test)]
mod test {
    // TODO: placeholder. This test has an empty body and asserts nothing
    // yet. It should check that a request whose path does not end in
    // `Blocking`, and which carries no `grpc-timeout` header, receives the
    // 100ms default from `DefaultTimeoutService::call`.
    #[test]
    fn overrides_timeout_for_non_blocking() {}
}
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/daemon/mod.rs
Expand Up @@ -24,6 +24,7 @@ mod bump_timeout;
mod bump_timeout_layer;
mod client;
mod connector;
mod default_timeout_layer;
pub(crate) mod endpoint;
mod server;

Expand Down
17 changes: 13 additions & 4 deletions crates/turborepo-lib/src/daemon/server.rs
Expand Up @@ -13,14 +13,15 @@ use std::{
};

use futures::Future;
use prost::DecodeError;
use semver::Version;
use thiserror::Error;
use tokio::{
select,
sync::{mpsc, oneshot},
task::JoinHandle,
};
use tonic::transport::{NamedService, Server};
use tonic::{server::NamedService, transport::Server};
use tower::ServiceBuilder;
use tracing::{error, info, trace, warn};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
Expand All @@ -35,7 +36,10 @@ use turborepo_repository::discovery::{
};

use super::{bump_timeout::BumpTimeout, endpoint::SocketOpenError, proto};
use crate::daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket, Paths};
use crate::daemon::{
bump_timeout_layer::BumpTimeoutLayer, default_timeout_layer::DefaultTimeoutLayer,
endpoint::listen_socket, Paths,
};

#[derive(Debug)]
#[allow(dead_code)]
Expand Down Expand Up @@ -121,7 +125,7 @@ impl FileWatching {
}

/// Timeout for every RPC the server handles
const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

pub struct TurboGrpcService<S, PDB> {
repo_root: AbsoluteSystemPathBuf,
Expand Down Expand Up @@ -231,12 +235,17 @@ where
let server_fut = {
let service = ServiceBuilder::new()
.layer(BumpTimeoutLayer::new(bump_timeout.clone()))
.layer(DefaultTimeoutLayer)
.service(crate::daemon::proto::turbod_server::TurbodServer::new(
service,
));

Server::builder()
// set a max timeout for RPCs
// we respect the timeout specified by the client if it is set, but
// have a default timeout for non-blocking calls of 100ms, courtesy of
// `DefaultTimeoutLayer`. the REQUEST_TIMEOUT, however, is the
// maximum time we will wait for a response, regardless of the client's
// preferences. it cannot be exceeded.
.timeout(REQUEST_TIMEOUT)
.add_service(service)
.serve_with_incoming_shutdown(stream, shutdown_fut)
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/lib.rs
Expand Up @@ -6,6 +6,7 @@
#![feature(option_get_or_insert_default)]
#![feature(once_cell_try)]
#![feature(try_blocks)]
#![feature(impl_trait_in_assoc_type)]
#![deny(clippy::all)]
// Clippy's needless mut lint is buggy: https://github.com/rust-lang/rust-clippy/issues/11299
#![allow(clippy::needless_pass_by_ref_mut)]
Expand Down

0 comments on commit e365d7c

Please sign in to comment.