feat(cdc): support default column in auto schema mapping #18571

Open · wants to merge 11 commits into main
27 changes: 27 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
@@ -38,6 +38,21 @@ mysql --protocol=tcp -u root mytest -e "
);
INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]');
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
CREATE TABLE IF NOT EXISTS test_default(
id int,
name varchar(255) DEFAULT 'default_name',
age int DEFAULT 18,
v1 real DEFAULT 1.1,
v2 double precision DEFAULT 2.2,
v3 decimal(5,2) DEFAULT 3.3,
v4 boolean DEFAULT false,
v5 date DEFAULT '2020-01-01',
v6 time DEFAULT '12:34:56',
v7 timestamp DEFAULT '2020-01-01 12:34:56',
v8 datetime DEFAULT '2020-01-01 12:34:56',
PRIMARY KEY (id)
);
INSERT INTO test_default(id) VALUES (1),(2);
"

statement ok
@@ -70,6 +85,18 @@ HINT: Please define the schema manually
statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

statement ok
create table test_default (*) from mysql_source table 'mytest.test_default';

sleep 3s

query TTTTTTTTTTTTT
SELECT * FROM test_default order by id;
----
1 default_name 18 1.1 2.2 3.30 0 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 2020-01-01 12:34:56
2 default_name 18 1.1 2.2 3.30 0 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 2020-01-01 12:34:56


statement ok
create table rw_customers (*) from mysql_source table 'mytest.customers';

37 changes: 37 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_pg.slt
@@ -45,6 +45,23 @@ psql -c "
INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}');
INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
CREATE TABLE IF NOT EXISTS test_default(
id int,
name varchar(255) DEFAULT 'default_name',
age int DEFAULT 18,
v1 real DEFAULT 1.1,
v2 double precision DEFAULT 2.2,
v3 numeric DEFAULT 3.3,
v4 boolean DEFAULT false,
v5 date DEFAULT '2020-01-01',
v6 time DEFAULT '12:34:56',
v7 timestamp DEFAULT '2020-01-01 12:34:56',
v8 timestamptz DEFAULT '2020-01-01 12:34:56+00',
v9 interval DEFAULT '1 day',
v10 jsonb DEFAULT '{}',
PRIMARY KEY (id)
);
INSERT INTO test_default(id,name,age) VALUES (1, 'name1', 20), (2, 'name2', 21), (3, 'name3', 22);
"

statement ok
@@ -58,6 +75,26 @@ create source pg_source with (
slot.name = 'pg_slot'
);

statement ok
create table test_default (*) from pg_source table 'public.test_default';

sleep 3s

statement ok
insert into test_default(id) values (4),(5);

statement ok
FLUSH;

query TTTTTTTTTTTTT
SELECT * from test_default order by id;
----
1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
4 default_name 18 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
5 default_name 18 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}


statement ok
create table rw_postgres_types_test (*) from pg_source table 'public.postgres_types_test';
16 changes: 14 additions & 2 deletions src/common/src/catalog/column.rs
@@ -15,6 +15,8 @@
use std::borrow::Cow;

use itertools::Itertools;
use risingwave_common::types::Datum;
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
@@ -24,6 +26,7 @@ use risingwave_pb::plan_common::{
use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID};
use crate::types::DataType;
use crate::util::value_encoding::DatumToProtoExt;

/// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is
/// not globally unique.
@@ -144,10 +147,19 @@ impl ColumnDesc {
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
default_val: DefaultColumnDesc,
snapshot_value: Datum,
) -> ColumnDesc {
let default_col = DefaultColumnDesc {
expr: Some(ExprNode {
// equivalent to `Literal::to_expr_proto`
function_type: ExprType::Unspecified as i32,
return_type: Some(data_type.to_protobuf()),
rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
}),
snapshot_value: Some(snapshot_value.to_protobuf()),
};
ColumnDesc {
generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_val)),
generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)),
..Self::named(name, column_id, data_type)
}
}
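For reference, a minimal sketch of how a caller uses the reworked constructor after this change: it now takes the snapshot Datum directly and builds the DefaultColumnDesc internally. The column name and value below are illustrative only; compare the debezium.rs change further down.

use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::types::{DataType, ScalarImpl};

// Build an `age INT DEFAULT 18` column description; the DefaultColumnDesc
// (expr + snapshot_value) is now assembled inside the constructor.
fn example_default_column() -> ColumnDesc {
    ColumnDesc::named_with_default_value(
        "age",
        ColumnId::placeholder(),
        DataType::Int32,
        Some(ScalarImpl::Int32(18)),
    )
}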
1 change: 1 addition & 0 deletions src/connector/src/error.rs
@@ -42,6 +42,7 @@ def_anyhow_newtype! {
url::ParseError => "failed to parse url",
serde_json::Error => "failed to parse json",
csv::Error => "failed to parse csv",
rust_decimal::Error => transparent,
Review comment (Member): I think some context must be attached when converting a rust_decimal::Error to ConnectorError. If that's the case, could you remove this line and explicitly call .context() on the result?


uuid::Error => transparent, // believed to be self-explanatory
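A minimal sketch of what the reviewer suggests above: attach context at the call site (for example where Decimal::try_from is used in mysql.rs) instead of relying on a blanket transparent conversion. The helper name and message are hypothetical, and it assumes anyhow::Context is available.

use anyhow::{Context as _, Result};
use rust_decimal::Decimal;

// Hypothetical helper: convert a MySQL REAL default into a Decimal, adding
// context so the resulting error explains where the parse failed.
fn parse_default_decimal(val: f64) -> Result<Decimal> {
    Decimal::try_from(val).context("failed to parse MySQL default value as DECIMAL")
}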

15 changes: 1 addition & 14 deletions src/connector/src/parser/unified/debezium.rs
@@ -18,12 +18,8 @@ use risingwave_common::types::{
DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
ToOwnedDatum,
};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::DefaultColumnDesc;
use thiserror_ext::AsReport;

use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
@@ -240,20 +236,11 @@ pub fn parse_schema_change(
}},
)?,
);
// equivalent to `Literal::to_expr_proto`
let default_val_expr_node = ExprNode {
function_type: ExprType::Unspecified as i32,
return_type: Some(data_type.to_protobuf()),
rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
};
ColumnDesc::named_with_default_value(
name,
ColumnId::placeholder(),
data_type,
DefaultColumnDesc {
expr: Some(default_val_expr_node),
snapshot_value: Some(snapshot_value.to_protobuf()),
},
snapshot_value,
)
}
_ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
55 changes: 48 additions & 7 deletions src/connector/src/source/cdc/external/mysql.rs
@@ -25,15 +25,16 @@ use mysql_common::value::Value;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAME};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, Decimal, ScalarImpl, F32};
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::mysql::def::{ColumnKey, ColumnType};
use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
use sea_schema::mysql::discovery::SchemaDiscovery;
use sea_schema::mysql::query::SchemaQueryBuilder;
use sea_schema::sea_query::{Alias, IntoIden};
use serde_derive::{Deserialize, Serialize};
use sqlx::mysql::MySqlConnectOptions;
use sqlx::MySqlPool;
use thiserror_ext::AsReport;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::cdc::external::{
@@ -112,11 +113,51 @@ impl MySqlExternalTable {
let data_type = mysql_type_to_rw_type(&col.col_type)?;
// column name in mysql is case-insensitive, convert to lowercase
let col_name = col.name.to_lowercase();
column_descs.push(ColumnDesc::named(
col_name.clone(),
ColumnId::placeholder(),
data_type,
));
let column_desc = if let Some(default) = col.default {
let snapshot_value = match default {
ColumnDefault::Null => None,
ColumnDefault::Int(val) => match data_type {
DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
DataType::Int64 => Some(ScalarImpl::Int64(val)),
_ => {
unreachable!("unexpected default value type for integer column")
}
Review comment on lines +123 to +125 (Member): Since the default and data_type are from external systems, shall we return an error instead of panicking here?

},
ColumnDefault::Real(val) => match data_type {
DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
DataType::Decimal => Some(ScalarImpl::Decimal(Decimal::try_from(val)?)),
_ => {
unreachable!("unexpected default value type for float column")
}
},
ColumnDefault::String(val) | ColumnDefault::CustomExpr(val) => {
Review comment (Member): Why do we involve CustomExpr here?

match ScalarImpl::from_text(val.as_str(), &data_type) {
Ok(scalar) => Some(scalar),
Err(err) => {
tracing::warn!(error=%err.as_report(), "failed to parse mysql default value expression, only constant is supported");
None
}
}
}
ColumnDefault::CurrentTimestamp => {
tracing::warn!("MySQL CURRENT_TIMESTAMP default value not supported");
None
}
};

ColumnDesc::named_with_default_value(
col_name.clone(),
ColumnId::placeholder(),
data_type.clone(),
snapshot_value,
)
} else {
ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
};

column_descs.push(column_desc);
if matches!(col.key, ColumnKey::Primary) {
pk_names.push(col_name);
}
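Following the review comment above about not panicking on data from an external system, a minimal sketch of returning an error instead of using unreachable!. It assumes the mapping is factored into a helper returning ConnectorResult and that risingwave_common::bail! is in scope (it is already imported in this file); the helper itself is hypothetical.

use risingwave_common::bail;
use risingwave_common::types::{DataType, ScalarImpl};

use crate::error::ConnectorResult;

// Hypothetical helper: map a MySQL integer default onto the RisingWave column type,
// returning an error (rather than panicking) if the discovered type is not an integer.
fn integer_default_to_scalar(val: i64, data_type: &DataType) -> ConnectorResult<ScalarImpl> {
    Ok(match data_type {
        DataType::Int16 => ScalarImpl::Int16(val as _),
        DataType::Int32 => ScalarImpl::Int32(val as _),
        DataType::Int64 => ScalarImpl::Int64(val),
        _ => bail!("unexpected default value type {:?} for integer column", data_type),
    })
}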
33 changes: 27 additions & 6 deletions src/connector/src/source/cdc/external/postgres.rs
@@ -24,7 +24,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres_openssl::MakeTlsConnector;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::postgres::def::{ColumnType, TableInfo};
use sea_schema::postgres::discovery::SchemaDiscovery;
@@ -123,11 +123,32 @@ impl PostgresExternalTable {
let mut column_descs = vec![];
for col in &table_schema.columns {
let data_type = type_to_rw_type(&col.col_type)?;
column_descs.push(ColumnDesc::named(
col.name.clone(),
ColumnId::placeholder(),
data_type,
));
let column_desc = if let Some(ref default_expr) = col.default {
// Parse the value of the "column_default" field in information_schema.columns;
// non-numeric data types are stored as "'value'::type".
let val_text = default_expr
.0
.split("::")
.map(|s| s.trim_matches('\''))
.next()
.expect("default value expression");

match ScalarImpl::from_text(val_text, &data_type) {
Ok(scalar) => ColumnDesc::named_with_default_value(
col.name.clone(),
ColumnId::placeholder(),
data_type.clone(),
Some(scalar),
),
Err(err) => {
tracing::warn!(error=%err.as_report(), "failed to parse postgres default value expression, only constant is supported");
ColumnDesc::named(col.name.clone(), ColumnId::placeholder(), data_type)
}
}
} else {
ColumnDesc::named(col.name.clone(), ColumnId::placeholder(), data_type)
};
column_descs.push(column_desc);
}

if table_schema.primary_key_constraints.is_empty() {
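For illustration, a standalone sketch of the default-expression trimming used above, run against hypothetical Postgres column_default strings (the function name and inputs are illustrative only).

// "column_default" values such as "'default_name'::character varying" keep only the
// quoted literal before the cast; plain numeric defaults like "18" pass through unchanged.
fn extract_default_literal(default_expr: &str) -> &str {
    default_expr
        .split("::")
        .map(|s| s.trim_matches('\''))
        .next()
        .expect("default value expression")
}

fn main() {
    assert_eq!(extract_default_literal("'default_name'::character varying"), "default_name");
    assert_eq!(extract_default_literal("18"), "18");
}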