From 9a3913952e39494960f7594a733c90ea2ec0b1ac Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 16 Aug 2023 13:04:33 -0700 Subject: [PATCH 1/6] remove use of rc for task definitions --- crates/turborepo-lib/src/engine/builder.rs | 15 ++++++--------- crates/turborepo-lib/src/engine/mod.rs | 11 ++++------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/crates/turborepo-lib/src/engine/builder.rs b/crates/turborepo-lib/src/engine/builder.rs index 02437f91ccff3..3ff072a5f8495 100644 --- a/crates/turborepo-lib/src/engine/builder.rs +++ b/crates/turborepo-lib/src/engine/builder.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet, VecDeque}, - rc::Rc, -}; +use std::collections::{HashMap, HashSet, VecDeque}; use itertools::Itertools; use turbopath::AbsoluteSystemPath; @@ -163,11 +160,11 @@ impl<'a> EngineBuilder<'a> { task_id: task_id.to_string(), }); } - let task_definition = Rc::new(TaskDefinition::from_iter(self.task_definition_chain( + let task_definition = TaskDefinition::from_iter(self.task_definition_chain( &mut turbo_jsons, &task_id, &task_id.as_non_workspace_task_name(), - )?)); + )?); // Skip this iteration of the loop if we've already seen this taskID if visited.contains(&task_id) { @@ -176,8 +173,6 @@ impl<'a> EngineBuilder<'a> { visited.insert(task_id.clone()); - engine.add_definition(task_id.clone().into_owned(), task_definition.clone()); - // Note that the Go code has a whole if/else statement for putting stuff into // deps or calling e.AddDep the bool is cannot be true so we skip to // just doing deps @@ -197,7 +192,7 @@ impl<'a> EngineBuilder<'a> { // Don't ask why, but for some reason we refer to the source as "to" // and the target node as "from" - let to_task_id = task_id.into_owned(); + let to_task_id = task_id.clone().into_owned(); let to_task_index = engine.get_index(&to_task_id); let dep_pkgs = self @@ -236,6 +231,8 @@ impl<'a> EngineBuilder<'a> { traversal_queue.push_back(from_task_id); } + engine.add_definition(task_id.clone().into_owned(), task_definition); + if !has_deps && !has_topo_deps { engine.connect_to_root(&to_task_id); } diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index 26b8fa4c7eb2f..8476b6c67abee 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -1,9 +1,6 @@ mod builder; -use std::{ - collections::{HashMap, HashSet}, - rc::Rc, -}; +use std::collections::{HashMap, HashSet}; pub use builder::EngineBuilder; use petgraph::Graph; @@ -37,7 +34,7 @@ pub struct Engine { task_graph: Graph, root_index: petgraph::graph::NodeIndex, task_lookup: HashMap, petgraph::graph::NodeIndex>, - task_definitions: HashMap, Rc>, + task_definitions: HashMap, TaskDefinition>, } impl Engine { @@ -69,8 +66,8 @@ impl Engine { pub fn add_definition( &mut self, task_id: TaskId<'static>, - definition: Rc, - ) -> Option> { + definition: TaskDefinition, + ) -> Option { self.task_definitions.insert(task_id, definition) } From d164ed7d5a3242b201e4d866b4783aa3704497b7 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 18 Aug 2023 09:15:57 -0700 Subject: [PATCH 2/6] add task graph engine execution --- crates/turborepo-lib/src/engine/execute.rs | 154 +++++++++++++++++++++ crates/turborepo-lib/src/engine/mod.rs | 1 + 2 files changed, 155 insertions(+) create mode 100644 crates/turborepo-lib/src/engine/execute.rs diff --git a/crates/turborepo-lib/src/engine/execute.rs b/crates/turborepo-lib/src/engine/execute.rs new file mode 100644 index 0000000000000..9d22b814af7ad --- /dev/null +++ b/crates/turborepo-lib/src/engine/execute.rs @@ -0,0 +1,154 @@ +use std::sync::{Arc, Mutex}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use tokio::sync::{mpsc, oneshot, Semaphore}; +use tracing::log::debug; + +use super::{Engine, TaskNode}; +use crate::{graph::Walker, run::task_id::TaskId}; + +pub struct Message { + pub info: T, + pub callback: oneshot::Sender, +} + +// Type alias used just to make altering the data sent to the visitor easier in +// the future +type VisitorData = TaskId<'static>; +type VisitorResult = Result<(), StopExecution>; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ExecutionOptions { + parallel: bool, + concurrency: usize, +} + +impl ExecutionOptions { + pub fn new(parallel: bool, concurrency: usize) -> Self { + Self { + parallel, + concurrency, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ExecuteError { + #[error("Semaphore closed before all tasks finished")] + Semaphore(#[from] tokio::sync::AcquireError), + #[error("Engine visitor closed channel before walk finished")] + Visitor, +} + +impl From>> for ExecuteError { + fn from( + _: mpsc::error::SendError, Result<(), StopExecution>>>, + ) -> Self { + ExecuteError::Visitor + } +} + +#[derive(Debug, Clone, Copy)] +pub struct StopExecution; + +impl Engine { + /// Execute a task graph by sending task ids to the visitor + /// while respecting concurrency limits. + /// The visitor is expected to handle any error handling on it's end. + /// We enforce this by only allowing the returning of a sentinel error + /// type which will stop any further execution of tasks. + /// This will not stop any task which is currently running, simply it will + /// stop scheduling new tasks. + // (olszewski) The current impl requires that the visitor receiver is read until + // finish even once a task sends back the stop signal. This is suboptimal + // since it would mean the visitor would need to also track if + // it is cancelled :) + pub async fn execute( + self: Arc, + options: ExecutionOptions, + visitor: mpsc::Sender>, + ) -> Result<(), ExecuteError> { + let ExecutionOptions { + parallel, + concurrency, + } = options; + let sema = Arc::new(Semaphore::new(concurrency)); + let mut tasks: FuturesUnordered>> = + FuturesUnordered::new(); + + let (walker, mut nodes) = Walker::new(&self.task_graph).walk(); + let walker = Arc::new(Mutex::new(walker)); + + while let Some((node_id, done)) = nodes.recv().await { + let visitor = visitor.clone(); + let sema = sema.clone(); + let walker = walker.clone(); + let this = self.clone(); + + tasks.push(tokio::spawn(async move { + let TaskNode::Task(task_id) = this + .task_graph + .node_weight(node_id) + .expect("node id should be present") + else { + // Root task has nothing to do so we don't emit any event for it + if done.send(()).is_err() { + debug!( + "Graph walker done callback receiver was closed before done signal \ + could be sent" + ); + } + return Ok(()); + }; + + // Acquire the semaphore unless parallel + let _permit = match parallel { + false => Some(sema.acquire().await.expect( + "Graph concurrency semaphore closed while tasks are still attempting to \ + acquire permits", + )), + true => None, + }; + + let (message, result) = Message::new(task_id.clone()); + visitor.send(message).await?; + + if let Err(StopExecution) = result.await.unwrap_or_else(|_| { + // If the visitor doesn't send a callback, then we assume the task finished + debug!("Engine visitor dropped callback sender without sending result"); + Ok(()) + }) { + if walker + .lock() + .expect("Walker mutex poisoned") + .cancel() + .is_err() + { + debug!("Unable to cancel graph walk"); + } + } + if done.send(()).is_err() { + debug!("Graph walk done receiver closed before node was finished processing"); + } + Ok(()) + })); + } + + while let Some(res) = tasks.next().await { + res.expect("unable to join task")?; + } + + Ok(()) + } +} + +// the visitor fn +// Visitor is just a channel that we send task info into along w/ callback +// oneshot + +impl Message { + pub fn new(info: T) -> (Self, oneshot::Receiver) { + let (callback, receiver) = oneshot::channel(); + (Self { info, callback }, receiver) + } +} diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index 8476b6c67abee..caed9a4be5949 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -1,4 +1,5 @@ mod builder; +mod execute; use std::collections::{HashMap, HashSet}; From 688bd5b1206a05d97fe09582ec8226a63203ef8d Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 22 Aug 2023 14:09:23 -0700 Subject: [PATCH 3/6] add package task visitor --- crates/turborepo-lib/src/engine/mod.rs | 7 ++ crates/turborepo-lib/src/task_graph/mod.rs | 2 + .../turborepo-lib/src/task_graph/visitor.rs | 107 ++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 crates/turborepo-lib/src/task_graph/visitor.rs diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index caed9a4be5949..73b6b4e12194c 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -4,6 +4,7 @@ mod execute; use std::collections::{HashMap, HashSet}; pub use builder::EngineBuilder; +pub use execute::{ExecutionOptions, Message}; use petgraph::Graph; use crate::{ @@ -113,6 +114,12 @@ impl Engine { ) } + // TODO get rid of static lifetime and figure out right way to tell compiler the + // lifetime of the return ref + pub fn task_definition(&self, task_id: &TaskId<'static>) -> Option<&TaskDefinition> { + self.task_definitions.get(task_id) + } + pub fn validate( &self, package_graph: &PackageGraph, diff --git a/crates/turborepo-lib/src/task_graph/mod.rs b/crates/turborepo-lib/src/task_graph/mod.rs index 1be1725aa6e16..53bd1b689880a 100644 --- a/crates/turborepo-lib/src/task_graph/mod.rs +++ b/crates/turborepo-lib/src/task_graph/mod.rs @@ -1,3 +1,5 @@ +mod visitor; + use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs new file mode 100644 index 0000000000000..873cf75d144c8 --- /dev/null +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -0,0 +1,107 @@ +use std::sync::{Arc, OnceLock}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use regex::Regex; +use tokio::sync::mpsc; + +use crate::{ + cli::EnvMode, + engine::{Engine, ExecutionOptions}, + opts::Opts, + package_graph::{PackageGraph, WorkspaceName}, + run::task_id::{self, TaskId}, +}; + +// This holds the whole world +pub struct Visitor<'a> { + package_graph: Arc, + opts: &'a Opts<'a>, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("cannot find package {package_name} for task {task_id}")] + MissingPackage { + package_name: WorkspaceName, + task_id: TaskId<'static>, + }, + #[error( + "root task {task_name} ({command}) looks like it invokes turbo and might cause a loop" + )] + RecursiveTurbo { task_name: String, command: String }, + #[error("Could not find definition for task")] + MissingDefinition, +} + +impl<'a> Visitor<'a> { + pub fn new(package_graph: Arc, opts: &'a Opts) -> Self { + Self { + package_graph, + opts, + } + } + + pub async fn visit(&self, engine: Arc) -> Result<(), Error> { + let concurrency = self.opts.run_opts.concurrency as usize; + let (node_sender, mut node_stream) = mpsc::channel(concurrency); + + let engine_handle = { + let engine = engine.clone(); + tokio::spawn(engine.execute(ExecutionOptions::new(false, concurrency), node_sender)) + }; + + let mut tasks = FuturesUnordered::new(); + + while let Some(message) = node_stream.recv().await { + let crate::engine::Message { info, callback } = message; + let package_name = WorkspaceName::from(info.package()); + let package_json = self + .package_graph + .package_json(&package_name) + .ok_or_else(|| Error::MissingPackage { + package_name: package_name.clone(), + task_id: info.clone(), + })?; + + let command = package_json.scripts.get(info.task()).cloned(); + + match command { + Some(cmd) + if info.package() == task_id::ROOT_PKG_NAME && turbo_regex().is_match(&cmd) => + { + return Err(Error::RecursiveTurbo { + task_name: info.to_string(), + command: cmd.to_string(), + }) + } + _ => (), + } + + let task_def = engine + .task_definition(&info) + .ok_or(Error::MissingDefinition)?; + + tasks.push(tokio::spawn(async move { + println!( + "Executing {info}: {}", + command.as_deref().unwrap_or("no script def") + ); + callback.send(Ok(())).unwrap(); + })); + } + + // Wait for the engine task to finish and for all of our tasks to finish + let engine_result = engine_handle.await.expect("engine execution panicked"); + // This will poll the futures until they are all completed + while let Some(result) = tasks.next().await { + result.expect("task executor panicked"); + } + + Ok(()) + } +} + +fn turbo_regex() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| Regex::new(r"(?:^|\s)turbo(?:$|\s)").unwrap()) +} From 9db304414d340ab9ea4621233ab0f22d3e1c26c7 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 22 Aug 2023 14:12:25 -0700 Subject: [PATCH 4/6] surface types --- crates/turborepo-lib/src/engine/mod.rs | 2 +- crates/turborepo-lib/src/task_graph/mod.rs | 1 + crates/turborepo-lib/src/task_graph/visitor.rs | 7 ++++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index 73b6b4e12194c..8b104e3edc004 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -4,7 +4,7 @@ mod execute; use std::collections::{HashMap, HashSet}; pub use builder::EngineBuilder; -pub use execute::{ExecutionOptions, Message}; +pub use execute::{ExecuteError, ExecutionOptions, Message}; use petgraph::Graph; use crate::{ diff --git a/crates/turborepo-lib/src/task_graph/mod.rs b/crates/turborepo-lib/src/task_graph/mod.rs index 53bd1b689880a..bb6a747b08bef 100644 --- a/crates/turborepo-lib/src/task_graph/mod.rs +++ b/crates/turborepo-lib/src/task_graph/mod.rs @@ -4,6 +4,7 @@ use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; use turbopath::RelativeUnixPathBuf; +pub use visitor::{Error, Visitor}; use crate::{ cli::OutputLogsMode, diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index 873cf75d144c8..a47a64f4733a8 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -5,7 +5,6 @@ use regex::Regex; use tokio::sync::mpsc; use crate::{ - cli::EnvMode, engine::{Engine, ExecutionOptions}, opts::Opts, package_graph::{PackageGraph, WorkspaceName}, @@ -31,6 +30,8 @@ pub enum Error { RecursiveTurbo { task_name: String, command: String }, #[error("Could not find definition for task")] MissingDefinition, + #[error("error while executing engine: {0}")] + Engine(#[from] crate::engine::ExecuteError), } impl<'a> Visitor<'a> { @@ -77,7 +78,7 @@ impl<'a> Visitor<'a> { _ => (), } - let task_def = engine + let _task_def = engine .task_definition(&info) .ok_or(Error::MissingDefinition)?; @@ -91,7 +92,7 @@ impl<'a> Visitor<'a> { } // Wait for the engine task to finish and for all of our tasks to finish - let engine_result = engine_handle.await.expect("engine execution panicked"); + engine_handle.await.expect("engine execution panicked")?; // This will poll the futures until they are all completed while let Some(result) = tasks.next().await { result.expect("task executor panicked"); From ccdce01a2884a141ba5e60dca0c2c50db1b031bb Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 23 Aug 2023 10:02:30 -0700 Subject: [PATCH 5/6] hook up noop visitor --- crates/turborepo-lib/src/run/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 80612e966810e..db548e76a8e0c 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -5,7 +5,10 @@ mod global_hash; mod scope; pub mod task_id; -use std::io::{BufWriter, IsTerminal}; +use std::{ + io::{BufWriter, IsTerminal}, + sync::Arc, +}; use anyhow::{anyhow, Context as ErrorContext, Result}; use itertools::Itertools; @@ -27,6 +30,7 @@ use crate::{ package_graph::{PackageGraph, WorkspaceName}, package_json::PackageJson, run::{cache::RunCache, global_hash::get_global_hash_inputs}, + task_graph::Visitor, }; #[derive(Debug)] @@ -206,6 +210,11 @@ impl Run { self.base.ui, ); + let pkg_dep_graph = Arc::new(pkg_dep_graph); + let engine = Arc::new(engine); + let visitor = Visitor::new(pkg_dep_graph, &opts); + visitor.visit(engine).await?; + Ok(()) } } From 457a471ebad92d6b502ceb7d3ef99b2ee67ad214 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 23 Aug 2023 11:42:59 -0700 Subject: [PATCH 6/6] remove old comments --- crates/turborepo-lib/src/engine/execute.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/turborepo-lib/src/engine/execute.rs b/crates/turborepo-lib/src/engine/execute.rs index 9d22b814af7ad..57d40e18972f0 100644 --- a/crates/turborepo-lib/src/engine/execute.rs +++ b/crates/turborepo-lib/src/engine/execute.rs @@ -142,10 +142,6 @@ impl Engine { } } -// the visitor fn -// Visitor is just a channel that we send task info into along w/ callback -// oneshot - impl Message { pub fn new(info: T) -> (Self, oneshot::Receiver) { let (callback, receiver) = oneshot::channel();