Skip to content

Commit

Permalink
Implement Once and generalize thread-local storage map for reuse (#49)
Browse files Browse the repository at this point in the history
`Once` is a synchronization primitive often used in a static variable to
ensure some initialization logic runs at most once in the execution of a
program. To ensure that we support this pattern, we implement a new
global storage map that is emptied across executions, and use it to
ensure that a `Once` cell's state resets across test executions. This
also means that multiple Shuttle tests running concurrently (e.g., under
the cargo test runner with a default config) will see independent
instances of a static Once cell and be properly isolated from each
other.

The storage map shares common logic with the TLS map we already have, so
this PR factors that map out into a single abstraction.

In a future PR we'll use Once to implement lazy_static, which will also
store its values into the global storage map.
  • Loading branch information
jamesbornholt committed Sep 9, 2021
1 parent 31f169d commit 7e10ce3
Show file tree
Hide file tree
Showing 9 changed files with 559 additions and 77 deletions.
23 changes: 20 additions & 3 deletions src/runtime/execution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_failure};
use crate::runtime::storage::{StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::task::{Task, TaskId, TaskState, DEFAULT_INLINE_TASKS};
use crate::runtime::thread::continuation::PooledContinuation;
Expand Down Expand Up @@ -171,6 +172,9 @@ pub(crate) struct ExecutionState {
// the number of scheduling decisions made so far
context_switches: usize,

// static values for the current execution
storage: StorageMap,

scheduler: Rc<RefCell<dyn Scheduler>>,
current_schedule: Schedule,

Expand Down Expand Up @@ -213,6 +217,7 @@ impl ExecutionState {
next_task: ScheduledTask::None,
has_yielded: false,
context_switches: 0,
storage: StorageMap::new(),
scheduler,
current_schedule: initial_schedule,
current_span_entered: None,
Expand Down Expand Up @@ -320,6 +325,8 @@ impl ExecutionState {
.expect("couldn't cleanup a future");
}

while Self::with(|state| state.storage.pop()).is_some() {}

#[cfg(debug_assertions)]
Self::with(|state| state.has_cleaned_up = true);
}
Expand Down Expand Up @@ -429,6 +436,16 @@ impl ExecutionState {
Self::with(|state| state.context_switches)
}

/// Fetch a value from the global storage map for the current execution.
///
/// Returns `None` if the slot has not been initialized yet. Global storage
/// slots are never destructed mid-execution, so a present slot always holds
/// a live value.
pub(crate) fn get_storage<K: Into<StorageKey>, T: 'static>(&self, key: K) -> Option<&T> {
    match self.storage.get(key.into()) {
        Some(slot) => Some(slot.expect("global storage is never destructed")),
        None => None,
    }
}

/// Initialize a slot in the global storage map for the current execution.
///
/// Panics if the slot has already been initialized (the underlying map
/// asserts on reinitialization).
pub(crate) fn init_storage<K: Into<StorageKey>, T: 'static>(&mut self, key: K, value: T) {
    let key = key.into();
    self.storage.init(key, value);
}

/// Shared reference to the vector clock of the task identified by `id`.
pub(crate) fn get_clock(&self, id: TaskId) -> &VectorClock {
    let task = self.tasks.get(id.0).unwrap();
    &task.clock
}
Expand All @@ -437,21 +454,21 @@ impl ExecutionState {
&mut self.tasks.get_mut(id.0).unwrap().clock
}

// Increment the current thread's clock entry and update its clock with the one provided.
/// Increment the current thread's clock entry and merge the provided clock into it.
pub(crate) fn update_clock(&mut self, clock: &VectorClock) {
    let current = self.current_mut();
    let id = current.id;
    current.clock.increment(id);
    current.clock.update(clock);
}

// Increment the current thread's clock and return a shared reference to it
/// Increment the current thread's clock and return a shared reference to it.
pub(crate) fn increment_clock(&mut self) -> &VectorClock {
    let current = self.current_mut();
    let id = current.id;
    current.clock.increment(id);
    &current.clock
}

// Increment the current thread's clock and return a mutable reference to it
/// Increment the current thread's clock and return a mutable reference to it
pub(crate) fn increment_clock_mut(&mut self) -> &mut VectorClock {
let task = self.current_mut();
task.clock.increment(task.id);
Expand Down
1 change: 1 addition & 0 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub(crate) mod execution;
mod failure;
pub(crate) mod runner;
pub(crate) mod storage;
pub(crate) mod task;
pub(crate) mod thread;
60 changes: 60 additions & 0 deletions src/runtime/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::any::Any;
use std::collections::{HashMap, VecDeque};

/// A unique identifier for a storage slot.
///
/// The first field identifies the slot, the second namespaces the kind of key
/// (e.g. TLS keys vs. other storage-map users).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct StorageKey(pub usize, pub usize); // (identifier, type)

/// A map of storage values with deterministic destruction order.
///
/// Insertion order into the backing `HashMap` is remembered in `order` so that
/// slots can be destructed in a deterministic sequence. Each value is an
/// `Option<_>` so slots can be destructed incrementally: it is valid for a TLS
/// destructor to initialize brand-new TLS slots while the map is being drained.
/// Once a slot has been destructed, its key leaves `order` and its value is
/// replaced with `None`.
pub(crate) struct StorageMap {
    locals: HashMap<StorageKey, Option<Box<dyn Any>>>,
    order: VecDeque<StorageKey>,
}

impl StorageMap {
    /// Create an empty storage map.
    pub fn new() -> Self {
        StorageMap {
            locals: HashMap::new(),
            order: VecDeque::new(),
        }
    }

    /// Look up the value stored at `key`.
    ///
    /// Returns `None` if the slot was never initialized, and `Some(Err(_))` if
    /// the slot's value has already been taken by [`StorageMap::pop`].
    pub fn get<T: 'static>(&self, key: StorageKey) -> Option<Result<&T, AlreadyDestructedError>> {
        let slot = self.locals.get(&key)?;
        let result = match slot {
            Some(boxed) => Ok(boxed
                .downcast_ref::<T>()
                .expect("local value must downcast to expected type")),
            None => Err(AlreadyDestructedError),
        };
        Some(result)
    }

    /// Store `value` at `key` and record it for ordered destruction.
    ///
    /// Panics if `key` has already been initialized.
    pub fn init<T: 'static>(&mut self, key: StorageKey, value: T) {
        let previous = self.locals.insert(key, Some(Box::new(value)));
        assert!(previous.is_none(), "cannot reinitialize a storage slot");
        self.order.push_back(key);
    }

    /// Return ownership of the next still-initialized storage slot.
    pub fn pop(&mut self) -> Option<Box<dyn Any>> {
        let key = self.order.pop_front()?;
        let slot = self
            .locals
            .get_mut(&key)
            .expect("keys in `order` must exist");
        let value = slot
            .take()
            .expect("keys in `order` must not yet be destructed");
        Some(value)
    }
}

/// Error returned by [`StorageMap::get`] when the slot's value has already
/// been destructed (taken by [`StorageMap::pop`]).
#[derive(Debug)]
#[non_exhaustive]
pub(crate) struct AlreadyDestructedError;
75 changes: 8 additions & 67 deletions src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::runtime::execution::ExecutionState;
use crate::runtime::storage::{AlreadyDestructedError, StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::thread;
use crate::runtime::thread::continuation::{ContinuationPool, PooledContinuation};
Expand All @@ -8,7 +9,6 @@ use bitvec::vec::BitVec;
use futures::{task::Waker, Future};
use std::any::Any;
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::rc::Rc;
use std::task::Context;
Expand Down Expand Up @@ -56,7 +56,7 @@ pub(crate) struct Task {

name: Option<String>,

local_storage: LocalMap,
local_storage: StorageMap,
}

impl Task {
Expand All @@ -80,7 +80,7 @@ impl Task {
woken_by_self: false,
detached: false,
name,
local_storage: LocalMap::new(),
local_storage: StorageMap::new(),
}
}

Expand Down Expand Up @@ -210,14 +210,14 @@ impl Task {
/// Returns Some(Err(_)) if the slot has already been destructed. Returns None if the slot has
/// not yet been initialized.
pub(crate) fn local<T: 'static>(&self, key: &'static LocalKey<T>) -> Option<Result<&T, AlreadyDestructedError>> {
self.local_storage.get(key)
self.local_storage.get(key.into())
}

/// Initialize the given thread-local storage slot with a new value.
///
/// Panics if the slot has already been initialized.
pub(crate) fn init_local<T: 'static>(&mut self, key: &'static LocalKey<T>, value: T) {
self.local_storage.init(key, value)
self.local_storage.init(key.into(), value)
}

/// Return ownership of the next still-initialized thread-local storage slot, to be used when
Expand Down Expand Up @@ -316,67 +316,8 @@ impl Debug for TaskSet {
}
}

/// A unique identifier for a [`LocalKey`](crate::thread::LocalKey)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct LocalKeyId(usize);

impl LocalKeyId {
fn new<T: 'static>(key: &'static LocalKey<T>) -> Self {
Self(key as *const _ as usize)
impl<T: 'static> From<&'static LocalKey<T>> for StorageKey {
fn from(key: &'static LocalKey<T>) -> Self {
Self(key as *const _ as usize, 0x1)
}
}

/// A map of thread-local storage values.
///
/// We remember the insertion order into the storage HashMap so that destruction is deterministic.
/// Values are Option<_> because we need to be able to incrementally destruct them, as it's valid
/// for TLS destructors to initialize new TLS slots. When a slot is destructed, its key is removed
/// from `order` and its value is replaced with None.
struct LocalMap {
locals: HashMap<LocalKeyId, Option<Box<dyn Any>>>,
order: VecDeque<LocalKeyId>,
}

impl LocalMap {
fn new() -> Self {
Self {
locals: HashMap::new(),
order: VecDeque::new(),
}
}

fn get<T: 'static>(&self, key: &'static LocalKey<T>) -> Option<Result<&T, AlreadyDestructedError>> {
self.locals.get(&LocalKeyId::new(key)).map(|val| {
val.as_ref()
.map(|val| {
Ok(val
.downcast_ref::<T>()
.expect("local value must downcast to expected type"))
})
.unwrap_or(Err(AlreadyDestructedError))
})
}

fn init<T: 'static>(&mut self, key: &'static LocalKey<T>, value: T) {
let key = LocalKeyId::new(key);
let result = self.locals.insert(key, Some(Box::new(value)));
assert!(result.is_none(), "cannot reinitialize a TLS slot");
self.order.push_back(key);
}

/// Return ownership of the next still-initialized TLS slot.
fn pop(&mut self) -> Option<Box<dyn Any>> {
let key = self.order.pop_front()?;
let value = self
.locals
.get_mut(&key)
.expect("keys in `order` must exist")
.take()
.expect("keys in `order` must not yet be destructed");
Some(value)
}
}

#[derive(Debug)]
#[non_exhaustive]
pub(crate) struct AlreadyDestructedError;
4 changes: 4 additions & 0 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod barrier;
mod condvar;
pub mod mpsc;
mod mutex;
mod once;
mod rwlock;

pub use barrier::{Barrier, BarrierWaitResult};
Expand All @@ -13,6 +14,9 @@ pub use condvar::{Condvar, WaitTimeoutResult};
pub use mutex::Mutex;
pub use mutex::MutexGuard;

pub use once::Once;
pub use once::OnceState;

pub use rwlock::RwLock;
pub use rwlock::RwLockReadGuard;
pub use rwlock::RwLockWriteGuard;
Expand Down
Loading

0 comments on commit 7e10ce3

Please sign in to comment.