Skip to content

Commit

Permalink
Add oncore verification to allocation_state
Browse files Browse the repository at this point in the history
Checks at the top of most functions to ensure that the core
of the caller is the same as the home core of the state, only in
debug builds.
  • Loading branch information
travisdowns committed Nov 17, 2022
1 parent 413cbe3 commit 44c0dc7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/v/cluster/scheduling/allocation_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/scheduling/allocation_state.h"

#include "bytes/oncore.h"
#include "cluster/logger.h"
#include "ssx/sformat.h"

Expand All @@ -17,6 +18,7 @@ namespace cluster {
void allocation_state::rollback(
const std::vector<partition_assignment>& v,
const partition_allocation_domain domain) {
verify_shard();
for (auto& as : v) {
rollback(as.replicas, domain);
// rollback for each assignment as the groups are distinct
Expand All @@ -27,12 +29,14 @@ void allocation_state::rollback(
void allocation_state::rollback(
const std::vector<model::broker_shard>& v,
const partition_allocation_domain domain) {
verify_shard();
for (auto& bs : v) {
deallocate(bs, domain);
}
}

int16_t allocation_state::available_nodes() const {
verify_shard();
return std::count_if(
_nodes.begin(), _nodes.end(), [](const underlying_t::value_type& p) {
return p.second->is_active();
Expand All @@ -54,6 +58,7 @@ void allocation_state::apply_update(
std::vector<model::broker_shard> replicas,
raft::group_id group_id,
const partition_allocation_domain domain) {
verify_shard();
if (replicas.empty()) {
return;
}
Expand Down Expand Up @@ -92,6 +97,7 @@ void allocation_state::register_node(allocation_state::node_ptr n) {

void allocation_state::update_allocation_nodes(
const std::vector<model::broker>& brokers) {
verify_shard();
// deletions
for (auto& [id, node] : _nodes) {
auto it = std::find_if(
Expand Down Expand Up @@ -127,6 +133,7 @@ void allocation_state::update_allocation_nodes(
}

void allocation_state::decommission_node(model::node_id id) {
verify_shard();
auto it = _nodes.find(id);
if (it == _nodes.end()) {
throw std::invalid_argument(
Expand All @@ -137,6 +144,7 @@ void allocation_state::decommission_node(model::node_id id) {
}

void allocation_state::recommission_node(model::node_id id) {
verify_shard();
auto it = _nodes.find(id);
if (it == _nodes.end()) {
throw std::invalid_argument(
Expand All @@ -158,13 +166,15 @@ bool allocation_state::is_empty(model::node_id id) const {
void allocation_state::deallocate(
const model::broker_shard& replica,
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
it->second->deallocate_on(replica.shard, domain);
}
}

result<uint32_t> allocation_state::allocate(
model::node_id id, const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(id); it != _nodes.end()) {
if (it->second->is_full()) {
return errc::invalid_node_operation;
Expand All @@ -183,4 +193,17 @@ allocation_state::get_rack_id(model::node_id id) const {
vassert(false, "unexpected node id {}", id);
}

void allocation_state::verify_shard() const {
/* This is a consistency check on the use of the allocation state:
* it checks that the caller is on the same shard the state was originally
* created on. It is easy to inadvertently violate this condition since
* the cluster::allocation_units class embeds a pointer to this state and
* when moved across shards (e.g., because allocation always happens on
* shard 0 but some other shard initiates the request) it may result in
* calls on the current shard being routed back to the original shard
* through this pointer, a thread-safety violation.
*/
oncore_debug_verify(_verify_shard);
}

} // namespace cluster
8 changes: 8 additions & 0 deletions src/v/cluster/scheduling/allocation_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "bytes/oncore.h"
#include "cluster/scheduling/allocation_node.h"
#include "model/metadata.h"

Expand Down Expand Up @@ -67,10 +68,17 @@ class allocation_state {
std::optional<model::rack_id> get_rack_id(model::node_id) const;

private:
/**
* This function verifies that the current shard matches the shard the
* state was originally created on, aborting if not.
*/
void verify_shard() const;

config::binding<uint32_t> _partitions_per_shard;
config::binding<uint32_t> _partitions_reserve_shard0;

raft::group_id _highest_group{0};
underlying_t _nodes;
expression_in_debug_mode(oncore _verify_shard;)
};
} // namespace cluster

0 comments on commit 44c0dc7

Please sign in to comment.