Skip to content

Commit

Permalink
m: Remove the dependency on async-lock
Browse files Browse the repository at this point in the history
This PR removes the need for async-lock as a dependency by making the Executor initializable in a const context. This relies on the const initialization of CondVars and Mutexes, so the MSRV was bumped to 1.63.
  • Loading branch information
james7132 authored May 14, 2024
1 parent 1a20703 commit a16e1d3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 39 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "blocking"
version = "1.6.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.63"
description = "A thread pool for isolating blocking I/O in async programs"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/blocking"
Expand All @@ -22,8 +22,5 @@ futures-lite = { version = "2.0.0", default-features = false }
piper = "0.2.0"
tracing = { version = "0.1.37", default-features = false, optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
async-lock = "3.0.0"

[dev-dependencies]
futures-lite = "2.0.0"
89 changes: 54 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ pub use async_task::Task;

/// Default value for max threads that Executor can grow to
#[cfg(not(target_family = "wasm"))]
const DEFAULT_MAX_THREADS: usize = 500;
const DEFAULT_MAX_THREADS: NonZeroUsize = {
if let Some(size) = NonZeroUsize::new(500) {
size
} else {
panic!("DEFAULT_MAX_THREADS is non-zero");
}
};

/// Minimum value for max threads config
#[cfg(not(target_family = "wasm"))]
Expand Down Expand Up @@ -150,46 +156,56 @@ struct Inner {
/// This is the number of idle threads + the number of active threads.
thread_count: usize,

// TODO: The option is only used for const-initialization. This can be replaced with
// a normal VecDeque when the MSRV can be bumped passed
/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
queue: Option<VecDeque<Runnable>>,

/// Maximum number of threads in the pool
thread_limit: NonZeroUsize,
thread_limit: Option<NonZeroUsize>,
}

impl Inner {
#[inline]
fn queue(&mut self) -> &mut VecDeque<Runnable> {
self.queue.get_or_insert_with(VecDeque::new)
}
}

impl Executor {
#[cfg(not(target_family = "wasm"))]
fn max_threads() -> usize {
fn max_threads() -> NonZeroUsize {
match env::var(MAX_THREADS_ENV) {
Ok(v) => v
.parse::<usize>()
.map(|v| v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))
.ok()
.and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS)))
.unwrap_or(DEFAULT_MAX_THREADS),
Err(_) => DEFAULT_MAX_THREADS,
}
}

#[cfg(target_family = "wasm")]
fn max_threads() -> NonZeroUsize {
NonZeroUsize::new(1).unwrap()
}

/// Get a reference to the global executor.
#[inline]
fn get() -> &'static Self {
#[cfg(not(target_family = "wasm"))]
{
use async_lock::OnceCell;

static EXECUTOR: OnceCell<Executor> = OnceCell::new();

return EXECUTOR.get_or_init_blocking(|| {
let thread_limit = Self::max_threads();
Executor {
inner: Mutex::new(Inner {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
thread_limit: NonZeroUsize::new(thread_limit).unwrap(),
}),
cvar: Condvar::new(),
}
});
static EXECUTOR: Executor = Executor {
inner: Mutex::new(Inner {
idle_count: 0,
thread_count: 0,
queue: None,
thread_limit: None,
}),
cvar: Condvar::new(),
};

&EXECUTOR
}

#[cfg(target_family = "wasm")]
Expand Down Expand Up @@ -227,7 +243,7 @@ impl Executor {
inner.idle_count -= 1;

// Run tasks in the queue.
while let Some(runnable) = inner.queue.pop_front() {
while let Some(runnable) = inner.queue().pop_front() {
// We have found a task - grow the pool if needed.
self.grow_pool(inner);

Expand All @@ -249,7 +265,7 @@ impl Executor {
inner = lock;

// If there are no tasks after a while, stop this thread.
if res.timed_out() && inner.queue.is_empty() {
if res.timed_out() && inner.queue().is_empty() {
inner.idle_count -= 1;
inner.thread_count -= 1;
break;
Expand All @@ -266,7 +282,7 @@ impl Executor {
/// Schedules a runnable task for execution.
fn schedule(&'static self, runnable: Runnable) {
let mut inner = self.inner.lock().unwrap();
inner.queue.push_back(runnable);
inner.queue().push_back(runnable);

// Notify a sleeping thread and spawn more threads if needed.
self.cvar.notify_one();
Expand All @@ -278,17 +294,20 @@ impl Executor {
#[cfg(feature = "tracing")]
let _span = tracing::trace_span!(
"grow_pool",
queue_len = inner.queue.len(),
queue_len = inner.queue().len(),
idle_count = inner.idle_count,
thread_count = inner.thread_count,
)
.entered();

let thread_limit = inner
.thread_limit
.get_or_insert_with(Self::max_threads)
.get();

// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while inner.queue.len() > inner.idle_count * 5
&& inner.thread_count < inner.thread_limit.get()
{
while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit {
#[cfg(feature = "tracing")]
tracing::trace!("spawning a new thread to handle blocking tasks");

Expand Down Expand Up @@ -321,13 +340,13 @@ impl Executor {

// If the limit is about to be set to zero, set it to one instead so that if,
// in the future, we are able to spawn more threads, we will be able to do so.
NonZeroUsize::new(new_limit).unwrap_or_else(|| {
Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| {
#[cfg(feature = "tracing")]
tracing::warn!(
"attempted to lower thread_limit to zero; setting to one instead"
);
NonZeroUsize::new(1).unwrap()
})
}))
};
}
}
Expand Down Expand Up @@ -985,22 +1004,22 @@ mod tests {
fn test_max_threads() {
// properly set env var
env::set_var(MAX_THREADS_ENV, "100");
assert_eq!(100, Executor::max_threads());
assert_eq!(100, Executor::max_threads().get());

// passed value below minimum, so we set it to minimum
env::set_var(MAX_THREADS_ENV, "0");
assert_eq!(1, Executor::max_threads());
assert_eq!(1, Executor::max_threads().get());

// passed value above maximum, so we set to allowed maximum
env::set_var(MAX_THREADS_ENV, "50000");
assert_eq!(10000, Executor::max_threads());
assert_eq!(10000, Executor::max_threads().get());

// no env var, use default
env::set_var(MAX_THREADS_ENV, "");
assert_eq!(500, Executor::max_threads());
assert_eq!(500, Executor::max_threads().get());

// not a number, use default
env::set_var(MAX_THREADS_ENV, "NOTINT");
assert_eq!(500, Executor::max_threads());
assert_eq!(500, Executor::max_threads().get());
}
}

0 comments on commit a16e1d3

Please sign in to comment.