Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hang on broken stderr. #9201

Merged
merged 1 commit into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions src/cargo/core/compiler/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct DrainState<'cfg> {
pending_queue: Vec<(Unit, Job)>,
print: DiagnosticPrinter<'cfg>,

// How many jobs we've finished
/// How many jobs we've finished
finished: usize,
}

Expand Down Expand Up @@ -469,7 +469,15 @@ impl<'cfg> DrainState<'cfg> {
// we're able to perform some parallel work.
while self.has_extra_tokens() && !self.pending_queue.is_empty() {
let (unit, job) = self.pending_queue.remove(0);
self.run(&unit, job, cx, scope)?;
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
// NOTE: An error here will drop the job without starting it.
// That should be OK, since we want to exit as soon as
// possible during an error.
self.note_working_on(cx.bcx.config, &unit, job.freshness())?;
}
self.run(&unit, job, cx, scope);
}

Ok(())
Expand Down Expand Up @@ -835,31 +843,22 @@ impl<'cfg> DrainState<'cfg> {
}
}

/// Executes a job, pushing the spawned thread's handled onto `threads`.
fn run(
&mut self,
unit: &Unit,
job: Job,
cx: &Context<'_, '_>,
scope: &Scope<'_>,
) -> CargoResult<()> {
/// Executes a job.
///
/// Fresh jobs block until finished (which should be very fast!), Dirty
/// jobs will spawn a thread in the background and return immediately.
fn run(&mut self, unit: &Unit, job: Job, cx: &Context<'_, '_>, scope: &Scope<'_>) {
let id = JobId(self.next_id);
self.next_id = self.next_id.checked_add(1).unwrap();

info!("start {}: {:?}", id, unit);

assert!(self.active.insert(id, unit.clone()).is_none());
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;

let messages = self.messages.clone();
let fresh = job.freshness();
let rmeta_required = cx.rmeta_required(unit);

if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
self.note_working_on(cx.bcx.config, unit, fresh)?;
}

let doit = move |state: JobState<'_>| {
let mut sender = FinishOnDrop {
messages: &state.messages,
Expand Down Expand Up @@ -934,8 +933,6 @@ impl<'cfg> DrainState<'cfg> {
});
}
}

Ok(())
}

fn emit_warnings(
Expand Down
84 changes: 82 additions & 2 deletions tests/testsuite/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use cargo::{
use cargo_test_support::paths::{root, CargoPathExt};
use cargo_test_support::registry::Package;
use cargo_test_support::{
basic_bin_manifest, basic_lib_manifest, basic_manifest, git, is_nightly, lines_match_unordered,
main_file, paths, project, rustc_host, sleep_ms, symlink_supported, t, Execs, ProjectBuilder,
basic_bin_manifest, basic_lib_manifest, basic_manifest, cargo_exe, git, is_nightly,
lines_match_unordered, main_file, paths, process, project, rustc_host, sleep_ms,
symlink_supported, t, Execs, ProjectBuilder,
};
use std::env;
use std::fs;
Expand Down Expand Up @@ -5256,6 +5257,85 @@ hello stderr!
lines_match_unordered("hello stdout!\n", &stdout).unwrap();
}

#[cargo_test]
fn close_output_during_drain() {
// Test to close the output during the build phase (drain_the_queue).
// There was a bug where it would hang.

// Server to know when rustc has spawned.
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();

// Create a wrapper so the test can know when compiling has started.
let rustc_wrapper = {
let p = project()
.at("compiler")
.file("Cargo.toml", &basic_manifest("compiler", "1.0.0"))
.file(
"src/main.rs",
&r#"
use std::process::Command;
use std::env;
use std::io::Read;

fn main() {
// Only wait on the first dependency.
if matches!(env::var("CARGO_PKG_NAME").as_deref(), Ok("dep")) {
let mut socket = std::net::TcpStream::connect("__ADDR__").unwrap();
// Wait for the test to tell us to start printing.
let mut buf = [0];
drop(socket.read_exact(&mut buf));
}
let mut cmd = Command::new("rustc");
for arg in env::args_os().skip(1) {
cmd.arg(arg);
}
std::process::exit(cmd.status().unwrap().code().unwrap());
}
"#
.replace("__ADDR__", &addr.to_string()),
)
.build();
p.cargo("build").run();
p.bin("compiler")
};

Package::new("dep", "1.0.0").publish();
let p = project()
.file(
"Cargo.toml",
r#"
[package]
name = "foo"
version = "0.1.0"

[dependencies]
dep = "1.0"
"#,
)
.file("src/lib.rs", "")
.build();

// Spawn cargo, wait for the first rustc to start, and then close stderr.
let mut cmd = process(&cargo_exe())
.arg("check")
.cwd(p.root())
.env("RUSTC", rustc_wrapper)
.build_command();
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
let mut child = cmd.spawn().expect("cargo should spawn");
// Wait for the rustc wrapper to start.
let rustc_conn = listener.accept().unwrap().0;
// Close stderr to force an error.
drop(child.stderr.take());
// Tell the wrapper to continue.
drop(rustc_conn);
match child.wait() {
Ok(status) => assert!(!status.success()),
Err(e) => panic!("child wait failed: {}", e),
}
}

use cargo_test_support::registry::Dependency;

#[cargo_test]
Expand Down