Skip to content

Commit

Permalink
fix: reading output from orphan process (#7402)
Browse files Browse the repository at this point in the history
### Description

This PR fixes an unintended behavior where an orphaned process could
result in `turbo` not exiting.

Hitting this behavior required a few things:
- the user defined task spawns it's own child process that inherits
stdout/stderr e.g. `sh -c "sleep 10"` will start a child process `sleep
10`
- an external program kills the parent in a way that doesn't allow it to
clean up children e.g. `SIGKILL`

Use of `child.wait_with_piped_outputs` would then hang until the
orphaned process exits as `stdout`/`stderr` are kept open by the orphan.
We work around this by additionally checking if the child has exited and
if it wasn't a successful exit we abort the reading of stdout/stderr.

We will still read stdout/stderr to completion if the parent process
exits with 0 as it might intentional to have the orphan produce
additional output before it finishes.

This fix only is needed for non-PTY tasks as we close up the control
side of the PTY on child exit which will close the child output.

(I also discovered that we were returning `None` on any `child.wait()`
calls after the first one completed instead of the actual exit value.
The seconds commit fixes this)

### Testing Instructions

Added red->green unit test that produces an orphaned process that keeps
stdout/stderr open.


Closes TURBO-2383
  • Loading branch information
chris-olszewski committed Feb 15, 2024
1 parent fa16459 commit d753f95
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
59 changes: 58 additions & 1 deletion crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,8 @@ impl Child {

/// Wait for the `Child` to exit, returning the exit code.
pub async fn wait(&mut self) -> Option<ChildExit> {
self.exit_channel.changed().await.ok()?;
// If sending end of exit channel closed, then return last value in the channel
self.exit_channel.changed().await.ok();
*self.exit_channel.borrow()
}

Expand Down Expand Up @@ -671,6 +672,7 @@ impl Child {
let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new();

let mut is_exited = false;
loop {
tokio::select! {
Some(result) = next_line(&mut stdout_lines, &mut stdout_buffer) => {
Expand All @@ -685,6 +687,14 @@ impl Child {
stdout_pipe.write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
status = self.wait(), if !is_exited => {
is_exited = true;
// We don't abort in the cases of a zero exit code as we could be
// caching this task and should read all the logs it produces.
if status != Some(ChildExit::Finished(Some(0))) {
return Ok(status);
}
}
else => {
// In the case that both futures read a complete line
// the future not chosen in the select will return None if it's at EOF
Expand Down Expand Up @@ -827,6 +837,21 @@ mod test {
assert_matches!(&*state, ChildState::Exited(ChildExit::Killed));
}

#[test_case(false)]
#[test_case(TEST_PTY)]
#[tokio::test]
async fn test_wait(use_pty: bool) {
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap();

let exit1 = child.wait().await;
let exit2 = child.wait().await;
assert_matches!(exit1, Some(ChildExit::Finished(Some(0))));
assert_matches!(exit2, Some(ChildExit::Finished(Some(0))));
}

#[test_case(false)]
#[test_case(TEST_PTY)]
#[tokio::test]
Expand Down Expand Up @@ -1170,6 +1195,38 @@ mod test {
assert_matches!(exit, Some(ChildExit::Killed));
}

#[cfg(unix)]
#[tokio::test]
async fn test_orphan_process() {
let mut cmd = Command::new("sh");
cmd.args(["-c", "echo hello; sleep 120; echo done"]);
let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap();

tokio::time::sleep(STARTUP_DELAY).await;

let child_pid = child.pid().unwrap() as i32;
// We don't kill the process group to simulate what an external program might do
unsafe {
libc::kill(child_pid, libc::SIGKILL);
}

let exit = child.wait().await;
assert_matches!(exit, Some(ChildExit::KilledExternal));

let mut output = Vec::new();
match tokio::time::timeout(
Duration::from_millis(500),
child.wait_with_piped_outputs(&mut output),
)
.await
{
Ok(exit_status) => {
assert_matches!(exit_status, Ok(Some(ChildExit::KilledExternal)));
}
Err(_) => panic!("expected wait_with_piped_outputs to exit after it was killed"),
}
}

#[test_case(false)]
#[test_case(TEST_PTY)]
#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ mod test {
assert_eq!(kill_code, Some(ChildExit::Finished(Some(0))));

let code = child.wait().await;
assert_eq!(code, None);
assert_eq!(code, Some(ChildExit::Finished(Some(0))));
}

#[tokio::test]
Expand Down

0 comments on commit d753f95

Please sign in to comment.