Add support for tuple ClickHouse #29715

Merged (15 commits) on Jan 2, 2024
4 changes: 3 additions & 1 deletion CHANGES.md
@@ -63,6 +63,8 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Adding support for Tuples DataType in ClickHouse (Java) ([#29715](https://github.com/apache/beam/pull/29715)).


## New Features / Improvements

@@ -98,10 +100,10 @@

* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))
* Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).
* Adding support for LowCardinality (Java) ([#29533](https://github.com/apache/beam/pull/29533)).

## New Features / Improvements

@@ -27,6 +27,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -109,6 +110,7 @@
* <tr><td>{@link TableSchema.TypeName#ENUM8}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ENUM16}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#BOOL}</td> <td>{@link Schema.TypeName#BOOLEAN}</td></tr>
* <tr><td>{@link TableSchema.TypeName#TUPLE}</td> <td>{@link Schema.TypeName#ROW}</td></tr>
* </table>
*
* Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is
@@ -475,6 +477,15 @@ abstract static class Builder<T> {
}
}

private static String tuplePreprocessing(String payload) {
List<String> l =
Arrays.stream(payload.trim().split(","))
.map(s -> s.trim().replaceAll(" +", "' "))
.collect(Collectors.toList());
String content =
String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'");
Contributor

The caller has a condition if (type.toLowerCase().trim().startsWith("tuple(")) { below, so here payload always (case insensitive) starts with "tuple(", so here the replace of "Tuple\(" won't work

Contributor Author

The condition checks the lower-cased string, but the actual parsing runs on the original string.

Contributor

I mean that this preprocessing is executed only when the check below, if (type.toLowerCase().trim().startsWith("tuple(")) {, passes.

So "Tuple\(" (with a backslash) is matched case-sensitively here, while the condition in the caller is case-insensitive. Could this cause an issue?

Contributor

You're right. Would you mind adding a comment that describes the preprocessing rule with an example, something like // Tuple(a String, b Integer) -> ...

return content;
}
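
The preprocessing rule discussed in this thread can be illustrated with a small stand-alone program. This is only a sketch: the class name `TuplePreprocessingDemo` is hypothetical, the method body is copied from the hunk above, and the sample type strings are assumptions about what ClickHouse might report rather than captured output. It shows the `Tuple(a String, b Integer) -> ...` style example the reviewer asked for, and what happens when the spelling is lower-case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TuplePreprocessingDemo {
  // Copy of the PR's tuplePreprocessing, made package-visible for the demo.
  static String tuplePreprocessing(String payload) {
    List<String> l =
        Arrays.stream(payload.trim().split(","))
            .map(s -> s.trim().replaceAll(" +", "' "))
            .collect(Collectors.toList());
    return String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'");
  }

  public static void main(String[] args) {
    // Element names get wrapped in single quotes:
    // Tuple(f0 String, f1 Bool) -> Tuple('f0' String,'f1' Bool)
    System.out.println(tuplePreprocessing("Tuple(f0 String, f1 Bool)"));

    // Nested tuples are quoted at every level:
    // -> Tuple('a' Nullable(String),'b' Tuple('c' Int64,'d' Int64))
    System.out.println(
        tuplePreprocessing("Tuple(a Nullable(String), b Tuple(c Int64, d Int64))"));

    // Lower-case spelling: "Tuple\\(" does not match, so the first name
    // is left without its opening quote:
    // -> tuple(a' String,'b' Int64)
    System.out.println(tuplePreprocessing("tuple(a String, b Int64)"));
  }
}
```

Because the rule splits on every comma and rewrites every run of spaces, an element type whose arguments contain a comma or space, such as `Decimal(10, 2)`, would also be rewritten. Whether ClickHouse ever reports the lower-case spelling is not established in this thread.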
/**
* Returns {@link TableSchema} for a given table.
*
@@ -498,7 +509,13 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) {
String defaultTypeStr = rs.getString("default_type");
String defaultExpression = rs.getString("default_expression");

ColumnType columnType = ColumnType.parse(type);
ColumnType columnType = null;
if (type.toLowerCase().trim().startsWith("tuple(")) {
String content = tuplePreprocessing(type);
columnType = ColumnType.parse(content);
} else {
columnType = ColumnType.parse(type);
}
DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);

Object defaultValue;
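
The branch in this hunk only routes type strings that begin with `tuple(`. A minimal sketch of a broader check follows, assuming the goal is to also catch wrapped forms such as `Array(Tuple(...))`, which a later review comment reports as unsupported. The helper name `needsTuplePreprocessing` and the class are hypothetical and not part of the PR, and the sketch says nothing about whether the grammar accepts such types once preprocessed.

```java
import java.util.Locale;

public class TupleDispatchSketch {
  // Hypothetical helper: true when the type string mentions a tuple anywhere,
  // not only at the start. Caveat: this would also match a quoted literal
  // that happens to contain "tuple(", for example inside an Enum value name.
  static boolean needsTuplePreprocessing(String type) {
    return type.toLowerCase(Locale.ROOT).contains("tuple(");
  }

  public static void main(String[] args) {
    System.out.println(needsTuplePreprocessing("Tuple(a String, b Int64)"));  // true
    System.out.println(needsTuplePreprocessing("Array(Tuple(a String))"));    // true
    System.out.println(needsTuplePreprocessing("Array(Int64)"));              // false
  }
}
```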
@@ -21,9 +21,11 @@
import com.clickhouse.client.ClickHousePipedOutputStream;
import com.clickhouse.client.data.BinaryStreamUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithStorage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Days;
@@ -146,6 +148,20 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj
case BOOL:
BinaryStreamUtils.writeBoolean(stream, (Boolean) value);
break;
case TUPLE:
RowWithStorage rowValues = (RowWithStorage) value;
List<Object> tupleValues = rowValues.getValues();
Collection<ColumnType> columnTypesList = columnType.tupleTypes().values();
int index = 0;
for (ColumnType ct : columnTypesList) {
if (ct.nullable()) {
writeNullableValue(stream, ct, tupleValues.get(index));
} else {
writeValue(stream, ct, tupleValues.get(index));
}
index++;
}
break;
}
}
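
The TUPLE case above pairs values with element types purely by position, so it depends on the type map iterating in declaration order. The sketch below illustrates that pairing with plain JDK classes. It is an assumption-laden stand-in, not the PR's writer: `encodeTuple` and the string type names are hypothetical, the arity check is an addition the PR does not have, and the byte layout (a varint-prefixed UTF-8 string, a one-byte boolean, elements written back to back) is my understanding of ClickHouse's RowBinary encoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TupleWriteOrderSketch {
  // Writes tuple elements one after another, matching values to types by index.
  static byte[] encodeTuple(Map<String, String> elementTypes, List<Object> values) {
    if (elementTypes.size() != values.size()) {
      throw new IllegalArgumentException("tuple arity mismatch");
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int index = 0;
    for (String type : elementTypes.values()) {
      Object value = values.get(index++);
      switch (type) {
        case "String":
          byte[] utf8 = ((String) value).getBytes(StandardCharsets.UTF_8);
          writeVarInt(out, utf8.length); // length prefix
          out.write(utf8, 0, utf8.length);
          break;
        case "Bool":
          out.write(((Boolean) value) ? 1 : 0);
          break;
        default:
          throw new IllegalArgumentException("unsupported type in sketch: " + type);
      }
    }
    return out.toByteArray();
  }

  // Unsigned LEB128 encoding of a non-negative int.
  static void writeVarInt(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order, like the ImmutableMap built by the parser.
    Map<String, String> types = new LinkedHashMap<>();
    types.put("f0", "String");
    types.put("f1", "Bool");
    byte[] bytes = encodeTuple(types, Arrays.<Object>asList("tuple", true));
    System.out.println(Arrays.toString(bytes)); // [5, 116, 117, 112, 108, 101, 1]
  }
}
```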

@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -111,6 +112,14 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) {
return Schema.FieldType.STRING;
case BOOL:
return Schema.FieldType.BOOLEAN;
case TUPLE:
List<Schema.Field> fields =
columnType.tupleTypes().entrySet().stream()
.map(x -> Schema.Field.of(x.getKey(), Schema.FieldType.DATETIME))

Hi @mzitnik,
Shouldn't each key inside a tuple be mapped to its respective data type? Is there a reason for using DATETIME?

@wattache (Feb 13, 2024)

Hi,

This quick adaptation seems to work:

case TUPLE:
  List<TableSchema.Column> columns =
      columnType.tupleTypes().entrySet().stream()
          .map(x -> TableSchema.Column.of(x.getKey(), x.getValue()))
          .collect(Collectors.toList());
  TableSchema tupleSchema = TableSchema.of(columns.toArray(new TableSchema.Column[0]));
  Schema tupleAsRowSchema = getEquivalentSchema(tupleSchema);
  return Schema.FieldType.row(tupleAsRowSchema);

Hi,

Also, it seems that Array(Tuple(*)) is not supported when calling ClickHouseIO.getTableSchema.

This seems to fix the issue:
if (type.toLowerCase().trim().startsWith("tuple(")) {
  String content = tuplePreprocessing(type);
  columnType = TableSchema.ColumnType.parse(content);
} else if (type.toLowerCase().trim().startsWith("array(tuple(")) {
  String content = tuplePreprocessing(type);
  columnType = TableSchema.ColumnType.parse(content);
}

Contributor Author @mzitnik (Feb 28, 2024)

@erube and @wattache, thanks for the feedback. I will fork and also add some tests to cover Array(Tuple(*)).

.collect(Collectors.toList());
Schema.Field[] array = fields.toArray(new Schema.Field[fields.size()]);
Schema schema = Schema.of(array);
return Schema.FieldType.row(schema);
}
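
The review question about DATETIME comes down to mapping each tuple element through the same type-conversion function, recursively, instead of assigning one fixed field type. The sketch below models that idea with plain JDK classes only. `Col` is a hypothetical, heavily simplified stand-in for `ColumnType`, and the returned strings stand in for Beam field types; none of this is the Beam API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TupleFieldTypeSketch {
  // Hypothetical stand-in for ColumnType: a type name plus, for tuples,
  // the named element types in declaration order.
  static final class Col {
    final String typeName;
    final Map<String, Col> tupleTypes;

    Col(String typeName, Map<String, Col> tupleTypes) {
      this.typeName = typeName;
      this.tupleTypes = tupleTypes;
    }

    static Col of(String typeName) {
      return new Col(typeName, null);
    }

    static Col tuple(Map<String, Col> elements) {
      return new Col("TUPLE", elements);
    }
  }

  // Each tuple element is converted by a recursive call, so nested tuples
  // and differing element types are both preserved.
  static String equivalentFieldType(Col c) {
    switch (c.typeName) {
      case "STRING":
        return "STRING";
      case "BOOL":
        return "BOOLEAN";
      case "INT64":
        return "INT64";
      case "TUPLE":
        return c.tupleTypes.entrySet().stream()
            .map(e -> e.getKey() + " " + equivalentFieldType(e.getValue()))
            .collect(Collectors.joining(", ", "ROW<", ">"));
      default:
        throw new IllegalArgumentException("unsupported in sketch: " + c.typeName);
    }
  }

  public static void main(String[] args) {
    Map<String, Col> elements = new LinkedHashMap<>();
    elements.put("f0", Col.of("STRING"));
    elements.put("f1", Col.of("BOOL"));
    System.out.println(equivalentFieldType(Col.tuple(elements))); // ROW<f0 STRING, f1 BOOLEAN>
  }
}
```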

// not possible, errorprone checks for exhaustive switch
@@ -168,7 +177,9 @@ public enum TypeName {
// Composite type
ARRAY,
// Primitive type
BOOL
BOOL,
// Composite type
TUPLE
}

/**
@@ -208,6 +219,7 @@ public abstract static class ColumnType implements Serializable {
public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32);
public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64);
public static final ColumnType BOOL = ColumnType.of(TypeName.BOOL);
public static final ColumnType TUPLE = ColumnType.of(TypeName.TUPLE);

// ClickHouse doesn't allow nested nullables, so boolean flag is enough
public abstract boolean nullable();
@@ -220,6 +232,8 @@ public abstract static class ColumnType implements Serializable {

public abstract @Nullable ColumnType arrayElementType();

public abstract @Nullable Map<String, ColumnType> tupleTypes();

public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
@@ -265,6 +279,14 @@ public static ColumnType array(ColumnType arrayElementType) {
.build();
}

public static ColumnType tuple(Map<String, ColumnType> elements) {
return ColumnType.builder()
.typeName(TypeName.TUPLE)
.nullable(false)
.tupleTypes(elements)
.build();
}

/**
* Parse string with ClickHouse type to {@link ColumnType}.
*
@@ -339,6 +361,8 @@ abstract static class Builder {

public abstract Builder fixedStringSize(Integer size);

public abstract Builder tupleTypes(Map<String, ColumnType> tupleElements);

public abstract ColumnType build();
}
}
47 changes: 47 additions & 0 deletions sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
@@ -17,6 +17,10 @@
*/
options {
IGNORE_CASE=true;
DEBUG_PARSER = false;
DEBUG_LOOKAHEAD = false;
DEBUG_TOKEN_MANAGER = false;
STATIC = false;
}

PARSER_BEGIN(ColumnTypeParser)
@@ -99,6 +103,7 @@ TOKEN :
| < EQ : "=" >
| < BOOL : "BOOL" >
| < LOWCARDINALITY : "LOWCARDINALITY" >
| < TUPLE : "TUPLE" >
}

public ColumnType columnType() :
@@ -113,6 +118,7 @@ public ColumnType columnType() :
| ct = array()
| ct = nullable()
| ct = lowcardenality()
| ct = tuple()
)
{
return ct;
@@ -263,6 +269,33 @@ private Map<String, Integer> enumElements() :
}
}

private Map.Entry<String, ColumnType> tupleElement() :
{
String key;
ColumnType value;
Token token;
}
{
( (key = string() ) ( value = columnType() ) ) {
return Maps.immutableEntry(key, value);
}
}

private Map<String, ColumnType> tupleElements() :
{
Map.Entry<String, ColumnType> el;
List<Map.Entry<String, ColumnType>> entries = Lists.newArrayList();
}
{
(
( el = tupleElement() { entries.add(el); } )
( <COMMA> ( el = tupleElement() { entries.add(el); } ) )*
)
{
return ImmutableMap.copyOf(entries);
}
}

private ColumnType enum_() :
{
Map<String, Integer> elements;
@@ -289,4 +322,18 @@ private ColumnType lowcardenality() :
(
(<LOWCARDINALITY> <LPAREN> (ct = primitive()) <RPAREN>) { return ct; }
)
}

private ColumnType tuple() :
{
Map<String, ColumnType> elements;
}
{
(
(<TUPLE> <LPAREN> ( elements = tupleElements() ) <RPAREN>)
{
return ColumnType.tuple(elements);
}
)

}
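
For readers less familiar with JavaCC, the three productions added above (`tupleElement`, `tupleElements`, `tuple`) can be read as a small recursive-descent parser. The following is a hand-written sketch of the same shape, not generated code and not the PR's parser: the class `TupleTypeParserSketch` is hypothetical, it returns nested maps instead of `ColumnType`, and it only understands plain type names and nested tuples (no `Nullable(...)`, arrays, or enums).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TupleTypeParserSketch {
  private final String src;
  private int pos = 0;

  TupleTypeParserSketch(String src) {
    this.src = src;
  }

  // columnType := "Tuple" "(" tupleElements ")" | typeName
  Object columnType() {
    skipSpaces();
    // Case-insensitive keyword match, mirroring IGNORE_CASE=true in the grammar.
    if (src.regionMatches(true, pos, "Tuple(", 0, 6)) {
      pos += 6;
      Map<String, Object> elements = tupleElements();
      expect(')');
      return elements;
    }
    return typeName();
  }

  // tupleElements := tupleElement ( "," tupleElement )*
  private Map<String, Object> tupleElements() {
    Map<String, Object> entries = new LinkedHashMap<>();
    do {
      skipSpaces();
      String key = quotedString();  // tupleElement := string columnType
      Object value = columnType();
      entries.put(key, value);
      skipSpaces();
    } while (consumeIf(','));
    return entries;
  }

  private String quotedString() {
    expect('\'');
    int start = pos;
    while (pos < src.length() && src.charAt(pos) != '\'') {
      pos++;
    }
    String s = src.substring(start, pos);
    expect('\'');
    return s;
  }

  private String typeName() {
    skipSpaces();
    int start = pos;
    while (pos < src.length() && Character.isLetterOrDigit(src.charAt(pos))) {
      pos++;
    }
    if (start == pos) {
      throw new IllegalArgumentException("type name expected at " + pos);
    }
    return src.substring(start, pos);
  }

  private void skipSpaces() {
    while (pos < src.length() && src.charAt(pos) == ' ') {
      pos++;
    }
  }

  private boolean consumeIf(char c) {
    if (pos < src.length() && src.charAt(pos) == c) {
      pos++;
      return true;
    }
    return false;
  }

  private void expect(char c) {
    skipSpaces();
    if (!consumeIf(c)) {
      throw new IllegalArgumentException("expected '" + c + "' at " + pos);
    }
  }

  static Object parse(String s) {
    TupleTypeParserSketch p = new TupleTypeParserSketch(s);
    Object result = p.columnType();
    p.skipSpaces();
    if (p.pos != s.length()) {
      throw new IllegalArgumentException("trailing input at " + p.pos);
    }
    return result;
  }

  public static void main(String[] args) {
    // Input is in the quoted form that tuplePreprocessing produces.
    System.out.println(parse("Tuple('a' String,'b' Tuple('c' Int64,'d' Int64))"));
    // prints {a=String, b={c=Int64, d=Int64}}
  }
}
```

The quoted element names are what make the key parseable as a string token, which is why the schema lookup quotes names before calling the parser.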
@@ -138,6 +138,84 @@ public void testArrayOfArrayOfInt64() throws Exception {
assertEquals(15L, sum0);
}

@Test
public void testTupleType() throws Exception {
Schema tupleSchema =
Schema.of(
Schema.Field.of("f0", FieldType.STRING), Schema.Field.of("f1", FieldType.BOOLEAN));
Schema schema = Schema.of(Schema.Field.of("t0", FieldType.row(tupleSchema)));
Row row1Tuple = Row.withSchema(tupleSchema).addValue("tuple").addValue(true).build();

Row row1 = Row.withSchema(schema).addValue(row1Tuple).build();

executeSql(
"CREATE TABLE test_named_tuples (" + "t0 Tuple(`f0` String, `f1` Bool)" + ") ENGINE=Log");

pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_tuples"));

pipeline.run().waitUntilFinish();

try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) {
rs.next();
assertEquals("('tuple',true)", rs.getString("t0"));
}

try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) {
rs.next();
assertEquals("tuple", rs.getString("f0"));
assertEquals("true", rs.getString("f1"));
}
}

@Test
public void testComplexTupleType() throws Exception {
Schema sizeSchema =
Schema.of(
Schema.Field.of("width", FieldType.INT64.withNullable(true)),
Schema.Field.of("height", FieldType.INT64.withNullable(true)));

Schema browserSchema =
Schema.of(
Schema.Field.of("name", FieldType.STRING.withNullable(true)),
Schema.Field.of("size", FieldType.row(sizeSchema)),
Schema.Field.of("version", FieldType.STRING.withNullable(true)));

Schema propSchema =
Schema.of(
Schema.Field.of("browser", FieldType.row(browserSchema)),
Schema.Field.of("deviceCategory", FieldType.STRING.withNullable(true)));

Schema schema = Schema.of(Schema.Field.of("prop", FieldType.row(propSchema)));

Row sizeRow = Row.withSchema(sizeSchema).addValue(10L).addValue(20L).build();
Row browserRow =
Row.withSchema(browserSchema).addValue("test").addValue(sizeRow).addValue("1.0.0").build();
Row propRow = Row.withSchema(propSchema).addValue(browserRow).addValue("mobile").build();
Row row1 = Row.withSchema(schema).addValue(propRow).build();

executeSql(
"CREATE TABLE test_named_complex_tuples ("
+ "`prop` Tuple(`browser` Tuple(`name` Nullable(String),`size` Tuple(`width` Nullable(Int64), `height` Nullable(Int64)),`version` Nullable(String)),`deviceCategory` Nullable(String))"
+ ") ENGINE=Log");

pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_complex_tuples"));

pipeline.run().waitUntilFinish();

try (ResultSet rs = executeQuery("SELECT * FROM test_named_complex_tuples")) {
rs.next();
assertEquals("(('test',(10,20),'1.0.0'),'mobile')", rs.getString("prop"));
}

try (ResultSet rs =
executeQuery(
"SELECT prop.browser.name as name, prop.browser.size as size FROM test_named_complex_tuples")) {
rs.next();
assertEquals("test", rs.getString("name"));
assertEquals("(10,20)", rs.getString("size"));
}
}

@Test
public void testPrimitiveTypes() throws Exception {
Schema schema =