feat(executor): implement time slicing for shared executor scheduling across queries #14770

Merged: 7 commits, Feb 28, 2024
183 changes: 166 additions & 17 deletions src/query/service/src/pipelines/executor/executor_graph.rs
@@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -36,11 +37,14 @@ use petgraph::prelude::NodeIndex;
use petgraph::prelude::StableGraph;
use petgraph::Direction;

use crate::pipelines::executor::processor_async_task::ExecutorTasksQueue;
use crate::pipelines::executor::ExecutorTask;
use crate::pipelines::executor::ExecutorTasksQueue;
use crate::pipelines::executor::ExecutorWorkerContext;
use crate::pipelines::executor::PipelineExecutor;
use crate::pipelines::executor::ProcessorAsyncTask;
use crate::pipelines::executor::QueriesExecutorTasksQueue;
use crate::pipelines::executor::QueriesPipelineExecutor;
use crate::pipelines::executor::QueryExecutorTasksQueue;
use crate::pipelines::executor::QueryPipelineExecutor;
use crate::pipelines::executor::WorkersCondvar;
use crate::pipelines::processors::connect;
use crate::pipelines::processors::DirectedEdge;
@@ -115,24 +119,36 @@ impl Node {
}
}

const POINTS_MASK: u64 = 0xFFFFFFFF00000000;
const EPOCH_MASK: u64 = 0x00000000FFFFFFFF;

// TODO: Replace with a variable, not a const value
const MAX_POINTS: u64 = 3;

struct ExecutingGraph {
finished_nodes: AtomicUsize,
graph: StableGraph<Arc<Node>, EdgeInfo>,
/// `points` packs two values:
///
/// - the high 32 bits store the number of points that can still be consumed
/// - the low 32 bits store the epoch these points belong to
points: AtomicU64,
}
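
For illustration, the following minimal Rust sketch (not part of this diff; the pack/unpack helper names are hypothetical) shows how a points budget and an epoch share one 64-bit word, matching the `(MAX_POINTS << 32) | init_epoch as u64` initializer used by `create` and `from_pipelines` below:

const POINTS_MASK: u64 = 0xFFFFFFFF00000000;
const EPOCH_MASK: u64 = 0x00000000FFFFFFFF;

// Pack a points budget and an epoch into one 64-bit word.
fn pack(points: u64, epoch: u32) -> u64 {
    (points << 32) | epoch as u64
}

// Split the word back into (remaining points, epoch).
fn unpack(word: u64) -> (u64, u32) {
    ((word & POINTS_MASK) >> 32, (word & EPOCH_MASK) as u32)
}

fn main() {
    let word = pack(3, 7); // e.g. MAX_POINTS = 3, epoch 7
    assert_eq!(unpack(word), (3, 7));
}
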

type StateLockGuard = ExecutingGraph;

impl ExecutingGraph {
pub fn create(mut pipeline: Pipeline) -> Result<ExecutingGraph> {
pub fn create(mut pipeline: Pipeline, init_epoch: u32) -> Result<ExecutingGraph> {
let mut graph = StableGraph::new();
Self::init_graph(&mut pipeline, &mut graph);
Ok(ExecutingGraph {
graph,
finished_nodes: AtomicUsize::new(0),
points: AtomicU64::new((MAX_POINTS << 32) | init_epoch as u64),
})
}

pub fn from_pipelines(mut pipelines: Vec<Pipeline>) -> Result<ExecutingGraph> {
pub fn from_pipelines(mut pipelines: Vec<Pipeline>, init_epoch: u32) -> Result<ExecutingGraph> {
let mut graph = StableGraph::new();

for pipeline in &mut pipelines {
@@ -142,6 +158,7 @@ impl ExecutingGraph {
Ok(ExecutingGraph {
finished_nodes: AtomicUsize::new(0),
graph,
points: AtomicU64::new((MAX_POINTS << 32) | init_epoch as u64),
})
}

@@ -341,6 +358,37 @@

Ok(())
}

/// Checks if a task can be performed in the current epoch, consuming a point if possible.
pub fn can_perform_task(&self, global_epoch: u32, max_points: u64) -> bool {
let mut expected_value = 0;
let mut desired_value = 0;
loop {
match self.points.compare_exchange_weak(
expected_value,
desired_value,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(old_value) => {
return (old_value & EPOCH_MASK) as u32 == global_epoch;
}
Err(new_expected) => {
let remain_points = (new_expected & POINTS_MASK) >> 32;
let epoch = new_expected & EPOCH_MASK;

expected_value = new_expected;
if epoch != global_epoch as u64 {
desired_value = new_expected;
} else if remain_points >= 1 {
desired_value = (remain_points - 1) << 32 | epoch;
} else {
desired_value = max_points << 32 | (epoch + 1);
}
}
}
}
}
}
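
The following self-contained sketch (hypothetical code that mirrors the loop above, with `TimeSlice` standing in for a graph's `points` word) illustrates the observable time-slicing behaviour: a graph may run up to `max_points` tasks per global epoch; the call that finds the budget exhausted refills the word for the next epoch, after which checks against the old epoch fail until the executor's global epoch advances:

use std::sync::atomic::{AtomicU64, Ordering};

const POINTS_MASK: u64 = 0xFFFFFFFF00000000;
const EPOCH_MASK: u64 = 0x00000000FFFFFFFF;

// Hypothetical standalone copy of the `points` word, used only for illustration.
struct TimeSlice(AtomicU64);

impl TimeSlice {
    fn new(max_points: u64, init_epoch: u32) -> Self {
        TimeSlice(AtomicU64::new((max_points << 32) | init_epoch as u64))
    }

    // Same algorithm as ExecutingGraph::can_perform_task above.
    fn can_perform_task(&self, global_epoch: u32, max_points: u64) -> bool {
        let mut expected = 0u64;
        let mut desired = 0u64;
        loop {
            match self
                .0
                .compare_exchange_weak(expected, desired, Ordering::SeqCst, Ordering::Relaxed)
            {
                // Once the swap lands, the stored epoch decides the answer.
                Ok(old) => return (old & EPOCH_MASK) as u32 == global_epoch,
                Err(current) => {
                    let remain = (current & POINTS_MASK) >> 32;
                    let epoch = current & EPOCH_MASK;
                    expected = current;
                    desired = if epoch != global_epoch as u64 {
                        // Graph is already in another epoch: leave the word unchanged.
                        current
                    } else if remain >= 1 {
                        // Spend one point in the current epoch.
                        (remain - 1) << 32 | epoch
                    } else {
                        // Budget exhausted: refill the points for the next epoch.
                        max_points << 32 | (epoch + 1)
                    };
                }
            }
        }
    }
}

fn main() {
    let slice = TimeSlice::new(3, 1);
    // Three points are available in epoch 1.
    assert!(slice.can_perform_task(1, 3));
    assert!(slice.can_perform_task(1, 3));
    assert!(slice.can_perform_task(1, 3));
    // The next call finds the budget exhausted and rolls the word into epoch 2.
    let _ = slice.can_perform_task(1, 3);
    // Epoch-1 checks are now rejected ...
    assert!(!slice.can_perform_task(1, 3));
    // ... until the executor's global epoch advances to 2.
    assert!(slice.can_perform_task(2, 3));
}
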

#[derive(Clone)]
@@ -374,9 +422,9 @@ impl ScheduleQueue {

pub fn schedule(
mut self,
global: &Arc<ExecutorTasksQueue>,
global: &Arc<QueryExecutorTasksQueue>,
context: &mut ExecutorWorkerContext,
executor: &Arc<PipelineExecutor>,
executor: &Arc<QueryPipelineExecutor>,
) {
debug_assert!(!context.has_task());

@@ -403,14 +451,13 @@
pub fn schedule_async_task(
proc: ProcessorWrapper,
query_id: Arc<String>,
executor: &Arc<PipelineExecutor>,
executor: &Arc<QueryPipelineExecutor>,
wakeup_worker_id: usize,
workers_condvar: Arc<WorkersCondvar>,
global_queue: Arc<ExecutorTasksQueue>,
global_queue: Arc<QueryExecutorTasksQueue>,
) {
unsafe {
workers_condvar.inc_active_async_worker();
let weak_executor = Arc::downgrade(executor);
let graph = proc.graph;
let node_profile = executor.graph.get_node_profile(proc.processor.id()).clone();
let process_future = proc.processor.async_process();
@@ -420,9 +467,8 @@
query_id,
wakeup_worker_id,
proc.processor.clone(),
global_queue,
Arc::new(ExecutorTasksQueue::QueryExecutorTasksQueue(global_queue)),
workers_condvar,
weak_executor,
node_profile,
graph,
process_future,
@@ -434,13 +480,17 @@
}
}

fn schedule_sync(&mut self, _: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
fn schedule_sync(&mut self, _: &QueryExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
if let Some(processor) = self.sync_queue.pop_front() {
ctx.set_task(ExecutorTask::Sync(processor));
}
}

pub fn schedule_tail(mut self, global: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
pub fn schedule_tail(
mut self,
global: &QueryExecutorTasksQueue,
ctx: &mut ExecutorWorkerContext,
) {
let mut tasks = VecDeque::with_capacity(self.sync_queue.len());

while let Some(processor) = self.sync_queue.pop_front() {
@@ -449,19 +499,113 @@ impl ScheduleQueue {

global.push_tasks(ctx, tasks)
}

pub fn schedule_with_condition(
mut self,
global: &Arc<QueriesExecutorTasksQueue>,
context: &mut ExecutorWorkerContext,
executor: &Arc<QueriesPipelineExecutor>,
) {
debug_assert!(!context.has_task());

while let Some(processor) = self.async_queue.pop_front() {
if processor
.graph
.can_perform_task(executor.epoch.load(Ordering::SeqCst), MAX_POINTS)
{
Self::schedule_async_task_with_condition(
processor,
context.query_id.clone(),
executor,
context.get_worker_id(),
context.get_workers_condvar().clone(),
global.clone(),
)
} else {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Async(processor));
global.push_tasks(context.get_worker_id(), None, tasks);
}
}

if !self.sync_queue.is_empty() {
while let Some(processor) = self.sync_queue.pop_front() {
if processor
.graph
.can_perform_task(executor.epoch.load(Ordering::SeqCst), MAX_POINTS)
{
context.set_task(ExecutorTask::Sync(processor));
break;
} else {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Sync(processor));
global.push_tasks(context.get_worker_id(), None, tasks);
}
}
}

if !self.sync_queue.is_empty() {
let mut current_tasks = VecDeque::with_capacity(self.sync_queue.len());
let mut next_tasks = VecDeque::with_capacity(self.sync_queue.len());
while let Some(processor) = self.sync_queue.pop_front() {
if processor
.graph
.can_perform_task(executor.epoch.load(Ordering::SeqCst), MAX_POINTS)
{
current_tasks.push_back(ExecutorTask::Sync(processor));
} else {
next_tasks.push_back(ExecutorTask::Sync(processor));
}
}
let worker_id = context.get_worker_id();
global.push_tasks(worker_id, Some(current_tasks), next_tasks);
}
}

pub fn schedule_async_task_with_condition(
proc: ProcessorWrapper,
query_id: Arc<String>,
executor: &Arc<QueriesPipelineExecutor>,
wakeup_worker_id: usize,
workers_condvar: Arc<WorkersCondvar>,
global_queue: Arc<QueriesExecutorTasksQueue>,
) {
unsafe {
workers_condvar.inc_active_async_worker();
let graph = proc.graph;
let node_profile = executor.graph.get_node_profile(proc.processor.id()).clone();
let process_future = proc.processor.async_process();
executor.async_runtime.spawn(
query_id.as_ref().clone(),
ProcessorAsyncTask::create(
query_id,
wakeup_worker_id,
proc.processor.clone(),
Arc::new(ExecutorTasksQueue::QueriesExecutorTasksQueue(global_queue)),
workers_condvar,
node_profile,
graph,
process_future,
)
.in_span(Span::enter_with_local_parent(std::any::type_name::<
ProcessorAsyncTask,
>())),
);
}
}
}

pub struct RunningGraph(ExecutingGraph);

impl RunningGraph {
pub fn create(pipeline: Pipeline) -> Result<Arc<RunningGraph>> {
let graph_state = ExecutingGraph::create(pipeline)?;
pub fn create(pipeline: Pipeline, init_epoch: u32) -> Result<Arc<RunningGraph>> {
let graph_state = ExecutingGraph::create(pipeline, init_epoch)?;
debug!("Create running graph:{:?}", graph_state);
Ok(Arc::new(RunningGraph(graph_state)))
}

pub fn from_pipelines(pipelines: Vec<Pipeline>) -> Result<Arc<RunningGraph>> {
let graph_state = ExecutingGraph::from_pipelines(pipelines)?;
pub fn from_pipelines(pipelines: Vec<Pipeline>, init_epoch: u32) -> Result<Arc<RunningGraph>> {
let graph_state = ExecutingGraph::from_pipelines(pipelines, init_epoch)?;
debug!("Create running graph:{:?}", graph_state);
Ok(Arc::new(RunningGraph(graph_state)))
}
Expand Down Expand Up @@ -514,6 +658,11 @@ impl RunningGraph {
}
}

/// Checks if a task can be performed in the current epoch, consuming a point if possible.
pub fn can_perform_task(&self, global_epoch: u32, max_points: u64) -> bool {
self.0.can_perform_task(global_epoch, max_points)
}

pub fn format_graph_nodes(&self) -> String {
pub struct NodeDisplay {
id: usize,
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/executor/executor_settings.rs
@@ -20,6 +20,7 @@ use databend_common_settings::Settings;

#[derive(Clone)]
pub struct ExecutorSettings {
pub enable_new_executor: bool,
pub query_id: Arc<String>,
pub max_execute_time_in_seconds: Duration,
}
@@ -28,6 +29,7 @@ impl ExecutorSettings {
pub fn try_create(settings: &Settings, query_id: String) -> Result<ExecutorSettings> {
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;
Ok(ExecutorSettings {
enable_new_executor: settings.get_enable_experimental_new_executor()?,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
})
(additional changed file; file name not captured in this view)
@@ -25,17 +25,39 @@ use databend_common_exception::Result;
use petgraph::prelude::NodeIndex;

use crate::pipelines::executor::executor_graph::ProcessorWrapper;
use crate::pipelines::executor::CompletedAsyncTask;
use crate::pipelines::executor::PipelineExecutor;
use crate::pipelines::executor::RunningGraph;
use crate::pipelines::executor::WorkersCondvar;

pub enum ExecutorTask {
None,
Sync(ProcessorWrapper),
Async(ProcessorWrapper),
AsyncCompleted(CompletedAsyncTask),
}

pub struct CompletedAsyncTask {
pub id: NodeIndex,
pub worker_id: usize,
pub res: Result<()>,
pub graph: Arc<RunningGraph>,
}

impl CompletedAsyncTask {
pub fn create(
id: NodeIndex,
worker_id: usize,
res: Result<()>,
graph: Arc<RunningGraph>,
) -> Self {
CompletedAsyncTask {
id,
worker_id,
res,
graph,
}
}
}

pub struct ExecutorWorkerContext {
pub query_id: Arc<String>,
worker_id: usize,
@@ -74,17 +96,15 @@
}

/// # Safety
pub unsafe fn execute_task(
&mut self,
_: &Arc<PipelineExecutor>,
) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
pub unsafe fn execute_task(&mut self) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
match std::mem::replace(&mut self.task, ExecutorTask::None) {
ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")),
ExecutorTask::Sync(processor) => self.execute_sync_task(processor),
ExecutorTask::AsyncCompleted(task) => match task.res {
Ok(_) => Ok(Some((task.id, task.graph))),
Err(cause) => Err(cause),
},
ExecutorTask::Async(_) => unreachable!("used for new executor"),
}
}

@@ -121,6 +141,12 @@ impl Debug for ExecutorTask {
p.processor.id().index(),
p.processor.name()
),
ExecutorTask::Async(p) => write!(
f,
"ExecutorTask::Async {{ id: {}, name: {}}}",
p.processor.id().index(),
p.processor.name()
),
ExecutorTask::AsyncCompleted(_) => write!(f, "ExecutorTask::CompletedAsync"),
}
}