Skip to content

Commit

Permalink
proc_macro: use crossbeam channels for the proc_macro cross-thread br…
Browse files Browse the repository at this point in the history
…idge

This is done by having the crossbeam dependency inserted into the
proc_macro server code from the server side, to avoid adding a
dependency to proc_macro.

In addition, this introduces a -Z command-line option which will switch
rustc to run proc-macros using this cross-thread executor. With the
changes to the bridge in #98186, #98187, #98188 and #98189, the
performance of the executor should be much closer to same-thread
execution.

In local testing, the crossbeam executor was substantially more
performant than either of the two existing CrossThread strategies, so
they have been removed to keep things simple.
  • Loading branch information
mystor committed Jul 29, 2022
1 parent 2f847b8 commit 6d1650f
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3885,6 +3885,7 @@ dependencies = [
name = "rustc_expand"
version = "0.0.0"
dependencies = [
"crossbeam-channel",
"rustc_ast",
"rustc_ast_passes",
"rustc_ast_pretty",
Expand Down
1 change: 1 addition & 0 deletions compiler/rustc_expand/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ rustc_parse = { path = "../rustc_parse" }
rustc_session = { path = "../rustc_session" }
smallvec = { version = "1.8.1", features = ["union", "may_dangle"] }
rustc_ast = { path = "../rustc_ast" }
crossbeam-channel = "0.5.0"
44 changes: 37 additions & 7 deletions compiler/rustc_expand/src/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,37 @@ use rustc_ast::tokenstream::{TokenStream, TokenTree};
use rustc_data_structures::sync::Lrc;
use rustc_errors::ErrorGuaranteed;
use rustc_parse::parser::ForceCollect;
use rustc_session::config::ProcMacroExecutionStrategy;
use rustc_span::profiling::SpannedEventArgRecorder;
use rustc_span::{Span, DUMMY_SP};

const EXEC_STRATEGY: pm::bridge::server::SameThread = pm::bridge::server::SameThread;
struct CrossbeamMessagePipe<T> {
tx: crossbeam_channel::Sender<T>,
rx: crossbeam_channel::Receiver<T>,
}

impl<T> pm::bridge::server::MessagePipe<T> for CrossbeamMessagePipe<T> {
fn new() -> (Self, Self) {
let (tx1, rx1) = crossbeam_channel::bounded(1);
let (tx2, rx2) = crossbeam_channel::bounded(1);
(CrossbeamMessagePipe { tx: tx1, rx: rx2 }, CrossbeamMessagePipe { tx: tx2, rx: rx1 })
}

fn send(&mut self, value: T) {
self.tx.send(value).unwrap();
}

fn recv(&mut self) -> Option<T> {
self.rx.recv().ok()
}
}

fn exec_strategy(ecx: &ExtCtxt<'_>) -> impl pm::bridge::server::ExecutionStrategy {
pm::bridge::server::MaybeCrossThread::<CrossbeamMessagePipe<_>>::new(
ecx.sess.opts.unstable_opts.proc_macro_execution_strategy
== ProcMacroExecutionStrategy::CrossThread,
)
}

pub struct BangProcMacro {
pub client: pm::bridge::client::Client<pm::TokenStream, pm::TokenStream>,
Expand All @@ -30,8 +57,9 @@ impl base::BangProcMacro for BangProcMacro {
});

let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace).map_err(|e| {
self.client.run(&strategy, server, input, proc_macro_backtrace).map_err(|e| {
let mut err = ecx.struct_span_err(span, "proc macro panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
Expand Down Expand Up @@ -59,16 +87,17 @@ impl base::AttrProcMacro for AttrProcMacro {
});

let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client
.run(&EXEC_STRATEGY, server, annotation, annotated, proc_macro_backtrace)
.map_err(|e| {
self.client.run(&strategy, server, annotation, annotated, proc_macro_backtrace).map_err(
|e| {
let mut err = ecx.struct_span_err(span, "custom attribute panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
}
err.emit()
})
},
)
}
}

Expand Down Expand Up @@ -105,8 +134,9 @@ impl MultiItemModifier for DeriveProcMacro {
recorder.record_arg_with_span(ecx.expansion_descr(), span);
});
let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
match self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace) {
match self.client.run(&strategy, server, input, proc_macro_backtrace) {
Ok(stream) => stream,
Err(e) => {
let mut err = ecx.struct_span_err(span, "proc-macro derive panicked");
Expand Down
3 changes: 2 additions & 1 deletion compiler/rustc_interface/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rustc_session::config::{
};
use rustc_session::config::{
BranchProtection, Externs, OomStrategy, OutputType, OutputTypes, PAuthKey, PacRet,
SymbolManglingVersion, WasiExecModel,
ProcMacroExecutionStrategy, SymbolManglingVersion, WasiExecModel,
};
use rustc_session::config::{CFGuard, ExternEntry, LinkerPluginLto, LtoCli, SwitchWithOptPath};
use rustc_session::lint::Level;
Expand Down Expand Up @@ -685,6 +685,7 @@ fn test_unstable_options_tracking_hash() {
untracked!(print_mono_items, Some(String::from("abc")));
untracked!(print_type_sizes, true);
untracked!(proc_macro_backtrace, true);
untracked!(proc_macro_execution_strategy, ProcMacroExecutionStrategy::CrossThread);
untracked!(query_dep_graph, true);
untracked!(save_analysis, true);
untracked!(self_profile, SwitchWithOptPath::Enabled(None));
Expand Down
10 changes: 10 additions & 0 deletions compiler/rustc_session/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2959,3 +2959,13 @@ impl OomStrategy {
}
}
}

/// How to run proc-macro code when building this crate
#[derive(Clone, Copy, PartialEq, Hash, Debug)]
pub enum ProcMacroExecutionStrategy {
/// Run the proc-macro code on the same thread as the server.
SameThread,

/// Run the proc-macro code on a different thread.
CrossThread,
}
17 changes: 17 additions & 0 deletions compiler/rustc_session/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ mod desc {
"one of (`none` (default), `basic`, `strong`, or `all`)";
pub const parse_branch_protection: &str =
"a `,` separated combination of `bti`, `b-key`, `pac-ret`, or `leaf`";
pub const parse_proc_macro_execution_strategy: &str =
"one of supported execution strategies (`same-thread`, or `cross-thread`)";
}

mod parse {
Expand Down Expand Up @@ -1062,6 +1064,18 @@ mod parse {
}
true
}

pub(crate) fn parse_proc_macro_execution_strategy(
slot: &mut ProcMacroExecutionStrategy,
v: Option<&str>,
) -> bool {
*slot = match v {
Some("same-thread") => ProcMacroExecutionStrategy::SameThread,
Some("cross-thread") => ProcMacroExecutionStrategy::CrossThread,
_ => return false,
};
true
}
}

options! {
Expand Down Expand Up @@ -1457,6 +1471,9 @@ options! {
"print layout information for each type encountered (default: no)"),
proc_macro_backtrace: bool = (false, parse_bool, [UNTRACKED],
"show backtraces for panics during proc-macro execution (default: no)"),
proc_macro_execution_strategy: ProcMacroExecutionStrategy = (ProcMacroExecutionStrategy::SameThread,
parse_proc_macro_execution_strategy, [UNTRACKED],
"how to run proc-macro code (default: same-thread)"),
profile: bool = (false, parse_bool, [TRACKED],
"insert profiling code (default: no)"),
profile_closures: bool = (false, parse_no_flag, [UNTRACKED],
Expand Down
136 changes: 64 additions & 72 deletions library/proc_macro/src/bridge/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use super::*;

use std::marker::PhantomData;

// FIXME(eddyb) generate the definition of `HandleStore` in `server.rs`.
use super::client::HandleStore;

Expand Down Expand Up @@ -143,6 +145,41 @@ pub trait ExecutionStrategy {
) -> Buffer;
}

pub struct MaybeCrossThread<P> {
cross_thread: bool,
marker: PhantomData<P>,
}

impl<P> MaybeCrossThread<P> {
pub const fn new(cross_thread: bool) -> Self {
MaybeCrossThread { cross_thread, marker: PhantomData }
}
}

impl<P> ExecutionStrategy for MaybeCrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
if self.cross_thread {
<CrossThread<P>>::new().run_bridge_and_client(
dispatcher,
input,
run_client,
force_show_panics,
)
} else {
SameThread.run_bridge_and_client(dispatcher, input, run_client, force_show_panics)
}
}
}

pub struct SameThread;

impl ExecutionStrategy for SameThread {
Expand All @@ -164,28 +201,31 @@ impl ExecutionStrategy for SameThread {
}
}

// NOTE(eddyb) Two implementations are provided, the second one is a bit
// faster but neither is anywhere near as fast as same-thread execution.
pub struct CrossThread<P>(PhantomData<P>);

pub struct CrossThread1;
impl<P> CrossThread<P> {
pub const fn new() -> Self {
CrossThread(PhantomData)
}
}

impl ExecutionStrategy for CrossThread1 {
impl<P> ExecutionStrategy for CrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::mpsc::channel;

let (req_tx, req_rx) = channel();
let (res_tx, res_rx) = channel();
let (mut server, mut client) = P::new();

let join_handle = thread::spawn(move || {
let mut dispatch = |buf| {
req_tx.send(buf).unwrap();
res_rx.recv().unwrap()
let mut dispatch = |b: Buffer| -> Buffer {
client.send(b);
client.recv().expect("server died while client waiting for reply")
};

run_client(BridgeConfig {
Expand All @@ -196,75 +236,27 @@ impl ExecutionStrategy for CrossThread1 {
})
});

for b in req_rx {
res_tx.send(dispatcher.dispatch(b)).unwrap();
while let Some(b) = server.recv() {
server.send(dispatcher.dispatch(b));
}

join_handle.join().unwrap()
}
}

pub struct CrossThread2;
/// A message pipe used for communicating between server and client threads.
pub trait MessagePipe<T>: Sized {
/// Create a new pair of endpoints for the message pipe.
fn new() -> (Self, Self);

impl ExecutionStrategy for CrossThread2 {
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::{Arc, Mutex};

enum State<T> {
Req(T),
Res(T),
}

let mut state = Arc::new(Mutex::new(State::Res(Buffer::new())));

let server_thread = thread::current();
let state2 = state.clone();
let join_handle = thread::spawn(move || {
let mut dispatch = |b| {
*state2.lock().unwrap() = State::Req(b);
server_thread.unpark();
loop {
thread::park();
if let State::Res(b) = &mut *state2.lock().unwrap() {
break b.take();
}
}
};

let r = run_client(BridgeConfig {
input,
dispatch: (&mut dispatch).into(),
force_show_panics,
_marker: marker::PhantomData,
});

// Wake up the server so it can exit the dispatch loop.
drop(state2);
server_thread.unpark();

r
});

// Check whether `state2` was dropped, to know when to stop.
while Arc::get_mut(&mut state).is_none() {
thread::park();
let mut b = match &mut *state.lock().unwrap() {
State::Req(b) => b.take(),
_ => continue,
};
b = dispatcher.dispatch(b.take());
*state.lock().unwrap() = State::Res(b);
join_handle.thread().unpark();
}
/// Send a message to the other endpoint of this pipe.
fn send(&mut self, value: T);

join_handle.join().unwrap()
}
/// Receive a message from the other endpoint of this pipe.
///
/// Returns `None` if the other end of the pipe has been destroyed, and no
/// message was received.
fn recv(&mut self) -> Option<T>;
}

fn run_server<
Expand Down
1 change: 1 addition & 0 deletions src/test/rustdoc-ui/z-help.stdout
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
-Z print-mono-items=val -- print the result of the monomorphization collection pass
-Z print-type-sizes=val -- print layout information for each type encountered (default: no)
-Z proc-macro-backtrace=val -- show backtraces for panics during proc-macro execution (default: no)
-Z proc-macro-execution-strategy=val -- how to run proc-macro code (default: same-thread)
-Z profile=val -- insert profiling code (default: no)
-Z profile-closures=val -- profile size of closures
-Z profile-emit=val -- file path to emit profiling data at runtime when using 'profile' (default based on relative source path)
Expand Down

0 comments on commit 6d1650f

Please sign in to comment.