Skip to content

Commit

Permalink
rethink vnode count existence
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 9, 2024
1 parent d4f3ddf commit c4772c9
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 29 deletions.
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand Down Expand Up @@ -195,8 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.collect();

// Lookup Join always contains distribution key, so we don't need vnode bitmap
// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
7 changes: 4 additions & 3 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId,
ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
WorkerSlotId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
Expand Down Expand Up @@ -408,8 +409,8 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
})
.collect();

// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
Expand Down
5 changes: 2 additions & 3 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -107,8 +107,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
};

let chunk_size = source.context.get_config().developer.chunk_size as u32;
Expand Down
5 changes: 2 additions & 3 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand Down Expand Up @@ -210,8 +210,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
};

let scan_ranges = {
Expand Down
9 changes: 4 additions & 5 deletions src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use anyhow::{anyhow, Result};
use futures::{pin_mut, StreamExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::VirtualNode;
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_rpc_client::MetaClient;
Expand Down Expand Up @@ -54,6 +53,7 @@ pub fn print_table_catalog(table: &TableCatalog) {
println!("{:#?}", table);
}

// TODO: shall we work on `TableDesc` instead?
pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> StateTable<S> {
StateTable::new_with_distribution(
hummock,
Expand All @@ -65,13 +65,13 @@ pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -
.collect(),
table.pk().iter().map(|x| x.order_type).collect(),
table.pk().iter().map(|x| x.column_index).collect(),
// TODO(var-vnode): use vnode count from table desc
TableDistribution::all(table.distribution_key().to_vec(), VirtualNode::COUNT), // scan all vnodes
TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count()), // scan all vnodes
Some(table.value_indices.clone()),
)
.await
}

// TODO: shall we work on `TableDesc` instead?
pub fn make_storage_table<S: StateStore>(
hummock: S,
table: &TableCatalog,
Expand All @@ -84,8 +84,7 @@ pub fn make_storage_table<S: StateStore>(
Ok(StorageTable::new_partial(
hummock,
output_columns_ids,
// TODO(var-vnode): use vnode count from table desc
Some(Bitmap::ones(VirtualNode::COUNT).into()),
Some(Bitmap::ones(table.vnode_count()).into()),
&table.table_desc().try_to_protobuf()?,
))
}
Expand Down
22 changes: 15 additions & 7 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ pub struct TableCatalog {

pub cdc_table_id: Option<String>,

pub vnode_count: usize,
/// Can be unset.
pub vnode_count: Option<usize>,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -333,6 +334,9 @@ impl TableCatalog {
}

/// Get a [`TableDesc`] of the table.
///
/// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
/// (which is determined by the meta service) and panic.
pub fn table_desc(&self) -> TableDesc {
use risingwave_common::catalog::TableOption;

Expand All @@ -351,7 +355,7 @@ impl TableCatalog {
watermark_columns: self.watermark_columns.clone(),
versioned: self.version.is_some(),
vnode_col_index: self.vnode_col_index,
vnode_count: self.vnode_count,
vnode_count: self.vnode_count(),
}
}

Expand Down Expand Up @@ -387,6 +391,11 @@ impl TableCatalog {
self.version().map(|v| v.version_id)
}

/// Returns the vnode count of this table.
///
/// # Panics
///
/// Panics if the `vnode_count` field is still `None`, i.e. the catalog has not had its
/// vnode count filled in yet.
pub fn vnode_count(&self) -> usize {
    match self.vnode_count {
        Some(count) => count,
        None => panic!("vnode count unset, are you calling on an incomplete table catalog?"),
    }
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbTable {
PbTable {
id: self.id.table_id,
Expand Down Expand Up @@ -432,7 +441,7 @@ impl TableCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
retention_seconds: self.retention_seconds,
cdc_table_id: self.cdc_table_id.clone(),
maybe_vnode_count: Some(self.vnode_count as _),
maybe_vnode_count: self.vnode_count.map(|v| v as _),
}
}

Expand Down Expand Up @@ -609,7 +618,7 @@ impl From<PbTable> for TableCatalog {
.map(TableId::from)
.collect_vec(),
cdc_table_id: tb.cdc_table_id,
vnode_count,
vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */
}
}
}
Expand All @@ -630,7 +639,6 @@ impl OwnedByUserCatalog for TableCatalog {
mod tests {

use risingwave_common::catalog::{row_id_column_desc, ColumnDesc, ColumnId};
use risingwave_common::hash::VirtualNode;
use risingwave_common::test_prelude::*;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::OrderType;
Expand Down Expand Up @@ -701,7 +709,7 @@ mod tests {
initialized_at_cluster_version: None,
version_column_index: None,
cdc_table_id: None,
maybe_vnode_count: None,
maybe_vnode_count: Some(233),
}
.into();

Expand Down Expand Up @@ -765,7 +773,7 @@ mod tests {
dependent_relations: vec![],
version_column_index: None,
cdc_table_id: None,
vnode_count: VirtualNode::COUNT,
vnode_count: Some(233),
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
Expand Down Expand Up @@ -284,7 +283,7 @@ impl StreamMaterialize {
created_at_cluster_version: None,
retention_seconds: retention_seconds.map(|i| i.into()),
cdc_table_id: None,
vnode_count: VirtualNode::COUNT, // TODO(var-vnode): use vnode count from session config
vnode_count: None, // will be filled in by the meta service later
})
}

Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_common::catalog::{
use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::TableType;
Expand Down Expand Up @@ -180,7 +179,7 @@ impl TableCatalogBuilder {
created_at_cluster_version: None,
retention_seconds: None,
cdc_table_id: None,
vnode_count: VirtualNode::COUNT, /* TODO(var-vnode): use vnode count from session config, */
vnode_count: None, // will be filled in by the meta service later
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ pub(crate) mod tests {
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
vnode_count: VirtualNode::COUNT_FOR_TEST,
vnode_count: Some(VirtualNode::COUNT_FOR_TEST),
};
let batch_plan_node: PlanRef = LogicalScan::create(
"".to_string(),
Expand Down

0 comments on commit c4772c9

Please sign in to comment.