From 92762894992a5779e7de3a51692ef6e6695115e2 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Tue, 3 Sep 2024 21:41:02 +0800 Subject: [PATCH 01/12] feat: support percpu run-queue and cpu affinity for axtask, bug in wait queue? --- .github/workflows/test.yml | 2 +- Cargo.lock | 1 + Makefile | 6 +- api/axfeat/Cargo.toml | 2 +- modules/axhal/src/arch/aarch64/trap.rs | 2 +- modules/axhal/src/arch/riscv/trap.rs | 2 +- modules/axhal/src/arch/x86_64/trap.rs | 2 +- .../axhal/src/platform/aarch64_common/boot.rs | 2 +- .../src/platform/aarch64_qemu_virt/mem.rs | 2 +- .../src/platform/aarch64_qemu_virt/mp.rs | 2 +- .../src/platform/riscv64_qemu_virt/mp.rs | 2 +- modules/axruntime/Cargo.toml | 2 +- modules/axruntime/src/mp.rs | 2 + modules/axtask/Cargo.toml | 13 +- modules/axtask/src/api.rs | 36 +- modules/axtask/src/lib.rs | 1 + modules/axtask/src/run_queue.rs | 314 +++++++++++++++--- modules/axtask/src/task.rs | 75 ++++- modules/axtask/src/timers.rs | 45 ++- modules/axtask/src/wait_queue.rs | 105 +++--- 20 files changed, 476 insertions(+), 142 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d3c1de36f1..1eedadff69 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ on: [push, pull_request] env: qemu-version: 8.2.0 rust-toolchain: nightly-2024-05-02 - arceos-apps: '68054e8' + arceos-apps: 'b25b7e2' jobs: unit-test: diff --git a/Cargo.lock b/Cargo.lock index 3b0e3b5452..dd1c3dcfa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,6 +510,7 @@ dependencies = [ "axconfig", "axhal", "axtask", + "bitmaps", "cfg-if", "crate_interface", "kernel_guard", diff --git a/Makefile b/Makefile index 3d0dd587b0..1736116923 100644 --- a/Makefile +++ b/Makefile @@ -175,11 +175,13 @@ justrun: $(call run_qemu) debug: build - $(call run_qemu_debug) & - sleep 1 + $(call run_qemu_debug) + +gdb: $(GDB) $(OUT_ELF) \ -ex 'target remote localhost:1234' \ -ex 'b rust_entry' \ + -ex 'b panic' \ -ex 'continue' \ -ex 'disp /16i $$pc' diff --git a/api/axfeat/Cargo.toml b/api/axfeat/Cargo.toml index 6e246b36df..17121025f8 100644 --- a/api/axfeat/Cargo.toml +++ b/api/axfeat/Cargo.toml @@ -13,7 +13,7 @@ documentation = "https://arceos-org.github.io/arceos/axfeat/index.html" default = [] # Multicore -smp = ["axhal/smp", "axruntime/smp", "kspin/smp"] +smp = ["axhal/smp", "axruntime/smp", "axtask/smp", "kspin/smp"] # Floating point/SIMD fp_simd = ["axhal/fp_simd"] diff --git a/modules/axhal/src/arch/aarch64/trap.rs b/modules/axhal/src/arch/aarch64/trap.rs index e602c409f9..46dca0e59e 100644 --- a/modules/axhal/src/arch/aarch64/trap.rs +++ b/modules/axhal/src/arch/aarch64/trap.rs @@ -1,7 +1,7 @@ use core::arch::global_asm; use aarch64_cpu::registers::{ESR_EL1, FAR_EL1}; -use memory_addr::{va, VirtAddr}; +use memory_addr::va; use page_table_entry::MappingFlags; use tock_registers::interfaces::Readable; diff --git a/modules/axhal/src/arch/riscv/trap.rs b/modules/axhal/src/arch/riscv/trap.rs index 3eca9af99e..59fd2f8b61 100644 --- a/modules/axhal/src/arch/riscv/trap.rs +++ b/modules/axhal/src/arch/riscv/trap.rs @@ -1,4 +1,4 @@ -use memory_addr::{va, VirtAddr}; +use memory_addr::va; use page_table_entry::MappingFlags; use riscv::register::scause::{self, Exception as E, Trap}; use riscv::register::stval; diff --git a/modules/axhal/src/arch/x86_64/trap.rs b/modules/axhal/src/arch/x86_64/trap.rs index 9c00e5f9aa..970dc6e738 100644 --- a/modules/axhal/src/arch/x86_64/trap.rs +++ b/modules/axhal/src/arch/x86_64/trap.rs @@ -1,4 +1,4 @@ -use memory_addr::{va, 
VirtAddr}; +use memory_addr::va; use page_table_entry::MappingFlags; use x86::{controlregs::cr2, irq::*}; use x86_64::structures::idt::PageFaultErrorCode; diff --git a/modules/axhal/src/platform/aarch64_common/boot.rs b/modules/axhal/src/platform/aarch64_common/boot.rs index 1174af5f7c..b9176037ac 100644 --- a/modules/axhal/src/platform/aarch64_common/boot.rs +++ b/modules/axhal/src/platform/aarch64_common/boot.rs @@ -1,6 +1,6 @@ use aarch64_cpu::{asm, asm::barrier, registers::*}; use core::ptr::addr_of_mut; -use memory_addr::{pa, PhysAddr}; +use memory_addr::pa; use page_table_entry::aarch64::{MemAttr, A64PTE}; use tock_registers::interfaces::{ReadWriteable, Readable, Writeable}; diff --git a/modules/axhal/src/platform/aarch64_qemu_virt/mem.rs b/modules/axhal/src/platform/aarch64_qemu_virt/mem.rs index 23e6b00428..b6d7901901 100644 --- a/modules/axhal/src/platform/aarch64_qemu_virt/mem.rs +++ b/modules/axhal/src/platform/aarch64_qemu_virt/mem.rs @@ -1,4 +1,4 @@ -use crate::mem::{pa, MemRegion, PhysAddr}; +use crate::mem::{pa, MemRegion}; use page_table_entry::{aarch64::A64PTE, GenericPTE, MappingFlags}; /// Returns platform-specific memory regions. diff --git a/modules/axhal/src/platform/aarch64_qemu_virt/mp.rs b/modules/axhal/src/platform/aarch64_qemu_virt/mp.rs index 95e7625591..52cf8c67fb 100644 --- a/modules/axhal/src/platform/aarch64_qemu_virt/mp.rs +++ b/modules/axhal/src/platform/aarch64_qemu_virt/mp.rs @@ -1,4 +1,4 @@ -use crate::mem::{va, virt_to_phys, PhysAddr, VirtAddr}; +use crate::mem::{va, virt_to_phys, PhysAddr}; /// Starts the given secondary CPU with its boot stack. pub fn start_secondary_cpu(cpu_id: usize, stack_top: PhysAddr) { diff --git a/modules/axhal/src/platform/riscv64_qemu_virt/mp.rs b/modules/axhal/src/platform/riscv64_qemu_virt/mp.rs index 8e001b96c7..11c4966ebe 100644 --- a/modules/axhal/src/platform/riscv64_qemu_virt/mp.rs +++ b/modules/axhal/src/platform/riscv64_qemu_virt/mp.rs @@ -1,4 +1,4 @@ -use crate::mem::{va, virt_to_phys, PhysAddr, VirtAddr}; +use crate::mem::{va, virt_to_phys, PhysAddr}; /// Starts the given secondary CPU with its boot stack. pub fn start_secondary_cpu(hartid: usize, stack_top: PhysAddr) { diff --git a/modules/axruntime/Cargo.toml b/modules/axruntime/Cargo.toml index b904fe6b89..c26237b718 100644 --- a/modules/axruntime/Cargo.toml +++ b/modules/axruntime/Cargo.toml @@ -12,7 +12,7 @@ documentation = "https://arceos-org.github.io/arceos/axruntime/index.html" [features] default = [] -smp = ["axhal/smp"] +smp = ["axhal/smp", "axtask?/smp"] irq = ["axhal/irq", "axtask?/irq", "percpu", "kernel_guard"] tls = ["axhal/tls", "axtask?/tls"] alloc = ["axalloc"] diff --git a/modules/axruntime/src/mp.rs b/modules/axruntime/src/mp.rs index a622fdb7e8..beef5d27cc 100644 --- a/modules/axruntime/src/mp.rs +++ b/modules/axruntime/src/mp.rs @@ -52,6 +52,8 @@ pub extern "C" fn rust_main_secondary(cpu_id: usize) -> ! 
{ #[cfg(feature = "irq")] axhal::arch::enable_irqs(); + debug!("Secondary CPU {:x} running...", cpu_id); + #[cfg(all(feature = "tls", not(feature = "multitask")))] super::init_tls(); diff --git a/modules/axtask/Cargo.toml b/modules/axtask/Cargo.toml index 25fe7fb371..09ef12da1b 100644 --- a/modules/axtask/Cargo.toml +++ b/modules/axtask/Cargo.toml @@ -13,12 +13,20 @@ documentation = "https://arceos-org.github.io/arceos/axtask/index.html" default = [] multitask = [ - "dep:axconfig", "dep:percpu", "dep:kspin", "dep:lazyinit", "dep:memory_addr", - "dep:scheduler", "dep:timer_list", "kernel_guard", "dep:crate_interface", + "dep:axconfig", + "dep:percpu", + "dep:kspin", + "dep:lazyinit", + "dep:memory_addr", + "dep:scheduler", + "dep:timer_list", + "kernel_guard", + "dep:crate_interface", ] irq = [] tls = ["axhal/tls"] preempt = ["irq", "percpu?/preempt", "kernel_guard/preempt"] +smp = ["kspin/smp"] sched_fifo = ["multitask"] sched_rr = ["multitask", "preempt"] @@ -29,6 +37,7 @@ test = ["percpu?/sp-naive"] [dependencies] cfg-if = "1.0" log = "0.4.21" +bitmaps = { version = "3.2.1", default-features = false } axhal = { workspace = true } axconfig = { workspace = true, optional = true } percpu = { version = "0.1", optional = true } diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs index 595d3ae37a..2c01629a65 100644 --- a/modules/axtask/src/api.rs +++ b/modules/axtask/src/api.rs @@ -2,7 +2,7 @@ use alloc::{string::String, sync::Arc}; -pub(crate) use crate::run_queue::{AxRunQueue, RUN_QUEUE}; +pub(crate) use crate::run_queue::{current_run_queue, select_run_queue}; #[doc(cfg(feature = "multitask"))] pub use crate::task::{CurrentTask, TaskId, TaskInner}; @@ -75,6 +75,8 @@ pub fn init_scheduler() { /// Initializes the task scheduler for secondary CPUs. pub fn init_scheduler_secondary() { crate::run_queue::init_secondary(); + #[cfg(feature = "irq")] + crate::timers::init(); } /// Handles periodic timer ticks for the task manager. @@ -84,7 +86,10 @@ pub fn init_scheduler_secondary() { #[doc(cfg(feature = "irq"))] pub fn on_timer_tick() { crate::timers::check_events(); - RUN_QUEUE.lock().scheduler_timer_tick(); + current_run_queue() + .scheduler() + .lock() + .scheduler_timer_tick(); } /// Spawns a new task with the given parameters. @@ -94,8 +99,20 @@ pub fn spawn_raw(f: F, name: String, stack_size: usize) -> AxTaskRef where F: FnOnce() + Send + 'static, { - let task = TaskInner::new(f, name, stack_size); - RUN_QUEUE.lock().add_task(task.clone()); + let task = TaskInner::new( + f, + name, + stack_size, + #[cfg(feature = "smp")] + None, + ); + crate::select_run_queue( + #[cfg(feature = "smp")] + task.clone(), + ) + .scheduler() + .lock() + .add_task(task.clone()); task } @@ -122,13 +139,16 @@ where /// /// [CFS]: https://en.wikipedia.org/wiki/Completely_Fair_Scheduler pub fn set_priority(prio: isize) -> bool { - RUN_QUEUE.lock().set_current_priority(prio) + current_run_queue() + .scheduler() + .lock() + .set_current_priority(prio) } /// Current task gives up the CPU time voluntarily, and switches to another /// ready task. pub fn yield_now() { - RUN_QUEUE.lock().yield_current(); + current_run_queue().scheduler().lock().yield_current() } /// Current task is going to sleep for the given duration. @@ -143,14 +163,14 @@ pub fn sleep(dur: core::time::Duration) { /// If the feature `irq` is not enabled, it uses busy-wait instead. 
pub fn sleep_until(deadline: axhal::time::TimeValue) { #[cfg(feature = "irq")] - RUN_QUEUE.lock().sleep_until(deadline); + current_run_queue().scheduler().lock().sleep_until(deadline); #[cfg(not(feature = "irq"))] axhal::time::busy_wait_until(deadline); } /// Exits the current task. pub fn exit(exit_code: i32) -> ! { - RUN_QUEUE.lock().exit_current(exit_code) + current_run_queue().exit_current(exit_code) } /// The idle task routine. diff --git a/modules/axtask/src/lib.rs b/modules/axtask/src/lib.rs index c82b842a9a..2aecb3796b 100644 --- a/modules/axtask/src/lib.rs +++ b/modules/axtask/src/lib.rs @@ -38,6 +38,7 @@ cfg_if::cfg_if! { extern crate log; extern crate alloc; + #[macro_use] mod run_queue; mod task; mod api; diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index e015265d69..fe0a6c0e35 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -1,39 +1,247 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicUsize, Ordering}; + +#[cfg(feature = "smp")] +use bitmaps::Bitmap; use kspin::SpinNoIrq; use lazyinit::LazyInit; use scheduler::BaseScheduler; +use axhal::cpu::this_cpu_id; + use crate::task::{CurrentTask, TaskState}; use crate::{AxTaskRef, Scheduler, TaskInner, WaitQueue}; -// TODO: per-CPU -pub(crate) static RUN_QUEUE: LazyInit> = LazyInit::new(); +#[percpu::def_percpu] +static RUN_QUEUE: LazyInit = LazyInit::new(); -// TODO: per-CPU -static EXITED_TASKS: SpinNoIrq> = SpinNoIrq::new(VecDeque::new()); +#[percpu::def_percpu] +static EXITED_TASKS: VecDeque = VecDeque::new(); +#[percpu::def_percpu] static WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(); #[percpu::def_percpu] static IDLE_TASK: LazyInit = LazyInit::new(); +/// An array of references to run queues, one for each CPU, indexed by cpu_id. +/// +/// This static variable holds references to the run queues for each CPU in the system. +/// +/// # Safety +/// +/// Access to this variable is marked as `unsafe` because it contains `MaybeUninit` references, +/// which require careful handling to avoid undefined behavior. The array should be fully +/// initialized before being accessed to ensure safe usage. +static mut RUN_QUEUES: [MaybeUninit<&'static AxRunQueue>; axconfig::SMP] = + [MaybeUninit::uninit(); axconfig::SMP]; + +/// Returns a reference to the current run queue. +/// +/// ## Safety +/// +/// This function returns a static reference to the current run queue, which +/// is inherently unsafe. It assumes that the `RUN_QUEUE` has been properly +/// initialized and is not accessed concurrently in a way that could cause +/// data races or undefined behavior. +/// +/// ## Returns +/// +/// A static reference to the current run queue. +#[inline] +pub(crate) fn current_run_queue() -> &'static AxRunQueue { + unsafe { RUN_QUEUE.current_ref_raw() } +} + +/// Selects the run queue index based on a CPU set bitmap, minimizing the number of tasks. +/// +/// This function filters the available run queues based on the provided `cpu_set` and +/// selects the one with the fewest tasks. The selected run queue's index (cpu_id) is returned. +/// +/// ## Arguments +/// +/// * `cpu_set` - A bitmap representing the CPUs that are eligible for task execution. +/// +/// ## Returns +/// +/// The index (cpu_id) of the selected run queue. +/// +/// ## Panics +/// +/// This function will panic if there is no available run queue that matches the CPU set. 
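+///
+/// ## Example
+///
+/// Illustrative only (assuming `axconfig::SMP == 4`): with `cpu_set = 0b0101`,
+/// only the run queues of CPU 0 and CPU 2 are candidates, and the one that
+/// currently holds fewer tasks is selected.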
+///
+#[cfg(feature = "smp")]
+#[inline]
+fn select_run_queue_index(cpu_set: Bitmap<{ axconfig::SMP }>) -> usize {
+    unsafe {
+        RUN_QUEUES
+            .iter()
+            .filter(|rq| cpu_set.get(rq.assume_init().cpu_id()))
+            .min_by_key(|rq| rq.assume_init().num_tasks())
+            .expect("No available run queue that matches the CPU set")
+            .assume_init()
+            .cpu_id()
+    }
+}
+
+/// Retrieves a `'static` reference to the run queue corresponding to the given index.
+///
+/// This function asserts that the provided index is within the range of available CPUs
+/// and returns a reference to the corresponding run queue.
+///
+/// ## Arguments
+///
+/// * `index` - The index of the run queue to retrieve.
+///
+/// ## Returns
+///
+/// A reference to the `AxRunQueue` corresponding to the provided index.
+///
+/// ## Panics
+///
+/// This function will panic if the index is out of bounds.
+///
+#[inline]
+fn get_run_queue(index: usize) -> &'static AxRunQueue {
+    assert!(index < axconfig::SMP);
+    unsafe { RUN_QUEUES[index].assume_init() }
+}
+
+/// Selects the appropriate run queue for the provided task.
+///
+/// * In a single-core system, this function always returns a reference to the global run queue.
+/// * In a multi-core system, this function selects the run queue based on the task's CPU affinity and the current load.
+///
+/// ## Arguments
+///
+/// * `task` - A reference to the task for which a run queue is being selected.
+///
+/// ## Returns
+///
+/// A reference to the selected `AxRunQueue`.
+///
+/// ## TODO
+///
+/// 1. Implement better load balancing across CPUs for more efficient task distribution.
+/// 2. Use a more generic load-balancing algorithm that can be customized or replaced.
+///
+#[inline]
+pub(crate) fn select_run_queue(#[cfg(feature = "smp")] task: AxTaskRef) -> &'static AxRunQueue {
+    #[cfg(not(feature = "smp"))]
+    {
+        // When SMP is disabled, all tasks are scheduled on the same global run queue.
+        current_run_queue()
+    }
+    #[cfg(feature = "smp")]
+    {
+        // When SMP is enabled, select the run queue based on the task's CPU affinity and the current load.
+        let index = select_run_queue_index(task.cpu_set());
+        get_run_queue(index)
+    }
+}
+
+/// `AxRunQueue` represents a run queue for the global system or a specific CPU.
 pub(crate) struct AxRunQueue {
+    /// The ID of the CPU this run queue is associated with.
+    cpu_id: usize,
+    /// The number of tasks currently in the run queue.
+    num_tasks: AtomicUsize,
+    /// The inner structure of the run queue, protected by a `SpinNoIrq` lock to ensure thread safety.
+    inner: SpinNoIrq<AxRunQueueInner>,
+}
+
+/// A structure that holds the core components of a run queue,
+/// protected by a `SpinNoIrq` lock to ensure thread safety during scheduling.
+pub struct AxRunQueueInner {
+    /// The ID of the CPU this run queue is associated with.
+    cpu_id: usize,
+    /// The core scheduler of this run queue.
     scheduler: Scheduler,
 }
 
 impl AxRunQueue {
-    pub fn new() -> SpinNoIrq<AxRunQueue> {
-        let gc_task = TaskInner::new(gc_entry, "gc".into(), axconfig::TASK_STACK_SIZE);
+    pub fn new(cpu_id: usize) -> Self {
+        let gc_task = TaskInner::new(
+            gc_entry,
+            "gc".into(),
+            axconfig::TASK_STACK_SIZE,
+            // The gc task should be pinned to the current CPU.
+            #[cfg(feature = "smp")]
+            Some(1 << cpu_id),
+        );
         let mut scheduler = Scheduler::new();
         scheduler.add_task(gc_task);
-        SpinNoIrq::new(Self { scheduler })
+        Self {
+            cpu_id,
+            num_tasks: AtomicUsize::new(2),
+            inner: SpinNoIrq::new(AxRunQueueInner { cpu_id, scheduler }),
+        }
+    }
+
+    /// Returns the CPU ID of the current run queue,
+    /// which is also its index in `RUN_QUEUES`.
+    pub fn cpu_id(&self) -> usize {
+        self.cpu_id
+    }
+
+    /// Returns the number of tasks in the current run queue,
+    /// which is used for load balancing during scheduling.
+    #[cfg(feature = "smp")]
+    pub fn num_tasks(&self) -> usize {
+        self.num_tasks.load(Ordering::Acquire)
+    }
+
+    /// Returns a reference to the inner state of the run queue, protected by a `SpinNoIrq` lock.
+    /// Note: the scheduler lock is explicitly held throughout the scheduling process, where a
+    /// task switch may happen; it is explicitly released before the context switch by `force_unlock()`.
+    pub(crate) fn scheduler(&self) -> &SpinNoIrq<AxRunQueueInner> {
+        &self.inner
+    }
+
+    pub fn exit_current(&self, exit_code: i32) -> ! {
+        // We do not own a `SpinNoIrq` lock here, so we need to disable IRQs and preemption manually.
+        let _kernel_guard = kernel_guard::IrqSave::new();
+
+        let curr = crate::current();
+        debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code);
+        assert!(curr.is_running());
+        assert!(!curr.is_idle());
+        if curr.is_init() {
+            EXITED_TASKS.with_current(|exited_tasks| exited_tasks.clear());
+            axhal::misc::terminate();
+        } else {
+            curr.set_state(TaskState::Exited);
+            self.num_tasks.fetch_sub(1, Ordering::AcqRel);
+
+            // Notify the joiner task.
+            curr.notify_exit(exit_code);
+
+            // Push current task to the `EXITED_TASKS` list, which will be consumed by the GC task.
+            EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(curr.clone()));
+            // Wake up the GC task to drop the exited tasks.
+            WAIT_FOR_EXIT.with_current(|wq| wq.notify_one(false));
+            // The scheduler's `SpinNoIrq` lock is held from here on; `resched` releases it before the switch.
+            self.scheduler().lock().resched(false);
+        }
+        unreachable!("task exited!");
+    }
+}
+
+/// Core functions of the run queue, which should be called after holding the `scheduler()` lock.
+impl AxRunQueueInner {
     pub fn add_task(&mut self, task: AxTaskRef) {
-        debug!("task spawn: {}", task.id_name());
+        debug!(
+            "task spawn: {} on run_queue {}",
+            task.id_name(),
+            self.cpu_id
+        );
         assert!(task.is_ready());
         self.scheduler.add_task(task);
+        get_run_queue(self.cpu_id)
+            .num_tasks
+            .fetch_add(1, Ordering::AcqRel);
     }
 
     #[cfg(feature = "irq")]
@@ -81,24 +289,6 @@ impl AxRunQueue {
         }
     }
 
-    pub fn exit_current(&mut self, exit_code: i32) -> ! {
-        let curr = crate::current();
-        debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code);
-        assert!(curr.is_running());
-        assert!(!curr.is_idle());
-        if curr.is_init() {
-            EXITED_TASKS.lock().clear();
-            axhal::misc::terminate();
-        } else {
-            curr.set_state(TaskState::Exited);
-            curr.notify_exit(exit_code, self);
-            EXITED_TASKS.lock().push_back(curr.clone());
-            WAIT_FOR_EXIT.notify_one_locked(false, self);
-            self.resched(false);
-        }
-        unreachable!("task exited!");
-    }
-
     pub fn block_current<F>(&mut self, wait_queue_push: F)
     where
         F: FnOnce(AxTaskRef),
@@ -113,16 +303,25 @@ impl AxRunQueue {
         assert!(curr.can_preempt(1));
 
         curr.set_state(TaskState::Blocked);
+        current_run_queue().num_tasks.fetch_sub(1, Ordering::AcqRel);
         wait_queue_push(curr.clone());
         self.resched(false);
     }
 
     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
-        debug!("task unblock: {}", task.id_name());
+        let cpu_id = self.cpu_id;
+        debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
         if task.is_blocked() {
             task.set_state(TaskState::Ready);
             self.scheduler.add_task(task); // TODO: priority
-            if resched {
+
+            get_run_queue(cpu_id)
+                .num_tasks
+                .fetch_add(1, Ordering::AcqRel);
+
+            // Note: when the task is unblocked on another CPU's run queue,
+            // we just ignore the `resched` flag.
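+            // (`set_preempt_pending` below only marks the current task on the
+            // local CPU, so a cross-CPU preemption request cannot be delivered
+            // here; the remote CPU will pick the newly readied task up the next
+            // time it reschedules on its own.)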
+            if resched && cpu_id == this_cpu_id() {
                 #[cfg(feature = "preempt")]
                 crate::current().set_preempt_pending(true);
             }
@@ -140,12 +339,15 @@ impl AxRunQueue {
             if now < deadline {
                 crate::timers::set_alarm_wakeup(deadline, curr.clone());
                 curr.set_state(TaskState::Blocked);
+                get_run_queue(self.cpu_id)
+                    .num_tasks
+                    .fetch_sub(1, Ordering::AcqRel);
                 self.resched(false);
             }
         }
     }
 }
 
-impl AxRunQueue {
+impl AxRunQueueInner {
     /// Common reschedule subroutine. If `preempt`, keep current task's time
     /// slice, otherwise reset it.
     fn resched(&mut self, preempt: bool) {
@@ -164,11 +366,13 @@ impl AxRunQueue {
     }
 
     fn switch_to(&mut self, prev_task: CurrentTask, next_task: AxTaskRef) {
-        trace!(
-            "context switch: {} -> {}",
-            prev_task.id_name(),
-            next_task.id_name()
-        );
+        if !prev_task.is_idle() || !next_task.is_idle() {
+            debug!(
+                "context switch: {} -> {}",
+                prev_task.id_name(),
+                next_task.id_name()
+            );
+        }
         #[cfg(feature = "preempt")]
         next_task.set_preempt_pending(false);
         next_task.set_state(TaskState::Running);
@@ -186,6 +390,10 @@ impl AxRunQueue {
             assert!(Arc::strong_count(&next_task) >= 1);
 
             CurrentTask::set_current(prev_task, next_task);
+
+            // Release the lock that was explicitly acquired by `scheduler()`.
+            crate::current_run_queue().scheduler().force_unlock();
+
             (*prev_ctx_ptr).switch_to(&*next_ctx_ptr);
         }
     }
@@ -194,10 +402,10 @@ impl AxRunQueue {
 fn gc_entry() {
     loop {
         // Drop all exited tasks and recycle resources.
-        let n = EXITED_TASKS.lock().len();
+        let n = EXITED_TASKS.with_current(|exited_tasks| exited_tasks.len());
         for _ in 0..n {
             // Do not do the slow drops in the critical section.
-            let task = EXITED_TASKS.lock().pop_front();
+            let task = EXITED_TASKS.with_current(|exited_tasks| exited_tasks.pop_front());
             if let Some(task) = task {
                 if Arc::strong_count(&task) == 1 {
                     // If I'm the last holder of the task, drop it immediately.
@@ -205,33 +413,57 @@ fn gc_entry() {
                 } else {
                     // Otherwise (e.g., `switch_to` is not completed, or it is held by the
                     // joiner, etc.), push it back and wait for them to drop first.
-                    EXITED_TASKS.lock().push_back(task);
+                    EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(task));
                 }
             }
         }
-        WAIT_FOR_EXIT.wait();
+        unsafe { WAIT_FOR_EXIT.current_ref_raw() }.wait();
     }
 }
 
 pub(crate) fn init() {
+    let cpu_id = this_cpu_id();
+
+    // Create the `idle` task (not the current task).
     const IDLE_TASK_STACK_SIZE: usize = 4096;
-    let idle_task = TaskInner::new(|| crate::run_idle(), "idle".into(), IDLE_TASK_STACK_SIZE);
+    let idle_task = TaskInner::new(
+        || crate::run_idle(),
+        "idle".into(),
+        IDLE_TASK_STACK_SIZE,
+        #[cfg(feature = "smp")]
+        Some(1 << cpu_id),
+    );
     IDLE_TASK.with_current(|i| {
         i.init_once(idle_task.clone());
     });
 
     let main_task = TaskInner::new_init("main".into());
     main_task.set_state(TaskState::Running);
-
-    RUN_QUEUE.init_once(AxRunQueue::new());
     unsafe { CurrentTask::init_current(main_task) }
+
+    info!("Initialize RUN_QUEUES");
+    RUN_QUEUE.with_current(|rq| {
+        rq.init_once(AxRunQueue::new(cpu_id));
+    });
+    unsafe {
+        RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_raw());
+    }
 }
 
 pub(crate) fn init_secondary() {
+    let cpu_id = this_cpu_id();
+
+    // Put the subsequent execution into the `idle` task.
     let idle_task = TaskInner::new_init("idle".into());
     idle_task.set_state(TaskState::Running);
     IDLE_TASK.with_current(|i| {
         i.init_once(idle_task.clone());
     });
 
     unsafe { CurrentTask::init_current(idle_task) }
+
+    RUN_QUEUE.with_current(|rq| {
+        rq.init_once(AxRunQueue::new(cpu_id));
+    });
+    unsafe {
+        RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_raw());
+    }
 }
diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs
index a19e105ac6..3f0dd760d0 100644
--- a/modules/axtask/src/task.rs
+++ b/modules/axtask/src/task.rs
@@ -1,18 +1,21 @@
 use alloc::{boxed::Box, string::String, sync::Arc};
 use core::ops::Deref;
+#[cfg(any(feature = "preempt", feature = "irq"))]
+use core::sync::atomic::AtomicUsize;
 use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
 use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};
 
-#[cfg(feature = "preempt")]
-use core::sync::atomic::AtomicUsize;
+#[cfg(feature = "smp")]
+use bitmaps::Bitmap;
+use memory_addr::{align_up_4k, va, VirtAddr};
 
+use axhal::arch::TaskContext;
+#[cfg(feature = "smp")]
+use axhal::cpu::this_cpu_id;
 #[cfg(feature = "tls")]
 use axhal::tls::TlsArea;
 
-use axhal::arch::TaskContext;
-use memory_addr::{align_up_4k, va, VirtAddr};
-
-use crate::{AxRunQueue, AxTask, AxTaskRef, WaitQueue};
+use crate::{AxTask, AxTaskRef, WaitQueue};
 
 /// A unique identifier for a thread.
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
@@ -38,6 +41,10 @@ pub struct TaskInner {
     entry: Option<*mut dyn FnOnce()>,
     state: AtomicU8,
 
+    /// CPU affinity mask.
+    #[cfg(feature = "smp")]
+    cpu_set: Bitmap<{ axconfig::SMP }>,
+
     in_wait_queue: AtomicBool,
     #[cfg(feature = "irq")]
     in_timer_list: AtomicBool,
@@ -121,6 +128,8 @@ impl TaskInner {
             is_init: false,
             entry: None,
             state: AtomicU8::new(TaskState::Ready as u8),
+            #[cfg(feature = "smp")]
+            cpu_set: Bitmap::new(),
             in_wait_queue: AtomicBool::new(false),
             #[cfg(feature = "irq")]
             in_timer_list: AtomicBool::new(false),
@@ -138,7 +147,21 @@ impl TaskInner {
     }
 
     /// Create a new task with the given entry function and stack size.
-    pub(crate) fn new<F>(entry: F, name: String, stack_size: usize) -> AxTaskRef
+    ///
+    /// When the "smp" feature is enabled:
+    /// `cpu_set` represents a set of physical CPUs, implemented as a bit mask
+    /// referring to `cpu_set_t` in Linux.
+    /// The task will only be scheduled on the specified CPUs if `cpu_set` is `Some(cpu_mask)`;
+    /// otherwise, the task may be scheduled on any CPU under the load-balancing policy.
+    /// References:
+    /// * https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html
+    /// * https://man7.org/linux/man-pages/man3/CPU_SET.3.html
+    pub(crate) fn new<F>(
+        entry: F,
+        name: String,
+        stack_size: usize,
+        #[cfg(feature = "smp")] cpu_set: Option<usize>,
+    ) -> AxTaskRef
     where
         F: FnOnce() + Send + 'static,
     {
@@ -157,6 +180,24 @@ impl TaskInner {
         if t.name == "idle" {
             t.is_idle = true;
         }
+        #[cfg(feature = "smp")]
+        {
+            t.cpu_set = match cpu_set {
+                Some(cpu_set) => {
+                    let mut bit_map = Bitmap::new();
+                    let mut i = 0;
+                    while i < axconfig::SMP {
+                        if cpu_set & (1 << i) != 0 {
+                            bit_map.set(i, true);
+                        }
+                        i += 1;
+                    }
+                    bit_map
+                }
+                // This task can be scheduled on all CPUs by default.
+                None => Bitmap::mask(axconfig::SMP),
+            };
+        }
         Arc::new(AxTask::new(t))
     }
 
@@ -171,6 +212,8 @@ impl TaskInner {
     pub(crate) fn new_init(name: String) -> AxTaskRef {
         let mut t = Self::new_common(TaskId::new(), name);
         t.is_init = true;
+        #[cfg(feature = "smp")]
+        t.cpu_set.set(this_cpu_id(), true);
         if t.name == "idle" {
             t.is_idle = true;
         }
@@ -212,6 +255,12 @@ impl TaskInner {
         self.is_idle
     }
 
+    #[cfg(feature = "smp")]
+    #[inline]
+    pub(crate) const fn cpu_set(&self) -> Bitmap<{ axconfig::SMP }> {
+        self.cpu_set
+    }
+
     #[inline]
     pub(crate) fn in_wait_queue(&self) -> bool {
         self.in_wait_queue.load(Ordering::Acquire)
@@ -265,16 +314,17 @@ impl TaskInner {
     fn current_check_preempt_pending() {
         let curr = crate::current();
         if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) {
-            let mut rq = crate::RUN_QUEUE.lock();
+            let mut rq_locked = crate::current_run_queue().scheduler().lock();
             if curr.need_resched.load(Ordering::Acquire) {
-                rq.preempt_resched();
+                rq_locked.preempt_resched();
             }
         }
     }
 
-    pub(crate) fn notify_exit(&self, exit_code: i32, rq: &mut AxRunQueue) {
+    /// Notify all tasks that join on this task.
+    pub(crate) fn notify_exit(&self, exit_code: i32) {
         self.exit_code.store(exit_code, Ordering::Release);
-        self.wait_for_exit.notify_all_locked(false, rq);
+        self.wait_for_exit.notify_all(false);
     }
 
     #[inline]
@@ -379,8 +429,7 @@ impl Deref for CurrentTask {
 }
 
 extern "C" fn task_entry() -> ! {
-    // release the lock that was implicitly held across the reschedule
-    unsafe { crate::RUN_QUEUE.force_unlock() };
+    // Enable IRQs (if the "irq" feature is enabled) before running the task entry function.
     #[cfg(feature = "irq")]
     axhal::arch::enable_irqs();
     let task = crate::current();
diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs
index 1c4a8eed05..ca7afbfa47 100644
--- a/modules/axtask/src/timers.rs
+++ b/modules/axtask/src/timers.rs
@@ -1,40 +1,57 @@
 use alloc::sync::Arc;
-use axhal::time::wall_time;
+
 use kspin::SpinNoIrq;
 use lazyinit::LazyInit;
 use timer_list::{TimeValue, TimerEvent, TimerList};
 
-use crate::{AxTaskRef, RUN_QUEUE};
+use axhal::time::wall_time;
+
+use crate::{select_run_queue, AxTaskRef};
 
-// TODO: per-CPU
+#[percpu::def_percpu]
 static TIMER_LIST: LazyInit<SpinNoIrq<TimerList<TaskWakeupEvent>>> = LazyInit::new();
 
 struct TaskWakeupEvent(AxTaskRef);
 
 impl TimerEvent for TaskWakeupEvent {
     fn callback(self, _now: TimeValue) {
-        let mut rq = RUN_QUEUE.lock();
+        // Originally, IRQs and preemption were disabled by the `SpinNoIrq` lock held on `RUN_QUEUE`.
+        // But we can't use `RUN_QUEUE` here, so we would need to disable IRQs and preemption manually.
+        // TODO: figure out if `NoPreempt` is needed here.
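+        // (In the current call path, this callback is reached from `check_events`
+        // on the timer-tick IRQ path, and the `SpinNoIrq` scheduler lock taken
+        // below disables IRQs and preemption again in any case.)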
+ // let _guard = kernel_guard::NoPreemptIrqSave::new(); + let mut rq_locked = select_run_queue( + #[cfg(feature = "smp")] + self.0.clone(), + ) + .scheduler() + .lock(); + self.0.set_in_timer_list(false); - rq.unblock_task(self.0, true); + + rq_locked.unblock_task(self.0, true); } } pub fn set_alarm_wakeup(deadline: TimeValue, task: AxTaskRef) { - let mut timers = TIMER_LIST.lock(); - task.set_in_timer_list(true); - timers.set(deadline, TaskWakeupEvent(task)); + TIMER_LIST.with_current(|timer_list| { + let mut timers = timer_list.lock(); + task.set_in_timer_list(true); + timers.set(deadline, TaskWakeupEvent(task)); + }) } pub fn cancel_alarm(task: &AxTaskRef) { - let mut timers = TIMER_LIST.lock(); - task.set_in_timer_list(false); - timers.cancel(|t| Arc::ptr_eq(&t.0, task)); + TIMER_LIST.with_current(|timer_list| { + let mut timers = timer_list.lock(); + task.set_in_timer_list(false); + timers.cancel(|t| Arc::ptr_eq(&t.0, task)); + }) } pub fn check_events() { loop { let now = wall_time(); - let event = TIMER_LIST.lock().expire_one(now); + let event = TIMER_LIST.with_current(|timers| timers.lock().expire_one(now)); if let Some((_deadline, event)) = event { event.callback(now); } else { @@ -44,5 +61,7 @@ pub fn check_events() { } pub fn init() { - TIMER_LIST.init_once(SpinNoIrq::new(TimerList::new())); + TIMER_LIST.with_current(|timer_list| { + timer_list.init_once(SpinNoIrq::new(TimerList::new())); + }); } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index d13628ad6f..816f08fca0 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -1,8 +1,8 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; -use kspin::SpinRaw; +use kspin::SpinNoIrq; -use crate::{AxRunQueue, AxTaskRef, CurrentTask, RUN_QUEUE}; +use crate::{current_run_queue, select_run_queue, AxTaskRef, CurrentTask}; /// A queue to store sleeping tasks. /// @@ -27,21 +27,21 @@ use crate::{AxRunQueue, AxTaskRef, CurrentTask, RUN_QUEUE}; /// assert_eq!(VALUE.load(Ordering::Relaxed), 1); /// ``` pub struct WaitQueue { - queue: SpinRaw>, // we already disabled IRQs when lock the `RUN_QUEUE` + queue: SpinNoIrq>, } impl WaitQueue { /// Creates an empty wait queue. pub const fn new() -> Self { Self { - queue: SpinRaw::new(VecDeque::new()), + queue: SpinNoIrq::new(VecDeque::new()), } } /// Creates an empty wait queue with space for at least `capacity` elements. pub fn with_capacity(capacity: usize) -> Self { Self { - queue: SpinRaw::new(VecDeque::with_capacity(capacity)), + queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)), } } @@ -50,9 +50,8 @@ impl WaitQueue { // the event from another queue. if curr.in_wait_queue() { // wake up by timer (timeout). - // `RUN_QUEUE` is not locked here, so disable IRQs. - let _guard = kernel_guard::IrqSave::new(); - self.queue.lock().retain(|t| !curr.ptr_eq(t)); + let mut wait_queue_locked = self.queue.lock(); + wait_queue_locked.retain(|t| !curr.ptr_eq(t)); curr.set_in_wait_queue(false); } #[cfg(feature = "irq")] @@ -65,10 +64,13 @@ impl WaitQueue { /// Blocks the current task and put it into the wait queue, until other task /// notifies it. 
pub fn wait(&self) { - RUN_QUEUE.lock().block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + current_run_queue() + .scheduler() + .lock() + .block_current(|task| { + task.set_in_wait_queue(true); + self.queue.lock().push_back(task) + }); self.cancel_events(crate::current()); } @@ -82,13 +84,13 @@ impl WaitQueue { F: Fn() -> bool, { loop { - let mut rq = RUN_QUEUE.lock(); + let mut rq_locked = current_run_queue().scheduler().lock(); if condition() { break; } - rq.block_current(|task| { + rq_locked.block_current(|task| { task.set_in_wait_queue(true); - self.queue.lock().push_back(task); + self.queue.lock().push_back(task) }); } self.cancel_events(crate::current()); @@ -98,6 +100,7 @@ impl WaitQueue { /// notify it, or the given duration has elapsed. #[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { + let mut rq_locked = current_run_queue().scheduler().lock(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -107,7 +110,7 @@ impl WaitQueue { ); crate::timers::set_alarm_wakeup(deadline, curr.clone()); - RUN_QUEUE.lock().block_current(|task| { + rq_locked.block_current(|task| { task.set_in_wait_queue(true); self.queue.lock().push_back(task) }); @@ -137,14 +140,14 @@ impl WaitQueue { let mut timeout = true; while axhal::time::wall_time() < deadline { - let mut rq = RUN_QUEUE.lock(); + let mut rq_locked = current_run_queue().scheduler().lock(); if condition() { timeout = false; break; } - rq.block_current(|task| { + rq_locked.block_current(|task| { task.set_in_wait_queue(true); - self.queue.lock().push_back(task); + self.queue.lock().push_back(task) }); } self.cancel_events(curr); @@ -156,12 +159,11 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_one(&self, resched: bool) -> bool { - let mut rq = RUN_QUEUE.lock(); - if !self.queue.lock().is_empty() { - self.notify_one_locked(resched, &mut rq) - } else { - false - } + let Some(task) = self.queue.lock().pop_front() else { + return false; + }; + unblock_one_task(task, resched); + true } /// Wakes all tasks in the wait queue. @@ -170,14 +172,10 @@ impl WaitQueue { /// preemption is enabled. pub fn notify_all(&self, resched: bool) { loop { - let mut rq = RUN_QUEUE.lock(); - if let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); - } else { + let Some(task) = self.queue.lock().pop_front() else { break; - } - drop(rq); // we must unlock `RUN_QUEUE` after unlocking `self.queue`. + }; + unblock_one_task(task, resched); } } @@ -186,31 +184,32 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. 
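+    /// Returns `true` if the task was found in this wait queue and woken.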
pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool { - let mut rq = RUN_QUEUE.lock(); - let mut wq = self.queue.lock(); - if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { - task.set_in_wait_queue(false); - rq.unblock_task(wq.remove(index).unwrap(), resched); - true - } else { - false - } - } - - pub(crate) fn notify_one_locked(&self, resched: bool, rq: &mut AxRunQueue) -> bool { - if let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); + let task_to_be_notify = { + let mut wq = self.queue.lock(); + if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { + wq.remove(index) + } else { + None + } + }; + if let Some(task) = task_to_be_notify { + unblock_one_task(task, resched); true } else { false } } +} - pub(crate) fn notify_all_locked(&self, resched: bool, rq: &mut AxRunQueue) { - while let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); - } - } +pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) { + // Select run queue by the CPU set of the task. + let mut rq_locked = select_run_queue( + #[cfg(feature = "smp")] + task.clone(), + ) + .scheduler() + .lock(); + + task.set_in_wait_queue(false); + rq_locked.unblock_task(task, resched) } From 5bd98e707fd936ea8a363d38083cf4d228d30344 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Tue, 10 Sep 2024 21:54:45 +0800 Subject: [PATCH 02/12] fix: modify wait_queue's lock mechanism to fix bug related to notify_until --- modules/axtask/src/run_queue.rs | 4 ++- modules/axtask/src/wait_queue.rs | 43 +++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index fe0a6c0e35..bcc89b606a 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -298,13 +298,15 @@ impl AxRunQueueInner { assert!(curr.is_running()); assert!(!curr.is_idle()); + wait_queue_push(curr.clone()); + // we must not block current task with preemption disabled. #[cfg(feature = "preempt")] assert!(curr.can_preempt(1)); curr.set_state(TaskState::Blocked); current_run_queue().num_tasks.fetch_sub(1, Ordering::AcqRel); - wait_queue_push(curr.clone()); + self.resched(false); } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index 816f08fca0..fbf7318320 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -84,14 +84,18 @@ impl WaitQueue { F: Fn() -> bool, { loop { - let mut rq_locked = current_run_queue().scheduler().lock(); + let mut wq = self.queue.lock(); if condition() { break; } - rq_locked.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + current_run_queue() + .scheduler() + .lock() + .block_current(|task| { + task.set_in_wait_queue(true); + wq.push_back(task); + drop(wq) + }); } self.cancel_events(crate::current()); } @@ -100,7 +104,7 @@ impl WaitQueue { /// notify it, or the given duration has elapsed. 
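+    /// Returns `true` if the wait timed out (i.e. the task was still in the
+    /// wait queue when it woke up), and `false` if it was notified.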
#[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { - let mut rq_locked = current_run_queue().scheduler().lock(); + let mut wq = self.queue.lock(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -110,10 +114,14 @@ impl WaitQueue { ); crate::timers::set_alarm_wakeup(deadline, curr.clone()); - rq_locked.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + current_run_queue() + .scheduler() + .lock() + .block_current(|task| { + task.set_in_wait_queue(true); + wq.push_back(task); + drop(wq) + }); let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out self.cancel_events(curr); timeout @@ -140,15 +148,20 @@ impl WaitQueue { let mut timeout = true; while axhal::time::wall_time() < deadline { - let mut rq_locked = current_run_queue().scheduler().lock(); + let mut wq = self.queue.lock(); + // let mut rq_locked = current_run_queue().scheduler().lock(); if condition() { timeout = false; break; } - rq_locked.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + current_run_queue() + .scheduler() + .lock() + .block_current(|task| { + task.set_in_wait_queue(true); + wq.push_back(task); + drop(wq) + }); } self.cancel_events(curr); timeout From 7b8289076b50a707f1c71bb00046a3ba8b549b94 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Tue, 10 Sep 2024 22:15:43 +0800 Subject: [PATCH 03/12] fix: bug in lock sequence of rq and wq --- modules/axtask/src/run_queue.rs | 10 +++++--- modules/axtask/src/wait_queue.rs | 41 +++++++++++++------------------- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index bcc89b606a..4d0663587d 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -289,7 +289,7 @@ impl AxRunQueueInner { } } - pub fn block_current(&mut self, wait_queue_push: F) + pub fn block_current(&mut self, wait_queue_push_locked: F) where F: FnOnce(AxTaskRef), { @@ -298,7 +298,11 @@ impl AxRunQueueInner { assert!(curr.is_running()); assert!(!curr.is_idle()); - wait_queue_push(curr.clone()); + // Push current task to the wait queue. + // The wait queue must be locked before calling this function. + // The lock will be released here inside this closure after the task is pushed to the wait queue. + // So this closure has to be moved here to ensure the lock is released and assertion is correct. + wait_queue_push_locked(curr.clone()); // we must not block current task with preemption disabled. #[cfg(feature = "preempt")] @@ -306,7 +310,7 @@ impl AxRunQueueInner { curr.set_state(TaskState::Blocked); current_run_queue().num_tasks.fetch_sub(1, Ordering::AcqRel); - + self.resched(false); } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index fbf7318320..a0d5e93b61 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -50,8 +50,8 @@ impl WaitQueue { // the event from another queue. if curr.in_wait_queue() { // wake up by timer (timeout). 
- let mut wait_queue_locked = self.queue.lock(); - wait_queue_locked.retain(|t| !curr.ptr_eq(t)); + let mut wq_locked = self.queue.lock(); + wq_locked.retain(|t| !curr.ptr_eq(t)); curr.set_in_wait_queue(false); } #[cfg(feature = "irq")] @@ -84,18 +84,16 @@ impl WaitQueue { F: Fn() -> bool, { loop { + let mut rq = current_run_queue().scheduler().lock(); let mut wq = self.queue.lock(); if condition() { break; } - current_run_queue() - .scheduler() - .lock() - .block_current(|task| { - task.set_in_wait_queue(true); - wq.push_back(task); - drop(wq) - }); + rq.block_current(|task| { + task.set_in_wait_queue(true); + wq.push_back(task); + drop(wq) + }); } self.cancel_events(crate::current()); } @@ -104,7 +102,6 @@ impl WaitQueue { /// notify it, or the given duration has elapsed. #[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { - let mut wq = self.queue.lock(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -119,8 +116,7 @@ impl WaitQueue { .lock() .block_current(|task| { task.set_in_wait_queue(true); - wq.push_back(task); - drop(wq) + self.queue.lock().push_back(task) }); let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out self.cancel_events(curr); @@ -148,20 +144,17 @@ impl WaitQueue { let mut timeout = true; while axhal::time::wall_time() < deadline { + let mut rq = current_run_queue().scheduler().lock(); let mut wq = self.queue.lock(); - // let mut rq_locked = current_run_queue().scheduler().lock(); if condition() { timeout = false; break; } - current_run_queue() - .scheduler() - .lock() - .block_current(|task| { - task.set_in_wait_queue(true); - wq.push_back(task); - drop(wq) - }); + rq.block_current(|task| { + task.set_in_wait_queue(true); + wq.push_back(task); + drop(wq) + }); } self.cancel_events(curr); timeout @@ -216,7 +209,7 @@ impl WaitQueue { pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) { // Select run queue by the CPU set of the task. - let mut rq_locked = select_run_queue( + let mut rq = select_run_queue( #[cfg(feature = "smp")] task.clone(), ) @@ -224,5 +217,5 @@ pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) { .lock(); task.set_in_wait_queue(false); - rq_locked.unblock_task(task, resched) + rq.unblock_task(task, resched) } From a1a6290529ac0fcc2014bdb01ec57c5d5b36cdc7 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Wed, 11 Sep 2024 11:07:35 +0800 Subject: [PATCH 04/12] Add lock in run_queue exit_current --- modules/axtask/src/api.rs | 5 ++- modules/axtask/src/run_queue.rs | 58 +++++++++++++++++---------------- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs index 2c01629a65..955199c473 100644 --- a/modules/axtask/src/api.rs +++ b/modules/axtask/src/api.rs @@ -170,7 +170,10 @@ pub fn sleep_until(deadline: axhal::time::TimeValue) { /// Exits the current task. pub fn exit(exit_code: i32) -> ! { - current_run_queue().exit_current(exit_code) + current_run_queue() + .scheduler() + .lock() + .exit_current(exit_code) } /// The idle task routine. diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 4d0663587d..5fdb6519d2 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -199,34 +199,6 @@ impl AxRunQueue { pub(crate) fn scheduler(&self) -> &SpinNoIrq { &self.inner } - - pub fn exit_current(&self, exit_code: i32) -> ! 
{ - // We do not own an `SpinNoIrq` lock here, so we need to disable IRQ and preempt manually. - let _kernel_guard = kernel_guard::IrqSave::new(); - - let curr = crate::current(); - debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); - assert!(curr.is_running()); - assert!(!curr.is_idle()); - if curr.is_init() { - EXITED_TASKS.with_current(|exited_tasks| exited_tasks.clear()); - axhal::misc::terminate(); - } else { - curr.set_state(TaskState::Exited); - self.num_tasks.fetch_sub(1, Ordering::AcqRel); - - // Notify the joiner task. - curr.notify_exit(exit_code); - - // Push current task to the `EXITED_TASKS` list, which will be consumed by the GC task. - EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(curr.clone())); - // Wake up the GC task to drop the exited tasks. - WAIT_FOR_EXIT.with_current(|wq| wq.notify_one(false)); - // `SpinNoIrq` lock until now. - self.scheduler().lock().resched(false); - } - unreachable!("task exited!"); - } } /// Core functions of run queue, which should be called after holding the scheduler() lock. @@ -289,6 +261,36 @@ impl AxRunQueueInner { } } + pub fn exit_current(&mut self, exit_code: i32) -> ! { + let curr = crate::current(); + debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + if curr.is_init() { + EXITED_TASKS.with_current(|exited_tasks| exited_tasks.clear()); + axhal::misc::terminate(); + } else { + curr.set_state(TaskState::Exited); + current_run_queue().num_tasks.fetch_sub(1, Ordering::AcqRel); + + // Unlock the run queue before notifying the joiner task. + unsafe { + current_run_queue().inner.force_unlock(); + } + + // Notify the joiner task. + curr.notify_exit(exit_code); + + // Push current task to the `EXITED_TASKS` list, which will be consumed by the GC task. + EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(curr.clone())); + // Wake up the GC task to drop the exited tasks. + WAIT_FOR_EXIT.with_current(|wq| wq.notify_one(false)); + // `SpinNoIrq` lock until now. 
+ self.resched(false); + } + unreachable!("task exited!"); + } + pub fn block_current(&mut self, wait_queue_push_locked: F) where F: FnOnce(AxTaskRef), From 14041ead551b9b871f403d0fafe5d3724cc46abe Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Wed, 11 Sep 2024 19:40:22 +0800 Subject: [PATCH 05/12] feat: refactor wait queue and block current --- modules/axtask/src/run_queue.rs | 10 ++++++- modules/axtask/src/wait_queue.rs | 51 +++++++++++++++++++++++--------- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 5fdb6519d2..5416d9c54e 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -240,7 +240,7 @@ impl AxRunQueueInner { #[cfg(feature = "preempt")] pub fn preempt_resched(&mut self) { let curr = crate::current(); - assert!(curr.is_running()); + // assert!(curr.is_running()); // When we get the mutable reference of the run queue, we must // have held the `SpinNoIrq` lock with both IRQs and preemption @@ -291,6 +291,14 @@ impl AxRunQueueInner { unreachable!("task exited!"); } + pub fn yield_blocked(&mut self) { + let curr = crate::current(); + if curr.is_blocked() { + debug!("task yield_blocked: {}", curr.id_name()); + self.resched(false); + } + } + pub fn block_current(&mut self, wait_queue_push_locked: F) where F: FnOnce(AxTaskRef), diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index a0d5e93b61..8110297e44 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -2,7 +2,7 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; use kspin::SpinNoIrq; -use crate::{current_run_queue, select_run_queue, AxTaskRef, CurrentTask}; +use crate::{current_run_queue, select_run_queue, task::TaskState, AxTaskRef, CurrentTask}; /// A queue to store sleeping tasks. /// @@ -84,16 +84,28 @@ impl WaitQueue { F: Fn() -> bool, { loop { - let mut rq = current_run_queue().scheduler().lock(); + // let mut rq = current_run_queue().scheduler().lock(); let mut wq = self.queue.lock(); if condition() { break; } - rq.block_current(|task| { - task.set_in_wait_queue(true); - wq.push_back(task); - drop(wq) - }); + let curr = crate::current(); + debug!("task block: {}", curr.id_name()); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + + curr.set_in_wait_queue(true); + + // we must not block current task with preemption disabled. + #[cfg(feature = "preempt")] + assert!(curr.can_preempt(1)); + + curr.set_state(TaskState::Blocked); + + wq.push_back(curr.clone()); + drop(wq); + + current_run_queue().scheduler().lock().yield_blocked() } self.cancel_events(crate::current()); } @@ -144,17 +156,28 @@ impl WaitQueue { let mut timeout = true; while axhal::time::wall_time() < deadline { - let mut rq = current_run_queue().scheduler().lock(); + // let mut rq = current_run_queue().scheduler().lock(); let mut wq = self.queue.lock(); if condition() { - timeout = false; break; } - rq.block_current(|task| { - task.set_in_wait_queue(true); - wq.push_back(task); - drop(wq) - }); + let curr = crate::current(); + debug!("task block: {}", curr.id_name()); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + + curr.set_in_wait_queue(true); + + // we must not block current task with preemption disabled. 
+ #[cfg(feature = "preempt")] + assert!(curr.can_preempt(1)); + + curr.set_state(TaskState::Blocked); + + wq.push_back(curr.clone()); + drop(wq); + + current_run_queue().scheduler().lock().yield_blocked() } self.cancel_events(curr); timeout From e8c5bfac849e563b6f02aafd8213425201d83f30 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Thu, 12 Sep 2024 21:47:29 +0800 Subject: [PATCH 06/12] fix: use a new task state Blocking to avoid scheduling bug happened in wait and notify --- modules/axtask/src/api.rs | 8 +--- modules/axtask/src/run_queue.rs | 50 ++++++++--------------- modules/axtask/src/task.rs | 59 ++++++++++++++------------- modules/axtask/src/timers.rs | 13 +++--- modules/axtask/src/wait_queue.rs | 68 +++++++++++++++++--------------- 5 files changed, 92 insertions(+), 106 deletions(-) diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs index 955199c473..0d7b031889 100644 --- a/modules/axtask/src/api.rs +++ b/modules/axtask/src/api.rs @@ -99,13 +99,7 @@ pub fn spawn_raw(f: F, name: String, stack_size: usize) -> AxTaskRef where F: FnOnce() + Send + 'static, { - let task = TaskInner::new( - f, - name, - stack_size, - #[cfg(feature = "smp")] - None, - ); + let task = TaskInner::new(f, name, stack_size, None); crate::select_run_queue( #[cfg(feature = "smp")] task.clone(), diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 5416d9c54e..dadbd84948 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -168,7 +168,6 @@ impl AxRunQueue { "gc".into(), axconfig::TASK_STACK_SIZE, // gc task shoule be pinned to the current CPU. - #[cfg(feature = "smp")] Some(1 << cpu_id), ); let mut scheduler = Scheduler::new(); @@ -264,7 +263,7 @@ impl AxRunQueueInner { pub fn exit_current(&mut self, exit_code: i32) -> ! { let curr = crate::current(); debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); - assert!(curr.is_running()); + assert!(curr.is_running(), "task is not running: {:?}", curr.state()); assert!(!curr.is_idle()); if curr.is_init() { EXITED_TASKS.with_current(|exited_tasks| exited_tasks.clear()); @@ -291,43 +290,24 @@ impl AxRunQueueInner { unreachable!("task exited!"); } - pub fn yield_blocked(&mut self) { - let curr = crate::current(); - if curr.is_blocked() { - debug!("task yield_blocked: {}", curr.id_name()); - self.resched(false); - } - } - - pub fn block_current(&mut self, wait_queue_push_locked: F) - where - F: FnOnce(AxTaskRef), - { + pub fn blocked_resched(&mut self) { let curr = crate::current(); + assert!(curr.is_blocking()); debug!("task block: {}", curr.id_name()); - assert!(curr.is_running()); - assert!(!curr.is_idle()); - - // Push current task to the wait queue. - // The wait queue must be locked before calling this function. - // The lock will be released here inside this closure after the task is pushed to the wait queue. - // So this closure has to be moved here to ensure the lock is released and assertion is correct. - wait_queue_push_locked(curr.clone()); - - // we must not block current task with preemption disabled. 
#[cfg(feature = "preempt")]
+            assert!(curr.can_preempt(1));
+
+            curr.set_state(TaskState::Blocked);
+
+            wq.push_back(curr.clone());
+            drop(wq);
+
+            current_run_queue().scheduler().lock().yield_blocked()
         }
         self.cancel_events(curr);
         timeout

From e8c5bfac849e563b6f02aafd8213425201d83f30 Mon Sep 17 00:00:00 2001
From: hky1999 <976929993@qq.com>
Date: Thu, 12 Sep 2024 21:47:29 +0800
Subject: [PATCH 06/12] fix: use a new task state Blocking to avoid a
 scheduling bug in wait and notify

---
 modules/axtask/src/api.rs        |  8 +---
 modules/axtask/src/run_queue.rs  | 50 ++++++++---------------
 modules/axtask/src/task.rs       | 59 ++++++++++++++-------------
 modules/axtask/src/timers.rs     | 13 +++---
 modules/axtask/src/wait_queue.rs | 68 +++++++++++++++++---------------
 5 files changed, 92 insertions(+), 106 deletions(-)

diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs
index 955199c473..0d7b031889 100644
--- a/modules/axtask/src/api.rs
+++ b/modules/axtask/src/api.rs
@@ -99,13 +99,7 @@ pub fn spawn_raw(f: F, name: String, stack_size: usize) -> AxTaskRef
 where
     F: FnOnce() + Send + 'static,
 {
-    let task = TaskInner::new(
-        f,
-        name,
-        stack_size,
-        #[cfg(feature = "smp")]
-        None,
-    );
+    let task = TaskInner::new(f, name, stack_size, None);
     crate::select_run_queue(
         #[cfg(feature = "smp")]
         task.clone(),
diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index 5416d9c54e..dadbd84948 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -168,7 +168,6 @@ impl AxRunQueue {
             "gc".into(),
             axconfig::TASK_STACK_SIZE,
             // The gc task should be pinned to the current CPU.
-            #[cfg(feature = "smp")]
             Some(1 << cpu_id),
         );
         let mut scheduler = Scheduler::new();
@@ -264,7 +263,7 @@ impl AxRunQueueInner {
     pub fn exit_current(&mut self, exit_code: i32) -> ! {
         let curr = crate::current();
         debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code);
-        assert!(curr.is_running());
+        assert!(curr.is_running(), "task is not running: {:?}", curr.state());
         assert!(!curr.is_idle());
         if curr.is_init() {
             EXITED_TASKS.with_current(|exited_tasks| exited_tasks.clear());
@@ -291,43 +290,24 @@ impl AxRunQueueInner {
         unreachable!("task exited!");
     }
 
-    pub fn yield_blocked(&mut self) {
-        let curr = crate::current();
-        if curr.is_blocked() {
-            debug!("task yield_blocked: {}", curr.id_name());
-            self.resched(false);
-        }
-    }
-
-    pub fn block_current<F>(&mut self, wait_queue_push_locked: F)
-    where
-        F: FnOnce(AxTaskRef),
-    {
+    pub fn blocked_resched(&mut self) {
         let curr = crate::current();
+        assert!(curr.is_blocking());
         debug!("task block: {}", curr.id_name());
-        assert!(curr.is_running());
-        assert!(!curr.is_idle());
-
-        // Push current task to the wait queue.
-        // The wait queue must be locked before calling this function.
-        // The lock is released inside this closure after the task is pushed to the wait queue,
-        // so the closure has to be invoked here to ensure the lock is released and the assertions hold.
-        wait_queue_push_locked(curr.clone());
-
-        // we must not block current task with preemption disabled.
-        #[cfg(feature = "preempt")]
-        assert!(curr.can_preempt(1));
-
-        curr.set_state(TaskState::Blocked);
-        current_run_queue().num_tasks.fetch_sub(1, Ordering::AcqRel);
-
         self.resched(false);
     }
 
     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
         let cpu_id = self.cpu_id;
-        debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
+
+        loop {
+            if !task.is_blocking() {
+                assert!(task.is_blocked());
+                break;
+            }
+        }
         if task.is_blocked() {
+            debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
             task.set_state(TaskState::Ready);
             self.scheduler.add_task(task); // TODO: priority
@@ -340,7 +339,7 @@ impl AxRunQueueInner {
         let now = axhal::time::wall_time();
         if now < deadline {
             crate::timers::set_alarm_wakeup(deadline, curr.clone());
-            curr.set_state(TaskState::Blocked);
+            curr.set_state(TaskState::Blocking);
             get_run_queue(self.cpu_id)
@@ -360,6 +359,11 @@ impl AxRunQueueInner {
                 self.scheduler.put_prev_task(prev.clone(), preempt);
             }
         }
+
+        if prev.is_blocking() {
+            prev.set_state(TaskState::Blocked);
+        }
+
         let next = self.scheduler.pick_next_task().unwrap_or_else(|| unsafe {
             // Safety: IRQs must be disabled at this time.
             IDLE_TASK.current_ref_raw().get_unchecked().clone()
@@ -437,7 +441,6 @@ pub(crate) fn init() {
         || crate::run_idle(),
         "idle".into(),
         IDLE_TASK_STACK_SIZE,
-        #[cfg(feature = "smp")]
         Some(1 << cpu_id),
     );
     IDLE_TASK.with_current(|i| {
diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs
index 3f0dd760d0..f101637cc2 100644
--- a/modules/axtask/src/task.rs
+++ b/modules/axtask/src/task.rs
@@ -5,12 +5,10 @@ use core::sync::atomic::AtomicUsize;
 use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
 use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};
 
-#[cfg(feature = "smp")]
 use bitmaps::Bitmap;
 use memory_addr::{align_up_4k, va, VirtAddr};
 
 use axhal::arch::TaskContext;
-#[cfg(feature = "smp")]
 use axhal::cpu::this_cpu_id;
 #[cfg(feature = "tls")]
 use axhal::tls::TlsArea;
@@ -23,10 +23,18 @@ pub struct TaskId(u64);
 #[repr(u8)]
 #[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub(crate) enum TaskState {
+    /// Task is running on some CPU.
     Running = 1,
+    /// Task is ready to run on some scheduler's ready queue.
     Ready = 2,
-    Blocked = 3,
-    Exited = 4,
+    /// Task has just been blocked and inserted into the wait queue,
+    /// but has **NOT yet finished** its scheduling process.
+    Blocking = 3,
+    /// Task is blocked (in the wait queue or timer list) and has finished its
+    /// scheduling process; it can safely be woken up by `notify()` on any run queue.
+    Blocked = 4,
+    /// Task has exited and is waiting to be dropped.
+    Exited = 5,
 }
 
 /// The inner task structure.
@@ -42,7 +48,6 @@ pub struct TaskInner {
     state: AtomicU8,
 
     /// CPU affinity mask.
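+    /// (One bit per CPU, up to `axconfig::SMP`; built from a raw `usize` mask
+    /// in `TaskInner::new`.)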
- #[cfg(feature = "smp")] cpu_set: Bitmap<{ axconfig::SMP }>, in_wait_queue: AtomicBool, @@ -82,8 +87,9 @@ impl From for TaskState { match state { 1 => Self::Running, 2 => Self::Ready, - 3 => Self::Blocked, - 4 => Self::Exited, + 3 => Self::Blocking, + 4 => Self::Blocked, + 5 => Self::Exited, _ => unreachable!(), } } @@ -128,7 +134,6 @@ impl TaskInner { is_init: false, entry: None, state: AtomicU8::new(TaskState::Ready as u8), - #[cfg(feature = "smp")] cpu_set: Bitmap::new(), in_wait_queue: AtomicBool::new(false), #[cfg(feature = "irq")] @@ -160,7 +165,7 @@ impl TaskInner { entry: F, name: String, stack_size: usize, - #[cfg(feature = "smp")] cpu_set: Option, + cpu_set: Option, ) -> AxTaskRef where F: FnOnce() + Send + 'static, @@ -180,24 +185,21 @@ impl TaskInner { if t.name == "idle" { t.is_idle = true; } - #[cfg(feature = "smp")] - { - t.cpu_set = match cpu_set { - Some(cpu_set) => { - let mut bit_map = Bitmap::new(); - let mut i = 0; - while i < axconfig::SMP { - if cpu_set & (1 << i) != 0 { - bit_map.set(i, true); - } - i += 1; + t.cpu_set = match cpu_set { + Some(cpu_set) => { + let mut bit_map = Bitmap::new(); + let mut i = 0; + while i < axconfig::SMP { + if cpu_set & (1 << i) != 0 { + bit_map.set(i, true); } - bit_map + i += 1; } - // This task can be scheduled on all CPUs by default. - None => Bitmap::mask(axconfig::SMP), - }; - } + bit_map + } + // This task can be scheduled on all CPUs by default. + None => Bitmap::mask(axconfig::SMP), + }; Arc::new(AxTask::new(t)) } @@ -212,7 +214,6 @@ impl TaskInner { pub(crate) fn new_init(name: String) -> AxTaskRef { let mut t = Self::new_common(TaskId::new(), name); t.is_init = true; - #[cfg(feature = "smp")] t.cpu_set.set(this_cpu_id(), true); if t.name == "idle" { t.is_idle = true; @@ -245,6 +246,11 @@ impl TaskInner { matches!(self.state(), TaskState::Blocked) } + #[inline] + pub(crate) fn is_blocking(&self) -> bool { + matches!(self.state(), TaskState::Blocking) + } + #[inline] pub(crate) const fn is_init(&self) -> bool { self.is_init @@ -255,7 +261,6 @@ impl TaskInner { self.is_idle } - #[cfg(feature = "smp")] #[inline] pub(crate) const fn cpu_set(&self) -> Bitmap<{ axconfig::SMP }> { self.cpu_set diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs index ca7afbfa47..b25e9bd2f3 100644 --- a/modules/axtask/src/timers.rs +++ b/modules/axtask/src/timers.rs @@ -1,6 +1,5 @@ use alloc::sync::Arc; -use kspin::SpinNoIrq; use lazyinit::LazyInit; use timer_list::{TimeValue, TimerEvent, TimerList}; @@ -9,7 +8,7 @@ use axhal::time::wall_time; use crate::{select_run_queue, AxTaskRef}; #[percpu::def_percpu] -static TIMER_LIST: LazyInit>> = LazyInit::new(); +static TIMER_LIST: LazyInit> = LazyInit::new(); struct TaskWakeupEvent(AxTaskRef); @@ -34,24 +33,22 @@ impl TimerEvent for TaskWakeupEvent { pub fn set_alarm_wakeup(deadline: TimeValue, task: AxTaskRef) { TIMER_LIST.with_current(|timer_list| { - let mut timers = timer_list.lock(); task.set_in_timer_list(true); - timers.set(deadline, TaskWakeupEvent(task)); + timer_list.set(deadline, TaskWakeupEvent(task)); }) } pub fn cancel_alarm(task: &AxTaskRef) { TIMER_LIST.with_current(|timer_list| { - let mut timers = timer_list.lock(); task.set_in_timer_list(false); - timers.cancel(|t| Arc::ptr_eq(&t.0, task)); + timer_list.cancel(|t| Arc::ptr_eq(&t.0, task)); }) } pub fn check_events() { loop { let now = wall_time(); - let event = TIMER_LIST.with_current(|timers| timers.lock().expire_one(now)); + let event = TIMER_LIST.with_current(|timer_list| timer_list.expire_one(now)); if let 
Some((_deadline, event)) = event { event.callback(now); } else { @@ -62,6 +59,6 @@ pub fn check_events() { pub fn init() { TIMER_LIST.with_current(|timer_list| { - timer_list.init_once(SpinNoIrq::new(TimerList::new())); + timer_list.init_once(TimerList::new()); }); } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index 8110297e44..0512350054 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -61,16 +61,34 @@ impl WaitQueue { } } + fn push_to_wait_queue(&self) { + let curr = crate::current(); + assert!(curr.is_running()); + assert!(!curr.is_idle()); + let mut wq_locked = self.queue.lock(); + // we must not block current task with preemption disabled. + #[cfg(feature = "preempt")] + assert!(curr.can_preempt(1)); + wq_locked.push_back(curr.clone()); + + // We set task state as `Blocking` to clarify that the task is blocked + // but **still NOT** finished its scheduling process. + // + // When another task (generally on another run queue) try to unblock this task, + // * if this task's state is still `Blocking`: + // it can just change this task's state to `Running` and this task will come back to running on this run queue again. + // * if this task's state is `Blocked`: + // it means this task is blocked and finished its scheduling process, in another word, it has left current run queue, + // so this task can be scheduled on any run queue. + curr.set_state(TaskState::Blocking); + curr.set_in_wait_queue(true); + } + /// Blocks the current task and put it into the wait queue, until other task /// notifies it. pub fn wait(&self) { - current_run_queue() - .scheduler() - .lock() - .block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + self.push_to_wait_queue(); + current_run_queue().scheduler().lock().blocked_resched(); self.cancel_events(crate::current()); } @@ -84,28 +102,24 @@ impl WaitQueue { F: Fn() -> bool, { loop { - // let mut rq = current_run_queue().scheduler().lock(); let mut wq = self.queue.lock(); if condition() { break; } let curr = crate::current(); - debug!("task block: {}", curr.id_name()); assert!(curr.is_running()); assert!(!curr.is_idle()); - curr.set_in_wait_queue(true); - // we must not block current task with preemption disabled. 
             #[cfg(feature = "preempt")]
             assert!(curr.can_preempt(1));
-
-            curr.set_state(TaskState::Blocked);
-            wq.push_back(curr.clone());
+
+            curr.set_state(TaskState::Blocking);
+            curr.set_in_wait_queue(true);
             drop(wq);
-            current_run_queue().scheduler().lock().yield_blocked()
+            current_run_queue().scheduler().lock().blocked_resched()
         }
         self.cancel_events(crate::current());
     }
 
@@ -123,13 +137,9 @@ impl WaitQueue {
         );
         crate::timers::set_alarm_wakeup(deadline, curr.clone());
 
-        current_run_queue()
-            .scheduler()
-            .lock()
-            .block_current(|task| {
-                task.set_in_wait_queue(true);
-                self.queue.lock().push_back(task)
-            });
+        self.push_to_wait_queue();
+        current_run_queue().scheduler().lock().blocked_resched();
+
         let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out
         self.cancel_events(curr);
         timeout
@@ -156,28 +166,24 @@ impl WaitQueue {
         let mut timeout = true;
         while axhal::time::wall_time() < deadline {
-            // let mut rq = current_run_queue().scheduler().lock();
             let mut wq = self.queue.lock();
             if condition() {
+                timeout = false;
                 break;
             }
-            let curr = crate::current();
-            debug!("task block: {}", curr.id_name());
             assert!(curr.is_running());
             assert!(!curr.is_idle());
-            curr.set_in_wait_queue(true);
 
             // we must not block current task with preemption disabled.
             #[cfg(feature = "preempt")]
             assert!(curr.can_preempt(1));
-
-            curr.set_state(TaskState::Blocked);
-            wq.push_back(curr.clone());
+
+            curr.set_state(TaskState::Blocking);
+            curr.set_in_wait_queue(true);
             drop(wq);
-            current_run_queue().scheduler().lock().yield_blocked()
+            current_run_queue().scheduler().lock().blocked_resched()
         }
         self.cancel_events(curr);
         timeout

From ffe843359e706a22aba3c1436556bb309b52b877 Mon Sep 17 00:00:00 2001
From: hky1999 <976929993@qq.com>
Date: Fri, 13 Sep 2024 14:55:58 +0800
Subject: [PATCH 07/12] Use percpu_static macro

---
 modules/axtask/src/run_queue.rs | 24 ++++++++++++++----------
 modules/axtask/src/timers.rs    |  5 +++--
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index dadbd84948..b1816777cb 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -14,17 +14,21 @@ use axhal::cpu::this_cpu_id;
 use crate::task::{CurrentTask, TaskState};
 use crate::{AxTaskRef, Scheduler, TaskInner, WaitQueue};
 
-#[percpu::def_percpu]
-static RUN_QUEUE: LazyInit<AxRunQueue> = LazyInit::new();
-
-#[percpu::def_percpu]
-static EXITED_TASKS: VecDeque<AxTaskRef> = VecDeque::new();
-
-#[percpu::def_percpu]
-static WAIT_FOR_EXIT: WaitQueue = WaitQueue::new();
+macro_rules! percpu_static {
+    ($($name:ident: $ty:ty = $init:expr),* $(,)?) => {
+        $(
+            #[percpu::def_percpu]
+            static $name: $ty = $init;
+        )*
+    };
+}
 
-#[percpu::def_percpu]
-static IDLE_TASK: LazyInit<AxTaskRef> = LazyInit::new();
+percpu_static! {
+    RUN_QUEUE: LazyInit<AxRunQueue> = LazyInit::new(),
+    EXITED_TASKS: VecDeque<AxTaskRef> = VecDeque::new(),
+    WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(),
+    IDLE_TASK: LazyInit<AxTaskRef> = LazyInit::new(),
+}
 
 /// An array of references to run queues, one for each CPU, indexed by cpu_id.
 ///
diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs
index b25e9bd2f3..8925ee7923 100644
--- a/modules/axtask/src/timers.rs
+++ b/modules/axtask/src/timers.rs
@@ -7,8 +7,9 @@ use axhal::time::wall_time;
 
 use crate::{select_run_queue, AxTaskRef};
 
-#[percpu::def_percpu]
-static TIMER_LIST: LazyInit<TimerList<TaskWakeupEvent>> = LazyInit::new();
+percpu_static! {
+    TIMER_LIST: LazyInit<TimerList<TaskWakeupEvent>> = LazyInit::new(),
+}
 
 struct TaskWakeupEvent(AxTaskRef);
 

From a5c65244ee459b7d936a95d63be147e87c5c603a Mon Sep 17 00:00:00 2001
From: hky1999 <976929993@qq.com>
Date: Fri, 13 Sep 2024 22:34:53 +0800
Subject: [PATCH 08/12] wip: refactor run queue lock

---
 Cargo.lock                       |  2 +-
 modules/axtask/Cargo.toml        |  3 +-
 modules/axtask/src/api.rs        | 21 +++-------
 modules/axtask/src/run_queue.rs  | 66 +++++++++++++++-----------------
 modules/axtask/src/task.rs       |  3 +-
 modules/axtask/src/timers.rs     | 14 ++-----
 modules/axtask/src/wait_queue.rs | 20 +++++-----
 7 files changed, 51 insertions(+), 78 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index dd1c3dcfa2..9c334a92c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1315,7 +1315,7 @@ checksum = "e6e36312fb5ddc10d08ecdc65187402baba4ac34585cb9d1b78522ae2358d890"
 [[package]]
 name = "scheduler"
 version = "0.1.0"
-source = "git+https://github.com/arceos-org/scheduler.git?tag=v0.1.0#c8d25d9aed146dca28dc8987afd229b52c20361a"
+source = "git+https://github.com/arceos-org/scheduler.git?branch=num_tasks#6ca9626da9892dd85f8485a7b1649a93302ac90c"
 dependencies = [
     "linked_list",
 ]
diff --git a/modules/axtask/Cargo.toml b/modules/axtask/Cargo.toml
index 09ef12da1b..ec5a16ab3e 100644
--- a/modules/axtask/Cargo.toml
+++ b/modules/axtask/Cargo.toml
@@ -47,7 +47,8 @@ memory_addr = { version = "0.3", optional = true }
 timer_list = { version = "0.1", optional = true }
 kernel_guard = { version = "0.1", optional = true }
 crate_interface = { version = "0.1", optional = true }
-scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true }
+# scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true }
+scheduler = { git = "https://github.com/arceos-org/scheduler.git", branch = "num_tasks", optional = true }
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs
index 0d7b031889..4c13d553a1 100644
--- a/modules/axtask/src/api.rs
+++ b/modules/axtask/src/api.rs
@@ -86,10 +86,7 @@ pub fn init_scheduler_secondary() {
 #[doc(cfg(feature = "irq"))]
 pub fn on_timer_tick() {
     crate::timers::check_events();
-    current_run_queue()
-        .scheduler()
-        .lock()
-        .scheduler_timer_tick();
+    current_run_queue().scheduler_timer_tick();
 }
 
 /// Spawns a new task with the given parameters.
@@ -104,8 +101,6 @@ where
         #[cfg(feature = "smp")]
         task.clone(),
     )
-    .scheduler()
-    .lock()
     .add_task(task.clone());
     task
 }
@@ -133,16 +128,13 @@ where
 ///
 /// [CFS]: https://en.wikipedia.org/wiki/Completely_Fair_Scheduler
 pub fn set_priority(prio: isize) -> bool {
-    current_run_queue()
-        .scheduler()
-        .lock()
-        .set_current_priority(prio)
+    current_run_queue().set_current_priority(prio)
 }
 
 /// Current task gives up the CPU time voluntarily, and switches to another
 /// ready task.
 pub fn yield_now() {
-    current_run_queue().scheduler().lock().yield_current()
+    current_run_queue().yield_current()
 }
 
 /// Current task is going to sleep for the given duration.
@@ -157,17 +149,14 @@ pub fn sleep(dur: core::time::Duration) {
 /// If the feature `irq` is not enabled, it uses busy-wait instead.
 pub fn sleep_until(deadline: axhal::time::TimeValue) {
     #[cfg(feature = "irq")]
-    current_run_queue().scheduler().lock().sleep_until(deadline);
+    current_run_queue().sleep_until(deadline);
     #[cfg(not(feature = "irq"))]
     axhal::time::busy_wait_until(deadline);
 }
 
 /// Exits the current task.
 pub fn exit(exit_code: i32) -> ! {
-    current_run_queue()
-        .scheduler()
-        .lock()
-        .exit_current(exit_code)
+    current_run_queue().exit_current(exit_code)
 }
 
 /// The idle task routine.
diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index b1816777cb..7d2376fcb0 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -148,21 +148,12 @@ pub(crate) fn select_run_queue(#[cfg(feature = "smp")] task: AxTaskRef) -> &'sta
 
 /// AxRunQueue represents a run queue for the global system or a specific CPU.
 pub(crate) struct AxRunQueue {
-    /// The ID of the CPU this run queue is associated with.
-    cpu_id: usize,
-    /// The number of tasks currently in the run queue.
-    num_tasks: AtomicUsize,
-    /// The inner structure of the run queue, protected by a SpinNoIrq lock to ensure thread safety.
-    inner: SpinNoIrq<AxRunQueueInner>,
-}
-
-/// A structure that holds the core components of a run queue.
-/// protected by a `SpinNoIrq` lock to ensure thread safety during scheduling.
-pub struct AxRunQueueInner {
     /// The ID of the CPU this run queue is associated with.
     cpu_id: usize,
     /// The core scheduler of this run queue.
     scheduler: Scheduler,
+    /// The lock to ensure thread safety during scheduling.
+    lock: SpinNoIrq<()>,
 }
 
 impl AxRunQueue {
@@ -174,12 +165,13 @@ impl AxRunQueue {
             // gc task should be pinned to the current CPU.
             Some(1 << cpu_id),
         );
+
         let mut scheduler = Scheduler::new();
         scheduler.add_task(gc_task);
         Self {
             cpu_id,
-            num_tasks: AtomicUsize::new(2),
-            inner: SpinNoIrq::new(AxRunQueueInner { cpu_id, scheduler }),
+            scheduler,
+            lock: SpinNoIrq::new(()),
         }
     }
 
@@ -189,23 +181,23 @@ impl AxRunQueue {
         self.cpu_id
     }
 
-    /// Returns the number of tasks in current run queue,
+    /// Returns the number of tasks in current run queue's scheduler,
     /// which is used for load balance during scheduling.
     #[cfg(feature = "smp")]
     pub fn num_tasks(&self) -> usize {
-        self.num_tasks.load(Ordering::Acquire)
+        self.scheduler.num_tasks()
     }
 
     /// Returns a reference to the inner scheduler of the run queue locked by a `SpinNoIrq` lock.
     /// Note: the scheduler lock is explicitly held during the scheduling process where task scheduling may happen,
     /// it is explicitly released before the context switch by `force_unlock()`.
-    pub(crate) fn scheduler(&self) -> &SpinNoIrq<AxRunQueueInner> {
-        &self.inner
+    pub(crate) fn scheduler(&self) -> &SpinNoIrq<()> {
+        &self.lock
     }
 }
 
-/// Core functions of run queue, which should be called after holding the scheduler() lock.
-impl AxRunQueueInner {
+/// Core functions of run queue, which should be called with lock held.
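The `lock: SpinNoIrq<()>` introduced above stands beside the data it guards rather than around it. A minimal sketch of the two shapes, with stand-in types (not the actual axtask definitions), assuming the `kspin` API used elsewhere in this series:

    use kspin::SpinNoIrq;

    struct Sched; // stand-in for the real scheduler state

    // Shape A: data lives inside the lock; the compiler forces every
    // access to go through `lock()` first.
    struct RunQueueA {
        inner: SpinNoIrq<Sched>,
    }

    // Shape B (this patch): a bare lock next to the data; each method
    // must remember to take `lock` before touching `scheduler`, which
    // is enforced only by convention.
    struct RunQueueB {
        scheduler: Sched,
        lock: SpinNoIrq<()>,
    }

The trade-off: shape B lets methods keep plain `&mut self` signatures and hold the lock only for chosen spans, at the cost of losing compile-time enforcement that the lock is actually held.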
+impl AxRunQueue { pub fn add_task(&mut self, task: AxTaskRef) { debug!( "task spawn: {} on run_queue {}", @@ -213,14 +205,13 @@ impl AxRunQueueInner { self.cpu_id ); assert!(task.is_ready()); + let _lock = self.lock.lock(); self.scheduler.add_task(task); - get_run_queue(self.cpu_id) - .num_tasks - .fetch_add(1, Ordering::AcqRel); } #[cfg(feature = "irq")] pub fn scheduler_timer_tick(&mut self) { + let _lock = self.lock.lock(); let curr = crate::current(); if !curr.is_idle() && self.scheduler.task_tick(curr.as_task_ref()) { #[cfg(feature = "preempt")] @@ -229,6 +220,7 @@ impl AxRunQueueInner { } pub fn yield_current(&mut self) { + let _lock = self.lock.lock(); let curr = crate::current(); trace!("task yield: {}", curr.id_name()); assert!(curr.is_running()); @@ -236,12 +228,14 @@ impl AxRunQueueInner { } pub fn set_current_priority(&mut self, prio: isize) -> bool { + let _lock = self.lock.lock(); self.scheduler .set_priority(crate::current().as_task_ref(), prio) } #[cfg(feature = "preempt")] pub fn preempt_resched(&mut self) { + let _lock = self.lock.lock(); let curr = crate::current(); // assert!(curr.is_running()); @@ -265,6 +259,7 @@ impl AxRunQueueInner { } pub fn exit_current(&mut self, exit_code: i32) -> ! { + let _lock = self.lock.lock(); let curr = crate::current(); debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); assert!(curr.is_running(), "task is not running: {:?}", curr.state()); @@ -295,6 +290,7 @@ impl AxRunQueueInner { } pub fn blocked_resched(&mut self) { + let _lock = self.lock.lock(); let curr = crate::current(); assert!(curr.is_blocking()); debug!("task block: {}", curr.id_name()); @@ -302,23 +298,23 @@ impl AxRunQueueInner { } pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) { + let _lock = self.lock.lock(); let cpu_id = self.cpu_id; - loop { - if !task.is_blocking() { - assert!(task.is_blocked()); - break; + // When task's state is Blocking, it has not finished its scheduling process. + if task.is_blocking() { + while task.is_blocking() { + // Wait for the task to finish its scheduling process. + core::hint::spin_loop(); } + assert!(task.is_blocked()) } + if task.is_blocked() { debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id); task.set_state(TaskState::Ready); self.scheduler.add_task(task); // TODO: priority - get_run_queue(cpu_id) - .num_tasks - .fetch_add(1, Ordering::AcqRel); - // Note: when the task is unblocked on another CPU's run queue, // we just ingiore the `resched` flag. if resched && cpu_id == this_cpu_id() { @@ -330,6 +326,7 @@ impl AxRunQueueInner { #[cfg(feature = "irq")] pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) { + let _lock = self.lock.lock(); let curr = crate::current(); debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline); assert!(curr.is_running()); @@ -339,15 +336,12 @@ impl AxRunQueueInner { if now < deadline { crate::timers::set_alarm_wakeup(deadline, curr.clone()); curr.set_state(TaskState::Blocking); - get_run_queue(self.cpu_id) - .num_tasks - .fetch_sub(1, Ordering::AcqRel); self.resched(false); } } } -impl AxRunQueueInner { +impl AxRunQueue { /// Common reschedule subroutine. If `preempt`, keep current task's time /// slice, otherwise reset it. fn resched(&mut self, preempt: bool) { @@ -396,8 +390,8 @@ impl AxRunQueueInner { CurrentTask::set_current(prev_task, next_task); - // Release the lock that was explicitly acquired by `scheduler()`. - crate::current_run_queue().scheduler().force_unlock(); + // Release the lock that was explicitly acquired. 
+ crate::current_run_queue().lock.force_unlock(); (*prev_ctx_ptr).switch_to(&*next_ctx_ptr); } diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs index f101637cc2..e2065e204d 100644 --- a/modules/axtask/src/task.rs +++ b/modules/axtask/src/task.rs @@ -319,9 +319,8 @@ impl TaskInner { fn current_check_preempt_pending() { let curr = crate::current(); if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) { - let mut rq_locked = crate::current_run_queue().scheduler().lock(); if curr.need_resched.load(Ordering::Acquire) { - rq_locked.preempt_resched(); + crate::current_run_queue().preempt_resched() } } } diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs index 8925ee7923..cb55dde24c 100644 --- a/modules/axtask/src/timers.rs +++ b/modules/axtask/src/timers.rs @@ -15,20 +15,12 @@ struct TaskWakeupEvent(AxTaskRef); impl TimerEvent for TaskWakeupEvent { fn callback(self, _now: TimeValue) { - // Originally, irq and preempt are disabled by SpinNoIrq lock hold by RUN_QUEUE. - // But, we can't use RUN_QUEUE here, so we need to disable irq and preempt manually. - // Todo: figure out if `NoPreempt` is needed here. - // let _guard = kernel_guard::NoPreemptIrqSave::new(); - let mut rq_locked = select_run_queue( + self.0.set_in_timer_list(false); + select_run_queue( #[cfg(feature = "smp")] self.0.clone(), ) - .scheduler() - .lock(); - - self.0.set_in_timer_list(false); - - rq_locked.unblock_task(self.0, true); + .unblock_task(self.0, true) } } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index 0512350054..29d5758bb6 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -76,7 +76,7 @@ impl WaitQueue { // // When another task (generally on another run queue) try to unblock this task, // * if this task's state is still `Blocking`: - // it can just change this task's state to `Running` and this task will come back to running on this run queue again. + // it needs to wait for this task's state to be changed to `Blocked`, which means it has finished its scheduling process. // * if this task's state is `Blocked`: // it means this task is blocked and finished its scheduling process, in another word, it has left current run queue, // so this task can be scheduled on any run queue. @@ -88,7 +88,7 @@ impl WaitQueue { /// notifies it. pub fn wait(&self) { self.push_to_wait_queue(); - current_run_queue().scheduler().lock().blocked_resched(); + current_run_queue().blocked_resched(); self.cancel_events(crate::current()); } @@ -119,7 +119,7 @@ impl WaitQueue { curr.set_in_wait_queue(true); drop(wq); - current_run_queue().scheduler().lock().blocked_resched() + current_run_queue().blocked_resched() } self.cancel_events(crate::current()); } @@ -138,7 +138,7 @@ impl WaitQueue { crate::timers::set_alarm_wakeup(deadline, curr.clone()); self.push_to_wait_queue(); - current_run_queue().scheduler().lock().blocked_resched(); + current_run_queue().blocked_resched(); let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out self.cancel_events(curr); @@ -183,7 +183,7 @@ impl WaitQueue { curr.set_in_wait_queue(true); drop(wq); - current_run_queue().scheduler().lock().blocked_resched() + current_run_queue().blocked_resched() } self.cancel_events(curr); timeout @@ -237,14 +237,12 @@ impl WaitQueue { } pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) { + // Mark task as not in wait queue. + task.set_in_wait_queue(false); // Select run queue by the CPU set of the task. 
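The routing that `select_run_queue` performs here can be stated in isolation: among the CPUs allowed by the task's affinity mask, pick the one whose run queue currently holds the fewest tasks. A self-contained sketch, with plain slices standing in for the per-CPU queues (illustrative names only):

    /// Picks the allowed CPU whose run queue currently has the fewest tasks.
    fn select_cpu(num_tasks_per_cpu: &[usize], affinity_mask: u64) -> Option<usize> {
        (0..num_tasks_per_cpu.len())
            .filter(|&cpu| affinity_mask & (1 << cpu) != 0) // honor the CPU set
            .min_by_key(|&cpu| num_tasks_per_cpu[cpu])      // least loaded wins
    }

    // select_cpu(&[3, 1, 2, 5], 0b1101) == Some(2):
    // CPU 1 is masked out, and CPU 2 has the fewest tasks among {0, 2, 3}.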
- let mut rq = select_run_queue( + select_run_queue( #[cfg(feature = "smp")] task.clone(), ) - .scheduler() - .lock(); - - task.set_in_wait_queue(false); - rq.unblock_task(task, resched) + .unblock_task(task, resched) } From 4f87b001ba58ebe8ed8d106ac733bd8c6e2b2517 Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Mon, 16 Sep 2024 17:42:56 +0800 Subject: [PATCH 09/12] refactor: move run queue lock into scheduler crate --- Cargo.lock | 3 +- modules/axtask/src/run_queue.rs | 99 +++++++++------------ modules/axtask/src/task.rs | 1 + modules/axtask/src/wait_queue.rs | 3 +- strange_irq_log.txt | 148 +++++++++++++++++++++++++++++++ 5 files changed, 196 insertions(+), 58 deletions(-) create mode 100644 strange_irq_log.txt diff --git a/Cargo.lock b/Cargo.lock index 9c334a92c6..8db9882c99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,8 +1315,9 @@ checksum = "e6e36312fb5ddc10d08ecdc65187402baba4ac34585cb9d1b78522ae2358d890" [[package]] name = "scheduler" version = "0.1.0" -source = "git+https://github.com/arceos-org/scheduler.git?branch=num_tasks#6ca9626da9892dd85f8485a7b1649a93302ac90c" +source = "git+https://github.com/arceos-org/scheduler.git?branch=num_tasks#415a620347722cb734fa440d9103065690a5853b" dependencies = [ + "kspin", "linked_list", ] diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 7d2376fcb0..6123e4f639 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -1,11 +1,8 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; use core::mem::MaybeUninit; -use core::sync::atomic::{AtomicUsize, Ordering}; -#[cfg(feature = "smp")] use bitmaps::Bitmap; -use kspin::SpinNoIrq; use lazyinit::LazyInit; use scheduler::BaseScheduler; @@ -39,9 +36,9 @@ percpu_static! { /// Access to this variable is marked as `unsafe` because it contains `MaybeUninit` references, /// which require careful handling to avoid undefined behavior. The array should be fully /// initialized before being accessed to ensure safe usage. -static mut RUN_QUEUES: [MaybeUninit<&'static AxRunQueue>; axconfig::SMP] = - [MaybeUninit::uninit(); axconfig::SMP]; - +static mut RUN_QUEUES: [MaybeUninit<&'static mut AxRunQueue>; axconfig::SMP] = + [ARRAY_REPEAT_VALUE; axconfig::SMP]; +const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit(); /// Returns a reference to the current run queue. /// /// ## Safety @@ -55,8 +52,8 @@ static mut RUN_QUEUES: [MaybeUninit<&'static AxRunQueue>; axconfig::SMP] = /// /// A static reference to the current run queue. #[inline] -pub(crate) fn current_run_queue() -> &'static AxRunQueue { - unsafe { RUN_QUEUE.current_ref_raw() } +pub(crate) fn current_run_queue() -> &'static mut AxRunQueue { + unsafe { RUN_QUEUE.current_ref_mut_raw() } } /// Selects the run queue index based on a CPU set bitmap, minimizing the number of tasks. @@ -82,10 +79,10 @@ fn select_run_queue_index(cpu_set: Bitmap<{ axconfig::SMP }>) -> usize { unsafe { RUN_QUEUES .iter() - .filter(|rq| cpu_set.get(rq.assume_init().cpu_id())) - .min_by_key(|rq| rq.assume_init().num_tasks()) + .filter(|rq| cpu_set.get(rq.assume_init_ref().cpu_id())) + .min_by_key(|rq| rq.assume_init_ref().num_tasks()) .expect("No available run queue that matches the CPU set") - .assume_init() + .assume_init_ref() .cpu_id() } } @@ -108,9 +105,9 @@ fn select_run_queue_index(cpu_set: Bitmap<{ axconfig::SMP }>) -> usize { /// This function will panic if the index is out of bounds. 
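The `ARRAY_REPEAT_VALUE` constant above is the workaround rustc itself suggests when a `[value; N]` repeat expression has a non-`Copy` element type: `MaybeUninit<&'static mut AxRunQueue>` is not `Copy` because `&mut` references are not. A stripped-down demonstration of the same pattern:

    use core::mem::MaybeUninit;

    // `[MaybeUninit::uninit(); 4]` would be rejected here (the element
    // type is not `Copy`), but repeating a named constant is allowed.
    const UNINIT_SLOT: MaybeUninit<&'static mut u32> = MaybeUninit::uninit();
    static mut SLOTS: [MaybeUninit<&'static mut u32>; 4] = [UNINIT_SLOT; 4];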
/// #[inline] -fn get_run_queue(index: usize) -> &'static AxRunQueue { +fn get_run_queue(index: usize) -> &'static mut AxRunQueue { assert!(index < axconfig::SMP); - unsafe { RUN_QUEUES[index].assume_init() } + unsafe { RUN_QUEUES[index].assume_init_mut() } } /// Selects the appropriate run queue for the provided task. @@ -132,7 +129,7 @@ fn get_run_queue(index: usize) -> &'static AxRunQueue { /// 2. Use a more generic load balancing algorithm that can be customized or replaced. /// #[inline] -pub(crate) fn select_run_queue(#[cfg(feature = "smp")] task: AxTaskRef) -> &'static AxRunQueue { +pub(crate) fn select_run_queue(#[cfg(feature = "smp")] task: AxTaskRef) -> &'static mut AxRunQueue { #[cfg(not(feature = "smp"))] { // When SMP is disabled, all tasks are scheduled on the same global run queue. @@ -152,8 +149,6 @@ pub(crate) struct AxRunQueue { cpu_id: usize, /// The core scheduler of this run queue. scheduler: Scheduler, - /// The lock to ensure thread safety during scheduling. - lock: SpinNoIrq<()>, } impl AxRunQueue { @@ -168,11 +163,7 @@ impl AxRunQueue { let mut scheduler = Scheduler::new(); scheduler.add_task(gc_task); - Self { - cpu_id, - scheduler, - lock: SpinNoIrq::new(()), - } + Self { cpu_id, scheduler } } /// Returns the cpu id of current run queue, @@ -187,16 +178,9 @@ impl AxRunQueue { pub fn num_tasks(&self) -> usize { self.scheduler.num_tasks() } - - /// Returns a reference to the inner scheduler of the run queue locked by a `SpinNoIrq` lock. - /// Note: the scheduler lock is explicitly held during the scheduling process where task scheduling may happen, - /// it is explicitly released before the context switch by `force_unlock()`. - pub(crate) fn scheduler(&self) -> &SpinNoIrq<()> { - &self.lock - } } -/// Core functions of run queue, which should be called with lock held. +/// Core functions of run queue. impl AxRunQueue { pub fn add_task(&mut self, task: AxTaskRef) { debug!( @@ -205,13 +189,11 @@ impl AxRunQueue { self.cpu_id ); assert!(task.is_ready()); - let _lock = self.lock.lock(); self.scheduler.add_task(task); } #[cfg(feature = "irq")] pub fn scheduler_timer_tick(&mut self) { - let _lock = self.lock.lock(); let curr = crate::current(); if !curr.is_idle() && self.scheduler.task_tick(curr.as_task_ref()) { #[cfg(feature = "preempt")] @@ -220,7 +202,7 @@ impl AxRunQueue { } pub fn yield_current(&mut self) { - let _lock = self.lock.lock(); + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); trace!("task yield: {}", curr.id_name()); assert!(curr.is_running()); @@ -228,22 +210,21 @@ impl AxRunQueue { } pub fn set_current_priority(&mut self, prio: isize) -> bool { - let _lock = self.lock.lock(); self.scheduler .set_priority(crate::current().as_task_ref(), prio) } #[cfg(feature = "preempt")] pub fn preempt_resched(&mut self) { - let _lock = self.lock.lock(); + // There is no need to disable IRQ and preemption here, because + // they both have been disabled in `current_check_preempt_pending`. let curr = crate::current(); // assert!(curr.is_running()); - // When we get the mutable reference of the run queue, we must - // have held the `SpinNoIrq` lock with both IRQs and preemption - // disabled. So we need to set `current_disable_count` to 1 in - // `can_preempt()` to obtain the preemption permission before - // locking the run queue. + // When we call `preempt_resched()`, both IRQs and preemption must + // have been disabled by `kernel_guard::NoPreemptIrqSave`. 
So we need + // to set `current_disable_count` to 1 in `can_preempt()` to obtain + // the preemption permission. let can_preempt = curr.can_preempt(1); debug!( @@ -259,7 +240,8 @@ impl AxRunQueue { } pub fn exit_current(&mut self, exit_code: i32) -> ! { - let _lock = self.lock.lock(); + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); + let curr = crate::current(); debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); assert!(curr.is_running(), "task is not running: {:?}", curr.state()); @@ -269,12 +251,6 @@ impl AxRunQueue { axhal::misc::terminate(); } else { curr.set_state(TaskState::Exited); - current_run_queue().num_tasks.fetch_sub(1, Ordering::AcqRel); - - // Unlock the run queue before notifying the joiner task. - unsafe { - current_run_queue().inner.force_unlock(); - } // Notify the joiner task. curr.notify_exit(exit_code); @@ -283,22 +259,31 @@ impl AxRunQueue { EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(curr.clone())); // Wake up the GC task to drop the exited tasks. WAIT_FOR_EXIT.with_current(|wq| wq.notify_one(false)); - // `SpinNoIrq` lock until now. + // Schedule to next task. self.resched(false); } unreachable!("task exited!"); } pub fn blocked_resched(&mut self) { - let _lock = self.lock.lock(); + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); - assert!(curr.is_blocking()); + assert!( + curr.is_blocking() || curr.is_running(), + "task is not blocking or running: {:?}", + curr.state() + ); + + // Current task may have been woken up on another CPU. + if curr.is_running() { + return; + } + debug!("task block: {}", curr.id_name()); self.resched(false); } pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) { - let _lock = self.lock.lock(); let cpu_id = self.cpu_id; // When task's state is Blocking, it has not finished its scheduling process. @@ -326,7 +311,7 @@ impl AxRunQueue { #[cfg(feature = "irq")] pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) { - let _lock = self.lock.lock(); + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline); assert!(curr.is_running()); @@ -361,6 +346,11 @@ impl AxRunQueue { // Safety: IRQs must be disabled at this time. IDLE_TASK.current_ref_raw().get_unchecked().clone() }); + assert!( + next.is_ready(), + "next task is not ready: {:?}", + next.state() + ); self.switch_to(prev, next); } @@ -390,9 +380,6 @@ impl AxRunQueue { CurrentTask::set_current(prev_task, next_task); - // Release the lock that was explicitly acquired. 
- crate::current_run_queue().lock.force_unlock(); - (*prev_ctx_ptr).switch_to(&*next_ctx_ptr); } } @@ -444,7 +431,7 @@ pub(crate) fn init() { rq.init_once(AxRunQueue::new(cpu_id)); }); unsafe { - RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_raw()); + RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); } } @@ -462,6 +449,6 @@ pub(crate) fn init_secondary() { rq.init_once(AxRunQueue::new(cpu_id)); }); unsafe { - RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_raw()); + RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); } } diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs index e2065e204d..09bb1d4c11 100644 --- a/modules/axtask/src/task.rs +++ b/modules/axtask/src/task.rs @@ -319,6 +319,7 @@ impl TaskInner { fn current_check_preempt_pending() { let curr = crate::current(); if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) { + let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); if curr.need_resched.load(Ordering::Acquire) { crate::current_run_queue().preempt_resched() } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index 29d5758bb6..d90e55d073 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -69,7 +69,6 @@ impl WaitQueue { // we must not block current task with preemption disabled. #[cfg(feature = "preempt")] assert!(curr.can_preempt(1)); - wq_locked.push_back(curr.clone()); // We set task state as `Blocking` to clarify that the task is blocked // but **still NOT** finished its scheduling process. @@ -82,6 +81,8 @@ impl WaitQueue { // so this task can be scheduled on any run queue. curr.set_state(TaskState::Blocking); curr.set_in_wait_queue(true); + + wq_locked.push_back(curr.clone()); } /// Blocks the current task and put it into the wait queue, until other task diff --git a/strange_irq_log.txt b/strange_irq_log.txt new file mode 100644 index 0000000000..21653c9c59 --- /dev/null +++ b/strange_irq_log.txt @@ -0,0 +1,148 @@ +➜ arceos git:(percpu_run_queue) ✗ make A=../arceos-apps/rust/task/parallel SMP=4 LOG=info FEATURES=sched_cfs ARCH=aarch64 ACCEL=n run + Building App: parallel, Arch: aarch64, Platform: aarch64-qemu-virt, App type: rust +cargo -C ../arceos-apps/rust/task/parallel build -Z unstable-options --target aarch64-unknown-none-softfloat --target-dir /home/hky/workspace/arceos-org/arceos/target --release --features "axstd/log-level-info axstd/sched_cfs axstd/smp" + Finished `release` profile [optimized] target(s) in 0.06s +rust-objcopy --binary-architecture=aarch64 ../arceos-apps/rust/task/parallel/parallel_aarch64-qemu-virt.elf --strip-all -O binary ../arceos-apps/rust/task/parallel/parallel_aarch64-qemu-virt.bin + Running on qemu... +qemu-system-aarch64 -m 128M -smp 4 -cpu cortex-a72 -machine virt -kernel ../arceos-apps/rust/task/parallel/parallel_aarch64-qemu-virt.bin -nographic + + d8888 .d88888b. .d8888b. + d88888 d88P" "Y88b d88P Y88b + d88P888 888 888 Y88b. + d88P 888 888d888 .d8888b .d88b. 888 888 "Y888b. + d88P 888 888P" d88P" d8P Y8b 888 888 "Y88b. + d88P 888 888 888 88888888 888 888 "888 + d8888888888 888 Y88b. Y8b. Y88b. .d88P Y88b d88P +d88P 888 888 "Y8888P "Y8888 "Y88888P" "Y8888P" + +arch = aarch64 +platform = aarch64-qemu-virt +target = aarch64-unknown-none-softfloat +smp = 4 +build_mode = release +log_level = info + +[ 0.002825 axruntime:130] Logging is enabled. +[ 0.003531 axruntime:131] Primary CPU 0 started, dtb = 0x44000000. 
+[ 0.003749 axruntime:133] Found physcial memory regions: +[ 0.003990 axruntime:135] [PA:0x40080000, PA:0x40092000) .text (READ | EXECUTE | RESERVED) +[ 0.004268 axruntime:135] [PA:0x40092000, PA:0x40098000) .rodata (READ | RESERVED) +[ 0.004724 axruntime:135] [PA:0x40098000, PA:0x4009c000) .data .tdata .tbss .percpu (READ | WRITE | RESERVED) +[ 0.005142 axruntime:135] [PA:0x4009c000, PA:0x4019c000) boot stack (READ | WRITE | RESERVED) +[ 0.005632 axruntime:135] [PA:0x4019c000, PA:0x401c2000) .bss (READ | WRITE | RESERVED) +[ 0.006012 axruntime:135] [PA:0x401c2000, PA:0x48000000) free memory (READ | WRITE | FREE) +[ 0.006429 axruntime:135] [PA:0x9000000, PA:0x9001000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.006839 axruntime:135] [PA:0x9100000, PA:0x9101000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.007264 axruntime:135] [PA:0x8000000, PA:0x8020000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.007797 axruntime:135] [PA:0xa000000, PA:0xa004000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.008228 axruntime:135] [PA:0x10000000, PA:0x3eff0000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.008365 axruntime:135] [PA:0x4010000000, PA:0x4020000000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.008778 axruntime:208] Initialize global memory allocator... +[ 0.009298 axruntime:209] use TLSF allocator. +[ 0.010894 axruntime:150] Initialize platform devices... +[ 0.011335 axhal::platform::aarch64_common::gic:50] Initialize GICv2... +[ 0.012179 axtask::api:66] Initialize scheduling... +[ 0.013023 axtask::run_queue:431] Initialize RUN_QUEUES +[ 0.014408 axtask::api:72] use Completely Fair scheduler. +[ 0.014959 axhal::platform::aarch64_common::psci:113] Starting CPU 1 ON ... +[ 0.015439 axruntime::mp:35] Secondary CPU 1 started. +[ 0.015993 axhal::platform::aarch64_common::psci:113] Starting CPU 2 ON ... +[ 0.016337 axruntime::mp:35] Secondary CPU 2 started. +[ 0.016119 axruntime::mp:45] Secondary CPU 1 init OK. +[ 0.017065 axruntime::mp:45] Secondary CPU 2 init OK. +[ 0.017056 axhal::platform::aarch64_common::psci:113] Starting CPU 3 ON ... +[ 0.022300 axruntime::mp:35] Secondary CPU 3 started. +[ 0.022418 axruntime:176] Initialize interrupt handlers... +[ 0.022716 axruntime::mp:45] Secondary CPU 3 init OK. +[ 0.023969 axruntime:186] Primary CPU 0 init OK. 
+part 0: ThreadId(10) [0, 125000) +part 4: ThreadId(14) [500000, 625000) +part 1: ThreadId(11) [125000, 250000) +part 2: ThreadId(12) [250000, 375000) +part 3: ThreadId(13) [375000, 500000) +part 8: ThreadId(18) [1000000, 1125000) +part 5: ThreadId(15) [625000, 750000) +part 9: ThreadId(19) [1125000, 1250000) +part 6: ThreadId(16) [750000, 875000) +part 7: ThreadId(17) [875000, 1000000) +part 12: ThreadId(22) [1500000, 1625000) +part 13: ThreadId(23) [1625000, 1750000) +part 10: ThreadId(20) [1250000, 1375000) +part 11: ThreadId(21) [1375000, 1500000) +part 14: ThreadId(24) [1750000, 1875000) +part 15: ThreadId(25) [1875000, 2000000) +part 9: ThreadId(19) finished +part 7: ThreadId(17) finished +part 5: ThreadId(15) finished +part 15: ThreadId(25) finished +part 0: ThreadId(10) finished +part 4: ThreadId(14) finished +part 8: ThreadId(18) finished +part 3: ThreadId(13) finished +part 11: ThreadId(21) finished +part 13: ThreadId(23) finished +part 1: ThreadId(11) finished +part 14: ThreadId(24) finished +part 12: ThreadId(22) finished +part 2: ThreadId(12) finished +part 6: ThreadId(16) finished +[ 1.070932 0:1 axhal::irq:20] Unhandled IRQ 33 +QEMU 8.1.94 monitor - type 'help' for more information +(qemu) info registers -a + +CPU#0 + PC=ffff000040086068 X00=ffff0000401c2ff8 X01=0000000000000000 +X02=0000000000000008 X03=0000000000000128 X04=0000000000000000 +X05=0000000000000002 X06=0000000000000000 X07=0000000000000000 +X08=0000000000000000 X09=0000000000000000 X10=0000000000000000 +X11=ffff0000401c4190 X12=0000000000000003 X13=ffff0000401c4770 +X14=0000000000000008 X15=00000000a0000000 X16=0000000000000000 +X17=0000000000005000 X18=0000000000000070 X19=ffff00004009b038 +X20=ffff000040093c28 X21=0000000000000000 X22=0000000000000000 +X23=0000000000000000 X24=0000000000000000 X25=0000000000000000 +X26=0000000000000000 X27=0000000000000000 X28=0000000000000000 +X29=0000000000000000 X30=ffff000040086064 SP=ffff0000401c2ff0 +PSTATE=60000345 -ZC- EL1h FPU disabled + +CPU#1 + PC=ffff000040086068 X00=ffff0000400dbf58 X01=0000000000000000 +X02=0000000000000000 X03=ffff0000401c3710 X04=ffff000041689fc0 +X05=0000000000000002 X06=0000000000000000 X07=0000000000000000 +X08=0000000000000000 X09=0000000000000000 X10=0000000000000000 +X11=ffff0000401c41d0 X12=0000000000000004 X13=ffff0000401c61b0 +X14=ffff0000401bff00 X15=ffff00004009b338 X16=0000000000000000 +X17=0000000000081102 X18=0000000000000000 X19=ffff00004009b138 +X20=ffff000040090450 X21=ffff0000401c1000 X22=0000000000000000 +X23=0000000000000000 X24=0000000000000000 X25=0000000000000000 +X26=0000000000000000 X27=0000000000000000 X28=0000000000000000 +X29=0000000000000000 X30=ffff000040086064 SP=ffff0000400dbf50 +PSTATE=60000345 -ZC- EL1h FPU disabled + +CPU#2 + PC=ffff000040086068 X00=ffff00004011bf58 X01=0000000000000000 +X02=0000000000000000 X03=0000000000000018 X04=0000000000000000 +X05=0000000000000002 X06=0000000000000000 X07=0000000000000000 +X08=0000000000000000 X09=0000000000000000 X10=0000000000000000 +X11=ffff0000401c4210 X12=0000000000000002 X13=ffff0000401c4930 +X14=0000000000000002 X15=0000000000000001 X16=0000000000000000 +X17=0000000000081002 X18=00000000000000e8 X19=ffff00004009b238 +X20=ffff000040090450 X21=ffff0000401c1000 X22=0000000000000000 +X23=0000000000000000 X24=0000000000000000 X25=0000000000000000 +X26=0000000000000000 X27=0000000000000000 X28=0000000000000000 +X29=0000000000000000 X30=ffff000040086064 SP=ffff00004011bf50 +PSTATE=60000345 -ZC- EL1h FPU disabled + +CPU#3 + PC=ffff000040086068 X00=ffff00004015bf58 
X01=0000000000000000 +X02=0000000000000000 X03=ffff0000401c3f10 X04=ffff000041609f80 +X05=0000000000000002 X06=0000000000000000 X07=0000000000000000 +X08=0000000000000000 X09=0000000000000000 X10=0000000000000000 +X11=ffff0000401c4250 X12=0000000000000001 X13=ffff0000401c51f0 +X14=0000000000000002 X15=0000000000000001 X16=0000000000000003 +X17=00000000001e8480 X18=0000000000000030 X19=ffff00004009b338 +X20=ffff000040090450 X21=ffff0000401c1000 X22=0000000000000000 +X23=0000000000000000 X24=0000000000000000 X25=0000000000000000 +X26=0000000000000000 X27=0000000000000000 X28=0000000000000000 +X29=0000000000000000 X30=ffff000040086064 SP=ffff00004015bf50 +PSTATE=60000345 -ZC- EL1h FPU disabled +(qemu) c From 2b1abd5ef92042ae688dcb3c60d3beb0cc4c6bfc Mon Sep 17 00:00:00 2001 From: hky1999 <976929993@qq.com> Date: Tue, 17 Sep 2024 17:05:52 +0800 Subject: [PATCH 10/12] BUGGY in somewhere related to exit_current --- modules/axtask/src/run_queue.rs | 20 ++++++------ modules/axtask/src/timers.rs | 1 + modules/axtask/src/wait_queue.rs | 52 ++++++++++++++++++++++---------- 3 files changed, 46 insertions(+), 27 deletions(-) diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 6123e4f639..a6d68e0fe3 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -219,7 +219,7 @@ impl AxRunQueue { // There is no need to disable IRQ and preemption here, because // they both have been disabled in `current_check_preempt_pending`. let curr = crate::current(); - // assert!(curr.is_running()); + assert!(curr.is_running()); // When we call `preempt_resched()`, both IRQs and preemption must // have been disabled by `kernel_guard::NoPreemptIrqSave`. So we need @@ -262,22 +262,18 @@ impl AxRunQueue { // Schedule to next task. self.resched(false); } + drop(_kernel_guard); unreachable!("task exited!"); } pub fn blocked_resched(&mut self) { - let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); assert!( - curr.is_blocking() || curr.is_running(), - "task is not blocking or running: {:?}", + curr.is_blocking(), + "task is not blocking, {:?}", curr.state() ); - - // Current task may have been woken up on another CPU. 
- if curr.is_running() { - return; - } + assert!(curr.in_wait_queue()); debug!("task block: {}", curr.id_name()); self.resched(false); @@ -311,7 +307,7 @@ impl AxRunQueue { #[cfg(feature = "irq")] pub fn sleep_until(&mut self, deadline: axhal::time::TimeValue) { - let _kernel_guard = kernel_guard::NoPreemptIrqSave::new(); + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline); assert!(curr.is_running()); @@ -323,6 +319,7 @@ impl AxRunQueue { curr.set_state(TaskState::Blocking); self.resched(false); } + drop(kernel_guard) } } @@ -348,7 +345,8 @@ impl AxRunQueue { }); assert!( next.is_ready(), - "next task is not ready: {:?}", + "next {} is not ready: {:?}", + next.id_name(), next.state() ); self.switch_to(prev, next); diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs index cb55dde24c..5b4cd5a4ad 100644 --- a/modules/axtask/src/timers.rs +++ b/modules/axtask/src/timers.rs @@ -15,6 +15,7 @@ struct TaskWakeupEvent(AxTaskRef); impl TimerEvent for TaskWakeupEvent { fn callback(self, _now: TimeValue) { + let _kernel_guard = kernel_guard::NoPreempt::new(); self.0.set_in_timer_list(false); select_run_queue( #[cfg(feature = "smp")] diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index d90e55d073..8aef38a6af 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -65,7 +65,6 @@ impl WaitQueue { let curr = crate::current(); assert!(curr.is_running()); assert!(!curr.is_idle()); - let mut wq_locked = self.queue.lock(); // we must not block current task with preemption disabled. #[cfg(feature = "preempt")] assert!(curr.can_preempt(1)); @@ -82,15 +81,19 @@ impl WaitQueue { curr.set_state(TaskState::Blocking); curr.set_in_wait_queue(true); - wq_locked.push_back(curr.clone()); + debug!("{} push to wait queue", curr.id_name()); + + self.queue.lock().push_back(curr.clone()); } /// Blocks the current task and put it into the wait queue, until other task /// notifies it. pub fn wait(&self) { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); self.push_to_wait_queue(); current_run_queue().blocked_resched(); self.cancel_events(crate::current()); + drop(kernel_guard); } /// Blocks the current task and put it into the wait queue, until the given @@ -102,6 +105,7 @@ impl WaitQueue { where F: Fn() -> bool, { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); loop { let mut wq = self.queue.lock(); if condition() { @@ -111,24 +115,28 @@ impl WaitQueue { assert!(curr.is_running()); assert!(!curr.is_idle()); + debug!("{} push to wait queue on wait_until", curr.id_name()); + // we must not block current task with preemption disabled. #[cfg(feature = "preempt")] - assert!(curr.can_preempt(1)); + assert!(curr.can_preempt(2)); wq.push_back(curr.clone()); curr.set_state(TaskState::Blocking); curr.set_in_wait_queue(true); drop(wq); - current_run_queue().blocked_resched() + current_run_queue().blocked_resched(); } self.cancel_events(crate::current()); + drop(kernel_guard); } /// Blocks the current task and put it into the wait queue, until other tasks /// notify it, or the given duration has elapsed. 
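The timeout paths in these hunks encode "who woke me?" in the `in_wait_queue` flag: a notifier clears the flag while popping the task from the queue, whereas a timer wakeup leaves it set. A reduced model of that convention (names illustrative, not the axtask API):

    use core::sync::atomic::{AtomicBool, Ordering};

    struct Flags {
        in_wait_queue: AtomicBool,
    }

    impl Flags {
        // After blocked_resched() returns, the flag tells the wakers apart:
        // still set => nobody popped us from the queue, the timer fired first;
        // cleared   => notify() removed us before the deadline.
        fn woke_by_timeout(&self) -> bool {
            self.in_wait_queue.load(Ordering::Acquire)
        }
    }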
#[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -143,6 +151,7 @@ impl WaitQueue { let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out self.cancel_events(curr); + drop(kernel_guard); timeout } @@ -156,6 +165,7 @@ impl WaitQueue { where F: Fn() -> bool, { + let kernel_guard = kernel_guard::NoPreemptIrqSave::new(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -177,7 +187,7 @@ impl WaitQueue { // we must not block current task with preemption disabled. #[cfg(feature = "preempt")] - assert!(curr.can_preempt(1)); + assert!(curr.can_preempt(2)); wq.push_back(curr.clone()); curr.set_state(TaskState::Blocking); @@ -187,6 +197,7 @@ impl WaitQueue { current_run_queue().blocked_resched() } self.cancel_events(curr); + drop(kernel_guard); timeout } @@ -195,11 +206,15 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_one(&self, resched: bool) -> bool { - let Some(task) = self.queue.lock().pop_front() else { - return false; - }; - unblock_one_task(task, resched); - true + let mut wq = self.queue.lock(); + if let Some(task) = wq.pop_front() { + task.set_in_wait_queue(false); + drop(wq); + unblock_one_task(task, resched); + true + } else { + false + } } /// Wakes all tasks in the wait queue. @@ -208,10 +223,14 @@ impl WaitQueue { /// preemption is enabled. pub fn notify_all(&self, resched: bool) { loop { - let Some(task) = self.queue.lock().pop_front() else { + let mut wq = self.queue.lock(); + if let Some(task) = wq.pop_front() { + task.set_in_wait_queue(false); + drop(wq); + unblock_one_task(task, resched); + } else { break; - }; - unblock_one_task(task, resched); + } } } @@ -220,8 +239,8 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool { + let mut wq = self.queue.lock(); let task_to_be_notify = { - let mut wq = self.queue.lock(); if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { wq.remove(index) } else { @@ -229,6 +248,9 @@ impl WaitQueue { } }; if let Some(task) = task_to_be_notify { + // Mark task as not in wait queue. + task.set_in_wait_queue(false); + drop(wq); unblock_one_task(task, resched); true } else { @@ -238,8 +260,6 @@ impl WaitQueue { } pub(crate) fn unblock_one_task(task: AxTaskRef, resched: bool) { - // Mark task as not in wait queue. - task.set_in_wait_queue(false); // Select run queue by the CPU set of the task. 
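The `notify_*` rewrites above share one shape: pop and mark the task while holding the wait-queue lock, then drop that lock before calling into any run queue, so the wait-queue and run-queue locks are never nested. A condensed sketch using the same crate types (assuming an `alloc`-enabled build; not the exact axtask code):

    use alloc::collections::VecDeque;
    use alloc::sync::Arc;
    use kspin::SpinNoIrq;

    struct Queue<T>(SpinNoIrq<VecDeque<Arc<T>>>);

    impl<T> Queue<T> {
        fn notify_one(&self, unblock: impl FnOnce(Arc<T>)) -> bool {
            let mut q = self.0.lock();
            match q.pop_front() {
                Some(task) => {
                    drop(q);       // release the wait-queue lock first
                    unblock(task); // only now may run-queue locks be taken
                    true
                }
                None => false,
            }
        }
    }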
    select_run_queue(
        #[cfg(feature = "smp")]
        task.clone(),
    )
    .unblock_task(task, resched)
}

From 6e841ebe0fd35a9ab63fa447a61bd8efa35a4cce Mon Sep 17 00:00:00 2001
From: hky1999 <976929993@qq.com>
Date: Thu, 19 Sep 2024 00:09:45 +0800
Subject: [PATCH 11/12] Add kernel_guard in notify_xxx methods, bug remains

---
 modules/axtask/src/api.rs        |   1 +
 modules/axtask/src/run_queue.rs  |   8 +-
 modules/axtask/src/timers.rs     |   1 -
 modules/axtask/src/wait_queue.rs |  12 ++-
 strange_irq_log.txt              | 155 ++++++++++++++++---------------
 5 files changed, 95 insertions(+), 82 deletions(-)

diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs
index 4c13d553a1..2f0f2033c5 100644
--- a/modules/axtask/src/api.rs
+++ b/modules/axtask/src/api.rs
@@ -97,6 +97,7 @@ where
     F: FnOnce() + Send + 'static,
 {
     let task = TaskInner::new(f, name, stack_size, None);
+    let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
     crate::select_run_queue(
         #[cfg(feature = "smp")]
         task.clone(),
diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index a6d68e0fe3..7790a10c83 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -273,15 +273,16 @@ impl AxRunQueue {
             "task is not blocking, {:?}",
             curr.state()
         );
-        assert!(curr.in_wait_queue());
 
         debug!("task block: {}", curr.id_name());
         self.resched(false);
     }
 
+    /// Unblock one task by inserting it into the run queue.
+    /// If task state is `BLOCKING`, it will enter a loop until the task is in `BLOCKED` state.
+    ///
+    /// Note: this function should be called with preemption and IRQ disabled.
    pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
-        let cpu_id = self.cpu_id;
-
         // When task's state is Blocking, it has not finished its scheduling process.
         if task.is_blocking() {
             while task.is_blocking() {
@@ -292,6 +293,7 @@ impl AxRunQueue {
         }
 
         if task.is_blocked() {
+            let cpu_id = self.cpu_id;
             debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
             task.set_state(TaskState::Ready);
             self.scheduler.add_task(task); // TODO: priority
diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs
index 5b4cd5a4ad..cb55dde24c 100644
--- a/modules/axtask/src/timers.rs
+++ b/modules/axtask/src/timers.rs
@@ -15,7 +15,6 @@ struct TaskWakeupEvent(AxTaskRef);
 
 impl TimerEvent for TaskWakeupEvent {
     fn callback(self, _now: TimeValue) {
-        let _kernel_guard = kernel_guard::NoPreempt::new();
         self.0.set_in_timer_list(false);
         select_run_queue(
             #[cfg(feature = "smp")]
diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs
index 8aef38a6af..b35c3ae21c 100644
--- a/modules/axtask/src/wait_queue.rs
+++ b/modules/axtask/src/wait_queue.rs
@@ -62,12 +62,13 @@ impl WaitQueue {
     }
 
     fn push_to_wait_queue(&self) {
+        let mut wq = self.queue.lock();
         let curr = crate::current();
         assert!(curr.is_running());
         assert!(!curr.is_idle());
         // we must not block current task with preemption disabled.
         #[cfg(feature = "preempt")]
-        assert!(curr.can_preempt(1));
+        assert!(curr.can_preempt(2));
 
         // We set task state as `Blocking` to clarify that the task is blocked
         // but **still NOT** finished its scheduling process.
@@ -83,7 +84,7 @@ impl WaitQueue {
 
         debug!("{} push to wait queue", curr.id_name());
 
-        self.queue.lock().push_back(curr.clone());
+        wq.push_back(curr.clone());
     }
 
     /// Blocks the current task and put it into the wait queue, until other task
@@ -206,6 +207,8 @@ impl WaitQueue {
     /// If `resched` is true, the current task will be preempted when the
     /// preemption is enabled.
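The pattern this patch settles on is to pin a `kernel_guard` RAII guard across the whole block/notify path. The guard types below are the real `kernel_guard` API already used in these hunks; the functions are placeholders for the paths shown above:

    use kernel_guard::{NoPreempt, NoPreemptIrqSave};

    fn wakeup_path() {
        let _g = NoPreempt::new(); // preemption re-enabled when `_g` drops
        // select_run_queue(..).unblock_task(..);
    }

    fn blocking_path() {
        let g = NoPreemptIrqSave::new(); // IRQ state saved + preemption off
        // push_to_wait_queue();
        // blocked_resched();
        drop(g); // explicit end of the critical section, as in wait() above
    }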
     pub fn notify_one(&self, resched: bool) -> bool {
+        // Todo: figure out whether irq save is necessary here.
+        let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
         let mut wq = self.queue.lock();
         if let Some(task) = wq.pop_front() {
             task.set_in_wait_queue(false);
@@ -223,6 +226,8 @@ impl WaitQueue {
     /// preemption is enabled.
     pub fn notify_all(&self, resched: bool) {
         loop {
+            // Todo: figure out whether irq save is necessary here.
+            let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
             let mut wq = self.queue.lock();
             if let Some(task) = wq.pop_front() {
                 task.set_in_wait_queue(false);
@@ -231,6 +236,7 @@ impl WaitQueue {
             } else {
                 break;
             }
+            drop(kernel_guard);
         }
     }
 
@@ -239,6 +245,8 @@ impl WaitQueue {
     /// If `resched` is true, the current task will be preempted when the
     /// preemption is enabled.
     pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
+        // Todo: figure out whether irq save is necessary here.
+        let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
         let mut wq = self.queue.lock();
         let task_to_be_notify = {
             if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
diff --git a/strange_irq_log.txt b/strange_irq_log.txt
index 21653c9c59..4d43ba6049 100644
--- a/strange_irq_log.txt
+++ b/strange_irq_log.txt
@@ -1,7 +1,7 @@
 ➜  arceos git:(percpu_run_queue) ✗ make A=../arceos-apps/rust/task/parallel SMP=4 LOG=info FEATURES=sched_cfs ARCH=aarch64 ACCEL=n run
     Building App: parallel, Arch: aarch64, Platform: aarch64-qemu-virt, App type: rust
 cargo -C ../arceos-apps/rust/task/parallel build -Z unstable-options --target aarch64-unknown-none-softfloat --target-dir /home/hky/workspace/arceos-org/arceos/target --release --features "axstd/log-level-info axstd/sched_cfs axstd/smp"
-    Finished `release` profile [optimized] target(s) in 0.06s
+    Finished `release` profile [optimized] target(s) in 0.07s
 rust-objcopy --binary-architecture=aarch64 ../arceos-apps/rust/task/parallel/parallel_aarch64-qemu-virt.elf --strip-all -O binary ../arceos-apps/rust/task/parallel/parallel_aarch64-qemu-virt.bin
     Running on qemu...
 qemu-system-aarch64 -m 128M -smp 4 -cpu cortex-a72 -machine virt -kernel ../arceos-apps/rust/task/parallel/parallel_aarch64-qemu-virt.bin -nographic
@@ -22,127 +22,130 @@ smp = 4
 build_mode = release
 log_level = info
 
-[ 0.002825 axruntime:130] Logging is enabled.
-[ 0.003531 axruntime:131] Primary CPU 0 started, dtb = 0x44000000.
-[ 0.003749 axruntime:133] Found physcial memory regions: -[ 0.003990 axruntime:135] [PA:0x40080000, PA:0x40092000) .text (READ | EXECUTE | RESERVED) -[ 0.004268 axruntime:135] [PA:0x40092000, PA:0x40098000) .rodata (READ | RESERVED) -[ 0.004724 axruntime:135] [PA:0x40098000, PA:0x4009c000) .data .tdata .tbss .percpu (READ | WRITE | RESERVED) -[ 0.005142 axruntime:135] [PA:0x4009c000, PA:0x4019c000) boot stack (READ | WRITE | RESERVED) -[ 0.005632 axruntime:135] [PA:0x4019c000, PA:0x401c2000) .bss (READ | WRITE | RESERVED) -[ 0.006012 axruntime:135] [PA:0x401c2000, PA:0x48000000) free memory (READ | WRITE | FREE) -[ 0.006429 axruntime:135] [PA:0x9000000, PA:0x9001000) mmio (READ | WRITE | DEVICE | RESERVED) -[ 0.006839 axruntime:135] [PA:0x9100000, PA:0x9101000) mmio (READ | WRITE | DEVICE | RESERVED) -[ 0.007264 axruntime:135] [PA:0x8000000, PA:0x8020000) mmio (READ | WRITE | DEVICE | RESERVED) -[ 0.007797 axruntime:135] [PA:0xa000000, PA:0xa004000) mmio (READ | WRITE | DEVICE | RESERVED) -[ 0.008228 axruntime:135] [PA:0x10000000, PA:0x3eff0000) mmio (READ | WRITE | DEVICE | RESERVED) -[ 0.008365 axruntime:135] [PA:0x4010000000, PA:0x4020000000) mmio (READ | WRITE | DEVICE | RESERVED) -[ 0.008778 axruntime:208] Initialize global memory allocator... -[ 0.009298 axruntime:209] use TLSF allocator. -[ 0.010894 axruntime:150] Initialize platform devices... -[ 0.011335 axhal::platform::aarch64_common::gic:50] Initialize GICv2... -[ 0.012179 axtask::api:66] Initialize scheduling... -[ 0.013023 axtask::run_queue:431] Initialize RUN_QUEUES -[ 0.014408 axtask::api:72] use Completely Fair scheduler. -[ 0.014959 axhal::platform::aarch64_common::psci:113] Starting CPU 1 ON ... -[ 0.015439 axruntime::mp:35] Secondary CPU 1 started. -[ 0.015993 axhal::platform::aarch64_common::psci:113] Starting CPU 2 ON ... -[ 0.016337 axruntime::mp:35] Secondary CPU 2 started. -[ 0.016119 axruntime::mp:45] Secondary CPU 1 init OK. -[ 0.017065 axruntime::mp:45] Secondary CPU 2 init OK. -[ 0.017056 axhal::platform::aarch64_common::psci:113] Starting CPU 3 ON ... -[ 0.022300 axruntime::mp:35] Secondary CPU 3 started. -[ 0.022418 axruntime:176] Initialize interrupt handlers... -[ 0.022716 axruntime::mp:45] Secondary CPU 3 init OK. -[ 0.023969 axruntime:186] Primary CPU 0 init OK. -part 0: ThreadId(10) [0, 125000) -part 4: ThreadId(14) [500000, 625000) +[ 0.005433 axruntime:130] Logging is enabled. +[ 0.006318 axruntime:131] Primary CPU 0 started, dtb = 0x44000000. 
+[ 0.006760 axruntime:133] Found physcial memory regions: +[ 0.007021 axruntime:135] [PA:0x40080000, PA:0x40092000) .text (READ | EXECUTE | RESERVED) +[ 0.007454 axruntime:135] [PA:0x40092000, PA:0x40098000) .rodata (READ | RESERVED) +[ 0.007862 axruntime:135] [PA:0x40098000, PA:0x4009c000) .data .tdata .tbss .percpu (READ | WRITE | RESERVED) +[ 0.008049 axruntime:135] [PA:0x4009c000, PA:0x4019c000) boot stack (READ | WRITE | RESERVED) +[ 0.008522 axruntime:135] [PA:0x4019c000, PA:0x401c2000) .bss (READ | WRITE | RESERVED) +[ 0.009656 axruntime:135] [PA:0x401c2000, PA:0x48000000) free memory (READ | WRITE | FREE) +[ 0.010102 axruntime:135] [PA:0x9000000, PA:0x9001000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.010908 axruntime:135] [PA:0x9100000, PA:0x9101000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.011561 axruntime:135] [PA:0x8000000, PA:0x8020000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.012257 axruntime:135] [PA:0xa000000, PA:0xa004000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.012670 axruntime:135] [PA:0x10000000, PA:0x3eff0000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.012862 axruntime:135] [PA:0x4010000000, PA:0x4020000000) mmio (READ | WRITE | DEVICE | RESERVED) +[ 0.013930 axruntime:208] Initialize global memory allocator... +[ 0.014402 axruntime:209] use TLSF allocator. +[ 0.016357 axruntime:150] Initialize platform devices... +[ 0.016647 axhal::platform::aarch64_common::gic:50] Initialize GICv2... +[ 0.017476 axtask::api:66] Initialize scheduling... +[ 0.018255 axtask::run_queue:430] Initialize RUN_QUEUES +[ 0.019116 axtask::api:72] use Completely Fair scheduler. +[ 0.019336 axhal::platform::aarch64_common::psci:113] Starting CPU 1 ON ... +[ 0.020195 axhal::platform::aarch64_common::psci:113] Starting CPU 2 ON ... +[ 0.020250 axruntime::mp:35] Secondary CPU 1 started. +[ 0.020992 axhal::platform::aarch64_common::psci:113] Starting CPU 3 ON ... +[ 0.020983 axruntime::mp:35] Secondary CPU 2 started. +[ 0.022058 axruntime:176] Initialize interrupt handlers... +[ 0.022999 axruntime::mp:45] Secondary CPU 1 init OK. +[ 0.021834 axruntime::mp:35] Secondary CPU 3 started. +[ 0.023066 axruntime::mp:45] Secondary CPU 2 init OK. +[ 0.024409 axruntime::mp:45] Secondary CPU 3 init OK. +[ 0.025113 axruntime:186] Primary CPU 0 init OK. 
part 1: ThreadId(11) [125000, 250000) -part 2: ThreadId(12) [250000, 375000) +part 0: ThreadId(10) [0, 125000) part 3: ThreadId(13) [375000, 500000) -part 8: ThreadId(18) [1000000, 1125000) +part 2: ThreadId(12) [250000, 375000) +part 4: ThreadId(14) [500000, 625000) part 5: ThreadId(15) [625000, 750000) -part 9: ThreadId(19) [1125000, 1250000) +part 8: ThreadId(18) [1000000, 1125000) part 6: ThreadId(16) [750000, 875000) part 7: ThreadId(17) [875000, 1000000) +part 9: ThreadId(19) [1125000, 1250000) part 12: ThreadId(22) [1500000, 1625000) -part 13: ThreadId(23) [1625000, 1750000) -part 10: ThreadId(20) [1250000, 1375000) part 11: ThreadId(21) [1375000, 1500000) +part 10: ThreadId(20) [1250000, 1375000) +part 13: ThreadId(23) [1625000, 1750000) part 14: ThreadId(24) [1750000, 1875000) part 15: ThreadId(25) [1875000, 2000000) -part 9: ThreadId(19) finished -part 7: ThreadId(17) finished -part 5: ThreadId(15) finished -part 15: ThreadId(25) finished +part 2: ThreadId(12) finished +part 8: ThreadId(18) finished part 0: ThreadId(10) finished +part 9: ThreadId(19) finished part 4: ThreadId(14) finished -part 8: ThreadId(18) finished -part 3: ThreadId(13) finished -part 11: ThreadId(21) finished -part 13: ThreadId(23) finished -part 1: ThreadId(11) finished part 14: ThreadId(24) finished +part 5: ThreadId(15) finished part 12: ThreadId(22) finished -part 2: ThreadId(12) finished part 6: ThreadId(16) finished -[ 1.070932 0:1 axhal::irq:20] Unhandled IRQ 33 +part 1: ThreadId(11) finished +part 13: ThreadId(23) finished +part 10: ThreadId(20) finished +part 15: ThreadId(25) finished +part 11: ThreadId(21) finished +part 7: ThreadId(17) finished +[ 2.202593 0:1 axhal::irq:20] Unhandled IRQ 33 QEMU 8.1.94 monitor - type 'help' for more information (qemu) info registers -a CPU#0 - PC=ffff000040086068 X00=ffff0000401c2ff8 X01=0000000000000000 + PC=ffff00004008616c X00=ffff0000401c2ff8 X01=0000000000000000 X02=0000000000000008 X03=0000000000000128 X04=0000000000000000 X05=0000000000000002 X06=0000000000000000 X07=0000000000000000 X08=0000000000000000 X09=0000000000000000 X10=0000000000000000 -X11=ffff0000401c4190 X12=0000000000000003 X13=ffff0000401c4770 +X11=ffff0000401c4190 X12=0000000000000001 X13=ffff0000401c5ab0 X14=0000000000000008 X15=00000000a0000000 X16=0000000000000000 X17=0000000000005000 X18=0000000000000070 X19=ffff00004009b038 -X20=ffff000040093c28 X21=0000000000000000 X22=0000000000000000 +X20=ffff000040093c50 X21=0000000000000000 X22=0000000000000000 X23=0000000000000000 X24=0000000000000000 X25=0000000000000000 X26=0000000000000000 X27=0000000000000000 X28=0000000000000000 -X29=0000000000000000 X30=ffff000040086064 SP=ffff0000401c2ff0 +X29=0000000000000000 X30=ffff000040086168 SP=ffff0000401c2ff0 PSTATE=60000345 -ZC- EL1h FPU disabled CPU#1 - PC=ffff000040086068 X00=ffff0000400dbf58 X01=0000000000000000 -X02=0000000000000000 X03=ffff0000401c3710 X04=ffff000041689fc0 + PC=ffff00004008616c X00=ffff0000400dbf58 X01=0000000000000000 +X02=0000000000000008 X03=0000000000000128 X04=ffff0000402c9e20 X05=0000000000000002 X06=0000000000000000 X07=0000000000000000 X08=0000000000000000 X09=0000000000000000 X10=0000000000000000 -X11=ffff0000401c41d0 X12=0000000000000004 X13=ffff0000401c61b0 -X14=ffff0000401bff00 X15=ffff00004009b338 X16=0000000000000000 -X17=0000000000081102 X18=0000000000000000 X19=ffff00004009b138 -X20=ffff000040090450 X21=ffff0000401c1000 X22=0000000000000000 +X11=ffff0000401c41d0 X12=0000000000000001 X13=ffff0000401c4930 +X14=0000000000000010 X15=0000000000000101 
+X17=000000000014fb18 X18=0000000000000030 X19=ffff00004009b138
+X20=ffff0000400909a4 X21=ffff0000401c1000 X22=0000000000000000
 X23=0000000000000000 X24=0000000000000000 X25=0000000000000000
 X26=0000000000000000 X27=0000000000000000 X28=0000000000000000
-X29=0000000000000000 X30=ffff000040086064 SP=ffff0000400dbf50
+X29=0000000000000000 X30=ffff000040086168 SP=ffff0000400dbf50
 PSTATE=60000345 -ZC- EL1h FPU disabled
 CPU#2
- PC=ffff000040086068 X00=ffff00004011bf58 X01=0000000000000000
+ PC=ffff00004008616c X00=ffff00004011bf58 X01=0000000000000000
 X02=0000000000000000 X03=0000000000000018 X04=0000000000000000
 X05=0000000000000002 X06=0000000000000000 X07=0000000000000000
 X08=0000000000000000 X09=0000000000000000 X10=0000000000000000
-X11=ffff0000401c4210 X12=0000000000000002 X13=ffff0000401c4930
+X11=ffff0000401c4210 X12=0000000000000001 X13=ffff0000401c5ff0
 X14=0000000000000002 X15=0000000000000001 X16=0000000000000000
-X17=0000000000081002 X18=00000000000000e8 X19=ffff00004009b238
-X20=ffff000040090450 X21=ffff0000401c1000 X22=0000000000000000
+X17=00000000000f4240 X18=0000000000000030 X19=ffff00004009b238
+X20=ffff0000400909a4 X21=ffff0000401c1000 X22=0000000000000000
 X23=0000000000000000 X24=0000000000000000 X25=0000000000000000
 X26=0000000000000000 X27=0000000000000000 X28=0000000000000000
-X29=0000000000000000 X30=ffff000040086064 SP=ffff00004011bf50
+X29=0000000000000000 X30=ffff000040086168 SP=ffff00004011bf50
 PSTATE=60000345 -ZC- EL1h FPU disabled
 CPU#3
- PC=ffff000040086068 X00=ffff00004015bf58 X01=0000000000000000
-X02=0000000000000000 X03=ffff0000401c3f10 X04=ffff000041609f80
+ PC=ffff00004008616c X00=ffff00004015bf58 X01=0000000000000000
+X02=0000000000000000 X03=0000000000000018 X04=0000000000000000
 X05=0000000000000002 X06=0000000000000000 X07=0000000000000000
 X08=0000000000000000 X09=0000000000000000 X10=0000000000000000
-X11=ffff0000401c4250 X12=0000000000000001 X13=ffff0000401c51f0
-X14=0000000000000002 X15=0000000000000001 X16=0000000000000003
-X17=00000000001e8480 X18=0000000000000030 X19=ffff00004009b338
-X20=ffff000040090450 X21=ffff0000401c1000 X22=0000000000000000
+X11=ffff0000401c4250 X12=0000000000000001 X13=ffff0000401c53b0
+X14=0000000000000002 X15=0000000000000001 X16=0000000000010001
+X17=0000000000112a88 X18=0000000000000030 X19=ffff00004009b338
+X20=ffff0000400909a4 X21=ffff0000401c1000 X22=0000000000000000
 X23=0000000000000000 X24=0000000000000000 X25=0000000000000000
 X26=0000000000000000 X27=0000000000000000 X28=0000000000000000
-X29=0000000000000000 X30=ffff000040086064 SP=ffff00004015bf50
+X29=0000000000000000 X30=ffff000040086168 SP=ffff00004015bf50
 PSTATE=60000345 -ZC- EL1h FPU disabled
-(qemu) c
+(qemu) qquit
+unknown command: 'qquit'
+(qemu)
+(qemu) quit
\ No newline at end of file

From 30e3f9d46098e654ff027b62cb33912ad359ea59 Mon Sep 17 00:00:00 2001
From: hky1999 <976929993@qq.com>
Date: Thu, 19 Sep 2024 13:54:15 +0800
Subject: [PATCH 12/12] Use unblock_lock to protect the task's unblock process

---
 modules/axtask/src/run_queue.rs  | 23 ++++-------------------
 modules/axtask/src/task.rs       | 30 ++++++++++++++++++++++++++++++
 modules/axtask/src/wait_queue.rs | 19 +++++++++----------
 3 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index 7790a10c83..fa87196503 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -183,11 +183,7 @@ impl AxRunQueue {
 /// Core functions of run queue.
 impl AxRunQueue {
     pub fn add_task(&mut self, task: AxTaskRef) {
-        debug!(
-            "task spawn: {} on run_queue {}",
-            task.id_name(),
-            self.cpu_id
-        );
+        debug!("Add {} on run_queue {}", task.id_name(), self.cpu_id);
         assert!(task.is_ready());
         self.scheduler.add_task(task);
     }
@@ -280,23 +276,12 @@ impl AxRunQueue {
 
     /// Unblock one task by inserting it into the run queue.
     /// If task state is `BLOCKING`, it will enter a loop until the task is in `BLOCKED` state.
-    ///
-    /// Note: this function should by called with preemption and IRQ disabled.
     pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) {
-        // When task's state is Blocking, it has not finished its scheduling process.
-        if task.is_blocking() {
-            while task.is_blocking() {
-                // Wait for the task to finish its scheduling process.
-                core::hint::spin_loop();
-            }
-            assert!(task.is_blocked())
-        }
-
-        if task.is_blocked() {
+        task.clone().unblock_locked(|| {
             let cpu_id = self.cpu_id;
             debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id);
             task.set_state(TaskState::Ready);
-            self.scheduler.add_task(task); // TODO: priority
+            self.scheduler.add_task(task.clone()); // TODO: priority
 
             // Note: when the task is unblocked on another CPU's run queue,
             // we just ignore the `resched` flag.
@@ -304,7 +289,7 @@ impl AxRunQueue {
             #[cfg(feature = "preempt")]
             crate::current().set_preempt_pending(true);
         }
-    }
+        })
     }
 
     #[cfg(feature = "irq")]
diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs
index 09bb1d4c11..1c7dfa2a7a 100644
--- a/modules/axtask/src/task.rs
+++ b/modules/axtask/src/task.rs
@@ -6,6 +6,7 @@ use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering};
 use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull};
 
 use bitmaps::Bitmap;
+use kspin::SpinRaw;
 use memory_addr::{align_up_4k, va, VirtAddr};
 
 use axhal::arch::TaskContext;
@@ -54,6 +55,12 @@ pub struct TaskInner {
     #[cfg(feature = "irq")]
     in_timer_list: AtomicBool,
 
+    /// Used to protect the task from being unblocked by timer and `notify()` at the same time.
+    /// It is used in `unblock_task()`, which is called by wait queue's `notify()` and timer's callback.
+    /// Since preemption and irq are both disabled during `unblock_task()`, we can simply use a raw spin lock here.
+    #[cfg(feature = "irq")]
+    unblock_lock: SpinRaw<()>,
+
     #[cfg(feature = "preempt")]
     need_resched: AtomicBool,
     #[cfg(feature = "preempt")]
@@ -138,6 +145,8 @@ impl TaskInner {
             in_wait_queue: AtomicBool::new(false),
             #[cfg(feature = "irq")]
             in_timer_list: AtomicBool::new(false),
+            #[cfg(feature = "irq")]
+            unblock_lock: SpinRaw::new(()),
             #[cfg(feature = "preempt")]
             need_resched: AtomicBool::new(false),
             #[cfg(feature = "preempt")]
@@ -288,6 +297,27 @@ impl TaskInner {
         self.in_timer_list.store(in_timer_list, Ordering::Release);
     }
 
+    pub(crate) fn unblock_locked<F>(&self, mut run_queue_push: F)
+    where
+        F: FnMut(),
+    {
+        // When task's state is Blocking, it has not finished its scheduling process.
+        if self.is_blocking() {
+            while self.is_blocking() {
+                // Wait for the task to finish its scheduling process.
+                core::hint::spin_loop();
+            }
+            assert!(self.is_blocked())
+        }
+
+        // When irq is enabled, use `unblock_lock` to protect the task from being unblocked by timer and `notify()` at the same time.
+        #[cfg(feature = "irq")]
+        let _lock = self.unblock_lock.lock();
+        if self.is_blocked() {
+            run_queue_push();
+        }
+    }
+
     #[inline]
     #[cfg(feature = "preempt")]
     pub(crate) fn set_preempt_pending(&self, pending: bool) {
diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs
index b35c3ae21c..ba3fc31209 100644
--- a/modules/axtask/src/wait_queue.rs
+++ b/modules/axtask/src/wait_queue.rs
@@ -67,6 +67,8 @@ impl WaitQueue {
         assert!(curr.is_running());
         assert!(!curr.is_idle());
         // we must not block current task with preemption disabled.
+        // Current expected preempt count is 2.
+        // 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`.
         #[cfg(feature = "preempt")]
         assert!(curr.can_preempt(2));
 
@@ -119,6 +121,8 @@ impl WaitQueue {
             debug!("{} push to wait queue on wait_until", curr.id_name());
 
             // we must not block current task with preemption disabled.
+            // Current expected preempt count is 2.
+            // 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`.
             #[cfg(feature = "preempt")]
             assert!(curr.can_preempt(2));
             wq.push_back(curr.clone());
@@ -187,6 +191,8 @@ impl WaitQueue {
         assert!(!curr.is_idle());
 
         // we must not block current task with preemption disabled.
+        // Current expected preempt count is 2.
+        // 1 for `NoPreemptIrqSave`, 1 for wait queue's `SpinNoIrq`.
         #[cfg(feature = "preempt")]
         assert!(curr.can_preempt(2));
         wq.push_back(curr.clone());
@@ -207,13 +213,11 @@ impl WaitQueue {
     /// If `resched` is true, the current task will be preempted when the
     /// preemption is enabled.
     pub fn notify_one(&self, resched: bool) -> bool {
-        // Todo: figure out is irq save necessary here.
-        let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
         let mut wq = self.queue.lock();
         if let Some(task) = wq.pop_front() {
             task.set_in_wait_queue(false);
-            drop(wq);
             unblock_one_task(task, resched);
+            drop(wq);
             true
         } else {
             false
@@ -226,17 +230,14 @@ impl WaitQueue {
     /// preemption is enabled.
     pub fn notify_all(&self, resched: bool) {
         loop {
-            // Todo: figure out is irq save necessary here.
-            let kernel_guard = kernel_guard::NoPreemptIrqSave::new();
             let mut wq = self.queue.lock();
             if let Some(task) = wq.pop_front() {
                 task.set_in_wait_queue(false);
-                drop(wq);
                 unblock_one_task(task, resched);
             } else {
                 break;
             }
-            drop(kernel_guard);
+            drop(wq);
         }
     }
 
@@ -245,8 +246,6 @@ impl WaitQueue {
     /// If `resched` is true, the current task will be preempted when the
     /// preemption is enabled.
     pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
-        // Todo: figure out is irq save necessary here.
-        let _kernel_guard = kernel_guard::NoPreemptIrqSave::new();
         let mut wq = self.queue.lock();
         let task_to_be_notify = {
             if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
@@ -258,8 +257,8 @@ impl WaitQueue {
         if let Some(task) = task_to_be_notify {
             // Mark task as not in wait queue.
             task.set_in_wait_queue(false);
-            drop(wq);
             unblock_one_task(task, resched);
+            drop(wq);
             true
         } else {
             false
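
The handshake that `unblock_locked` implements is worth seeing in isolation. A task that goes to sleep is first marked BLOCKING while it is still switching off its CPU, and only becomes BLOCKED once the switch has finished; meanwhile a timer expiry and a wait queue `notify()` may race to requeue it. Below is a minimal standalone sketch of that pattern, not ArceOS code: `std::sync::Mutex` stands in for `kspin::SpinRaw`, the `Task` struct and `u8` state constants are illustrative, and for compactness the state flip to READY happens inside `unblock_locked` rather than in the closure passed from `unblock_task` as the patch does it.

// Standalone sketch of the unblock handshake. Assumptions: std::sync::Mutex
// stands in for kspin::SpinRaw, the u8 constants stand in for TaskState, and
// the closure stands in for pushing the task onto a per-CPU run queue.
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Mutex;

const READY: u8 = 0; // runnable, already on a run queue
const BLOCKING: u8 = 1; // decided to sleep, context switch not finished yet
const BLOCKED: u8 = 2; // fully off the CPU, safe to requeue

struct Task {
    state: AtomicU8,
    // Serializes concurrent unblockers, e.g. a timer callback racing a
    // wait queue notify() on another CPU.
    unblock_lock: Mutex<()>,
}

impl Task {
    fn unblock_locked<F: FnMut()>(&self, mut run_queue_push: F) {
        // Step 1: wait out the BLOCKING window, so the task is never requeued
        // while its old CPU is still running on its stack.
        while self.state.load(Ordering::Acquire) == BLOCKING {
            std::hint::spin_loop();
        }
        // Step 2: re-check the state under the per-task lock. Of two racing
        // unblockers, only the first still sees BLOCKED and gets to push;
        // the loser sees READY and returns without a second enqueue.
        let _guard = self.unblock_lock.lock().unwrap();
        if self.state.load(Ordering::Acquire) == BLOCKED {
            self.state.store(READY, Ordering::Release);
            run_queue_push();
        }
    }
}

fn main() {
    let task = Task {
        state: AtomicU8::new(BLOCKED),
        unblock_lock: Mutex::new(()),
    };
    // Two back-to-back unblock attempts stand in for the timer/notify race:
    task.unblock_locked(|| println!("notify(): pushed to run queue"));
    task.unblock_locked(|| println!("timer: pushed to run queue")); // prints nothing
}

Running this prints a single "pushed to run queue" line: the second caller takes `unblock_lock`, finds the task already READY, and returns without enqueuing it again, which is the double-unblock the commit message describes. The same concern is consistent with the reordering in wait_queue.rs above, where `unblock_one_task` now runs before the wait queue lock is dropped.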