Skip to content

Commit

Permalink
Use unblock_lock to protect the task's unlock process
Browse files Browse the repository at this point in the history
  • Loading branch information
hky1999 committed Sep 19, 2024
1 parent 6e841eb commit 30e3f9d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 29 deletions.
23 changes: 4 additions & 19 deletions modules/axtask/src/run_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,7 @@ impl AxRunQueue {
/// Core functions of run queue.
impl AxRunQueue {
pub fn add_task(&mut self, task: AxTaskRef) {
debug!(
"task spawn: {} on run_queue {}",
task.id_name(),
self.cpu_id
);
debug!("Add {} on run_queue {}", task.id_name(), self.cpu_id);
assert!(task.is_ready());
self.scheduler.add_task(task);
}
Expand Down Expand Up @@ -280,31 +276,20 @@ impl AxRunQueue {

/// Unblock one task by inserting it into the run queue.
/// If task state is `BLOCKING`, it will enter a loop until the task is in `BLOCKED` state.
///
/// Note: this function should be called with preemption and IRQ disabled.
pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
// When task's state is Blocking, it has not finished its scheduling process.
if task.is_blocking() {
while task.is_blocking() {
// Wait for the task to finish its scheduling process.
core::hint::spin_loop();
}
assert!(task.is_blocked())
}

if task.is_blocked() {
task.clone().unblock_locked(|| {
let cpu_id = self.cpu_id;
debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
task.set_state(TaskState::Ready);
self.scheduler.add_task(task); // TODO: priority
self.scheduler.add_task(task.clone()); // TODO: priority

// Note: when the task is unblocked on another CPU's run queue,
// we just ignore the `resched` flag.
if resched && cpu_id == this_cpu_id() {
#[cfg(feature = "preempt")]
crate::current().set_preempt_pending(true);
}
}
})
}

#[cfg(feature = "irq")]
Expand Down
30 changes: 30 additions & 0 deletions modules/axtask/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};

use bitmaps::Bitmap;
use kspin::SpinRaw;
use memory_addr::{align_up_4k, va, VirtAddr};

use axhal::arch::TaskContext;
Expand Down Expand Up @@ -54,6 +55,12 @@ pub struct TaskInner {
#[cfg(feature = "irq")]
in_timer_list: AtomicBool,

/// Used to protect the task from being unblocked by timer and `notify()` at the same time.
/// It is used in `unblock_task()`, which is called by wait queue's `notify()` and timer's callback.
/// Since preemption and irq are both disabled during `unblock_task()`, we can simply use a raw spin lock here.
#[cfg(feature = "irq")]
unblock_lock: SpinRaw<()>,

#[cfg(feature = "preempt")]
need_resched: AtomicBool,
#[cfg(feature = "preempt")]
Expand Down Expand Up @@ -138,6 +145,8 @@ impl TaskInner {
in_wait_queue: AtomicBool::new(false),
#[cfg(feature = "irq")]
in_timer_list: AtomicBool::new(false),
#[cfg(feature = "irq")]
unblock_lock: SpinRaw::new(()),
#[cfg(feature = "preempt")]
need_resched: AtomicBool::new(false),
#[cfg(feature = "preempt")]
Expand Down Expand Up @@ -288,6 +297,27 @@ impl TaskInner {
self.in_timer_list.store(in_timer_list, Ordering::Release);
}

/// Runs `run_queue_push` — a closure expected to insert this task back into a
/// run queue — once it is safe to unblock the task.
///
/// If the task is still in the `Blocking` state (it has not yet finished its
/// scheduling-out path), this spins until the state becomes `Blocked` before
/// proceeding.
///
/// With the `irq` feature enabled, `unblock_lock` serializes concurrent
/// unblock attempts (e.g. a timer callback racing with a wait queue's
/// `notify()`); combined with the `is_blocked()` re-check under the lock,
/// `run_queue_push` is invoked at most once per block/unblock cycle.
/// If the task turns out not to be blocked, `run_queue_push` is not called.
pub(crate) fn unblock_locked<F>(&self, mut run_queue_push: F)
where
    F: FnMut(),
{
    // When the task's state is `Blocking`, it has not finished its scheduling
    // process yet; wait for it to settle into `Blocked` before touching it.
    if self.is_blocking() {
        while self.is_blocking() {
            // Busy-wait for the task to finish its scheduling process.
            core::hint::spin_loop();
        }
        assert!(self.is_blocked())
    }

    // When irq is enabled, take `unblock_lock` so the timer callback and
    // `notify()` cannot both unblock this task at the same time. Preemption
    // and IRQs are expected to be disabled by the caller, so a raw spin lock
    // suffices (see the field's declaration comment).
    #[cfg(feature = "irq")]
    let _lock = self.unblock_lock.lock();
    // Re-check under the lock: only the first winner sees `Blocked` and
    // performs the run-queue insertion.
    if self.is_blocked() {
        run_queue_push();
    }
}

#[inline]
#[cfg(feature = "preempt")]
pub(crate) fn set_preempt_pending(&self, pending: bool) {
Expand Down
19 changes: 9 additions & 10 deletions modules/axtask/src/wait_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl WaitQueue {
assert!(curr.is_running());
assert!(!curr.is_idle());
// we must not block current task with preemption disabled.
// Current expected preempt count is 2.
// 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`.
#[cfg(feature = "preempt")]
assert!(curr.can_preempt(2));

Expand Down Expand Up @@ -119,6 +121,8 @@ impl WaitQueue {
debug!("{} push to wait queue on wait_until", curr.id_name());

// we must not block current task with preemption disabled.
// Current expected preempt count is 2.
// 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`.
#[cfg(feature = "preempt")]
assert!(curr.can_preempt(2));
wq.push_back(curr.clone());
Expand Down Expand Up @@ -187,6 +191,8 @@ impl WaitQueue {
assert!(!curr.is_idle());

// we must not block current task with preemption disabled.
// Current expected preempt count is 2.
// 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`.
#[cfg(feature = "preempt")]
assert!(curr.can_preempt(2));
wq.push_back(curr.clone());
Expand All @@ -207,13 +213,11 @@ impl WaitQueue {
/// If `resched` is true, the current task will be preempted when the
/// preemption is enabled.
pub fn notify_one(&self, resched: bool) -> bool {
// Todo: figure out is irq save necessary here.
let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
let mut wq = self.queue.lock();
if let Some(task) = wq.pop_front() {
task.set_in_wait_queue(false);
drop(wq);
unblock_one_task(task, resched);
drop(wq);
true
} else {
false
Expand All @@ -226,17 +230,14 @@ impl WaitQueue {
/// preemption is enabled.
pub fn notify_all(&self, resched: bool) {
loop {
// Todo: figure out is irq save necessary here.
let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
let mut wq = self.queue.lock();
if let Some(task) = wq.pop_front() {
task.set_in_wait_queue(false);
drop(wq);
unblock_one_task(task, resched);
} else {
break;
}
drop(kernel_guard);
drop(wq);
}
}

Expand All @@ -245,8 +246,6 @@ impl WaitQueue {
/// If `resched` is true, the current task will be preempted when the
/// preemption is enabled.
pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
// Todo: figure out is irq save necessary here.
let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
let mut wq = self.queue.lock();
let task_to_be_notify = {
if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
Expand All @@ -258,8 +257,8 @@ impl WaitQueue {
if let Some(task) = task_to_be_notify {
// Mark task as not in wait queue.
task.set_in_wait_queue(false);
drop(wq);
unblock_one_task(task, resched);
drop(wq);
true
} else {
false
Expand Down

0 comments on commit 30e3f9d

Please sign in to comment.