diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 5a8c706e32cd..33e198d6d588 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -802,6 +802,7 @@ impl DataFrame { /// Executes this DataFrame and returns a stream over a single partition /// + /// # Example /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; @@ -813,6 +814,11 @@ impl DataFrame { /// # Ok(()) /// # } /// ``` + /// + /// # Aborting Execution + /// + /// Dropping the stream will abort the execution of the query and free up + /// any allocated resources. pub async fn execute_stream(self) -> Result { let task_ctx = Arc::new(self.task_ctx()); let plan = self.create_physical_plan().await?; @@ -841,6 +847,7 @@ impl DataFrame { /// Executes this DataFrame and returns one stream per partition. /// + /// # Example /// ``` /// # use datafusion::prelude::*; /// # use datafusion::error::Result; @@ -852,6 +859,10 @@ impl DataFrame { /// # Ok(()) /// # } /// ``` + /// # Aborting Execution + /// + /// Dropping the streams will abort the execution of the query and free up + /// any allocated resources. pub async fn execute_stream_partitioned( self, ) -> Result> { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6c9e97e03cb7..1dd1392b9d86 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -288,6 +288,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`TryStreamExt`]: futures::stream::TryStreamExt /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter /// + /// # Cancellation / Aborting Execution + /// + /// The [`Stream`] that is returned must ensure that any allocated resources + /// are freed when the stream itself is dropped. This is particularly + /// important for [`spawn`]ed tasks or threads. 
Unless care is taken to + /// "abort" such tasks, they may continue to consume resources even after + /// the plan is dropped, generating intermediate results that are never + /// used. + /// + /// See [`AbortOnDropSingle`], [`AbortOnDropMany`] and + /// [`RecordBatchReceiverStreamBuilder`] for structures to help ensure all + /// background tasks are cancelled. + /// + /// [`spawn`]: tokio::task::spawn + /// [`AbortOnDropSingle`]: crate::common::AbortOnDropSingle + /// [`AbortOnDropMany`]: crate::common::AbortOnDropMany + /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder + /// /// # Implementation Examples /// /// While `async` `Stream`s have a non trivial learning curve, the @@ -491,7 +509,12 @@ pub async fn collect( common::collect(stream).await } -/// Execute the [ExecutionPlan] and return a single stream of results +/// Execute the [`ExecutionPlan`] and return a single stream of results. +/// +/// # Aborting Execution +/// +/// Dropping the stream will abort the execution of the query and free up +/// any allocated resources. pub fn execute_stream( plan: Arc, context: Arc, @@ -549,7 +572,13 @@ pub async fn collect_partitioned( Ok(batches) } -/// Execute the [ExecutionPlan] and return a vec with one stream per output partition +/// Execute the [`ExecutionPlan`] and return a `Vec` with one stream per output +/// partition. +/// +/// # Aborting Execution +/// +/// Dropping the streams will abort the execution of the query and free up +/// any allocated resources. pub fn execute_stream_partitioned( plan: Arc, context: Arc,