Skip to content

Commit

Permalink
refine docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 11, 2024
1 parent 6d122a9 commit fce41b8
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 33 deletions.
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ message Table {
// - The table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.expected_vnode_count`.
optional uint32 maybe_vnode_count = 40;

// Per-table catalog version, used by schema change. `None` for internal
Expand Down
1 change: 1 addition & 0 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct TableDesc {
/// the column indices which could receive watermarks.
pub watermark_columns: FixedBitSet,

/// Total vnode count of the table.
pub vnode_count: usize,

/// Whether the table is versioned. If `true`, column-aware row encoding will be used
Expand Down
43 changes: 43 additions & 0 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::vnode::VirtualNode;

/// A trait for accessing the vnode count field with backward compatibility.
pub trait VnodeCountCompat {
/// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set,
/// typically for backward compatibility.
///
/// See the documentation on the field of the implementing type for more details.
fn vnode_count(&self) -> usize;
}

macro_rules! impl_maybe_vnode_count_compat {
($($ty:ty),* $(,)?) => {
$(
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT, |v| v as _)
}
}
)*
};
}

impl_maybe_vnode_count_compat!(
risingwave_pb::plan_common::StorageTableDesc,
risingwave_pb::catalog::Table,
risingwave_pb::meta::table_fragments::Fragment,
);
3 changes: 2 additions & 1 deletion src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {

/// Create a vnode mapping where all vnodes are mapped to the same single item.
///
/// The vnode count will be 1.
/// The length of the mapping will be 1, as if there's only one vnode in total.
/// This is to be consistent with [`VnodeBitmapExt::singleton`].
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), 1)
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/hash/consistent_hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
// limitations under the License.

pub mod bitmap;
pub mod compat;
pub mod mapping;
pub mod vnode;
23 changes: 0 additions & 23 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,29 +193,6 @@ impl VirtualNode {
}
}

pub trait VnodeCountCompat {
fn vnode_count(&self) -> usize;
}

macro_rules! delegate_maybe_vnode_count {
($($ty:ty),* $(,)?) => {
$(
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT, |v| v as _)
}
}
)*
};
}

delegate_maybe_vnode_count!(
risingwave_pb::plan_common::StorageTableDesc,
risingwave_pb::catalog::Table,
risingwave_pb::meta::table_fragments::Fragment,
);

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/common/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod key_v2;
pub mod table_distribution;

pub use consistent_hash::bitmap::*;
pub use consistent_hash::compat::*;
pub use consistent_hash::mapping::*;
pub use consistent_hash::vnode::*;
pub use dispatcher::{calc_hash_key_kind, HashKeyDispatcher};
Expand Down
4 changes: 3 additions & 1 deletion src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,9 @@ where
visit_stream_node_internal_tables(fragment.node.as_mut().unwrap(), f)
}

/// Visit the tables of a [`StreamFragment`], including those in `Materialize` nodes.
/// Visit the tables of a [`StreamFragment`].
///
/// Compared to [`visit_internal_tables`], this function also visits the table of `Materialize` node.
pub fn visit_tables<F>(fragment: &mut StreamFragment, f: F)
where
F: FnMut(&mut Table, &str),
Expand Down
18 changes: 17 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,20 @@ pub struct TableCatalog {

pub cdc_table_id: Option<String>,

/// Can be unset.
/// Total vnode count of the table.
///
/// Can be unset if the catalog is generated by the frontend and not yet persisted. This is
/// because the vnode count of each fragment (then internal tables) is determined by the
/// meta service. See also [`StreamMaterialize::derive_table_catalog`] and
/// [`TableCatalogBuilder::build`].
///
/// On the contrary, if this comes from a [`PbTable`], the field must be `Some` no matter
/// whether the table is created before or after the version we introduced variable vnode
/// count support. This is because we've already handled backward compatibility during
/// conversion.
///
/// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
/// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
pub vnode_count: Option<usize>,
}

Expand Down Expand Up @@ -391,6 +404,9 @@ impl TableCatalog {
self.version().map(|v| v.version_id)
}

/// Get the total vnode count of the table.
///
/// Panics if it's called on an incomplete (and not yet persisted) table catalog.
pub fn vnode_count(&self) -> usize {
self.vnode_count
.expect("vnode count unset, are you calling on an incomplete table catalog?")
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl StreamingJob {
}
}

// TODO: basically we want to ensure that the `Table` persisted in the catalog is the same as the
// one in the `Materialize` node in the actor. However, they are currently handled separately
// and can be out of sync. Shall we directly copy the whole struct from the actor to the catalog
// to avoid `set`ting each field separately?
impl StreamingJob {
pub fn set_id(&mut self, id: u32) {
match self {
Expand Down Expand Up @@ -160,6 +164,7 @@ impl StreamingJob {
}
}

/// Set the vnode count of the table.
pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
match self {
Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2142,7 +2142,8 @@ impl DdlController {
old_table_fragments.assigned_parallelism,
);

// TODO(var-vnode): fill vnode count for table catalog in `stream_job`.
// TODO(var-vnode): fill vnode count for table catalog in `stream_job`,
// like what we do in `build_stream_job`.

let ctx = ReplaceTableContext {
old_table_fragments,
Expand Down
22 changes: 16 additions & 6 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ pub struct StreamFragmentGraph {
/// variable. If not specified, all active worker slots will be used.
specified_parallelism: Option<NonZeroUsize>,

/// Expected vnode count for the graph.
/// The scheduler on the meta service will use this as a hint to decide the vnode count
/// for each fragment.
///
/// Note that the actual vnode count may be different from this value.
/// For example, a no-shuffle exchange between current fragment graph and an existing
/// upstream fragment graph requires two fragments to be in the same distribution,
/// thus the same vnode count.
expected_vnode_count: usize,
}

Expand All @@ -351,10 +359,10 @@ impl StreamFragmentGraph {
// Create nodes.
let fragments: HashMap<_, _> = proto
.fragments
.iter()
.map(|(&id, fragment)| {
.into_iter()
.map(|(id, fragment)| {
let id = fragment_id_gen.to_global_id(id);
let fragment = BuildingFragment::new(id, fragment.clone(), job, table_id_gen);
let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
(id, fragment)
})
.collect();
Expand All @@ -371,10 +379,10 @@ impl StreamFragmentGraph {
let mut downstreams = HashMap::new();
let mut upstreams = HashMap::new();

for edge in &proto.edges {
for edge in proto.edges {
let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
let edge = StreamFragmentEdge::from_protobuf(edge);
let edge = StreamFragmentEdge::from_protobuf(&edge);

upstreams
.entry(downstream_id)
Expand Down Expand Up @@ -510,6 +518,7 @@ impl StreamFragmentGraph {
self.specified_parallelism
}

/// Get the expected vnode count of the graph. See documentation of the field for more details.
pub fn expected_vnode_count(&self) -> usize {
self.expected_vnode_count
}
Expand Down Expand Up @@ -1167,13 +1176,14 @@ impl CompleteStreamFragmentGraph {
&self.building_graph.fragments
}

/// Returns all building fragments in the graph.
/// Returns all building fragments in the graph, mutable.
pub(super) fn building_fragments_mut(
&mut self,
) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
&mut self.building_graph.fragments
}

/// Get the expected vnode count of the building graph. See documentation of the field for more details.
pub(super) fn expected_vnode_count(&self) -> usize {
self.building_graph.expected_vnode_count()
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ impl Distribution {
}
}

/// Get the vnode count of the distribution.
///
/// For singleton, it's 1. For hash, it's the length of the mapping.
pub fn vnode_count(&self) -> usize {
match self {
Distribution::Singleton(_) => 1,
Expand Down

0 comments on commit fce41b8

Please sign in to comment.