[BEAM-2446] restrict scope of BeamSqlEnv in dsl query #3372

Closed · wants to merge 3 commits
75 changes: 46 additions & 29 deletions dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -17,9 +17,6 @@
 */
package org.apache.beam.dsls.sql;

-import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
-import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable;
-
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -47,17 +44,15 @@
Pipeline p = Pipeline.create(options);

//create table from TextIO;
-TableSchema tableASchema = ...;
PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
-    .apply(BeamSql.fromTextRow(tableASchema));
-TableSchema tableBSchema = ...;
+    .apply(...);
PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
-    .apply(BeamSql.fromTextRow(tableBSchema));
+    .apply(...);

//run a simple query, and register the output as a table in BeamSql;
-String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
-PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1))
-    .withUdf("MY_FUNC", myFunc);
+String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
+    .withUdf("MY_FUNC", myFunc));

//run a JOIN with one table from TextIO, and one table from another query
PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
@@ -107,35 +102,47 @@ public static PTransform<PCollectionTuple, PCollection<BeamSqlRow>> query(String
   */
  private static class QueryTransform extends
      PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+    private BeamSqlEnv sqlEnv;
    private String sqlQuery;

    public QueryTransform(String sqlQuery) {
      this.sqlQuery = sqlQuery;
+      sqlEnv = new BeamSqlEnv();
    }

+    public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) {
+      this.sqlQuery = sqlQuery;
+      this.sqlEnv = sqlEnv;
+    }
+
    @Override
    public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
-      //register tables
-      for (TupleTag<?> sourceTag : input.getAll().keySet()) {
-        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
-        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
-
-        registerTable(sourceTag.getId(),
-            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
-      }
+      registerTables(input);

      BeamRelNode beamRelNode = null;
      try {
-        beamRelNode = planner.convertToBeamRel(sqlQuery);
+        beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery);
      } catch (ValidationException | RelConversionException | SqlParseException e) {
        throw new IllegalStateException(e);
      }

      try {
-        return beamRelNode.buildBeamPipeline(input);
+        return beamRelNode.buildBeamPipeline(input, sqlEnv);
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
+
+    //register tables, related with input PCollections.
+    private void registerTables(PCollectionTuple input){
+      for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+        sqlEnv.registerTable(sourceTag.getId(),
+            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
+      }
+    }
  }

  /**
@@ -144,34 +151,44 @@ public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
   */
  private static class SimpleQueryTransform
      extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+    private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
+    BeamSqlEnv sqlEnv = new BeamSqlEnv();
    private String sqlQuery;

    public SimpleQueryTransform(String sqlQuery) {
      this.sqlQuery = sqlQuery;
+      validateQuery();
    }

-    public SimpleQueryTransform withUdf(String udfName){
-      throw new UnsupportedOperationException("Pending for UDF support");
-    }
+    // public SimpleQueryTransform withUdf(String udfName){
+    //   throw new UnsupportedOperationException("Pending for UDF support");
+    // }

-    @Override
-    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+    private void validateQuery() {
      SqlNode sqlNode;
      try {
-        sqlNode = planner.parseQuery(sqlQuery);
-        planner.getPlanner().close();
+        sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
+        sqlEnv.planner.getPlanner().close();
      } catch (SqlParseException e) {
        throw new IllegalStateException(e);
      }

      if (sqlNode instanceof SqlSelect) {
        SqlSelect select = (SqlSelect) sqlNode;
        String tableName = select.getFrom().toString();
-        return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
-            .apply(BeamSql.query(sqlQuery));
+        if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
+          throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
+        }
      } else {
        throw new UnsupportedOperationException(
            "Sql operation: " + sqlNode.toString() + " is not supported!");
      }
    }
+
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+      return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+          .apply(new QueryTransform(sqlQuery, sqlEnv));
+    }
  }
}
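
With this change each transform owns its own `BeamSqlEnv`, and the single-input form is pinned to the fixed table name `PCOLLECTION`. Below is a minimal usage sketch under that contract; the input source is a hypothetical helper, not part of this diff, and UDF registration is still pending per the commented-out `withUdf` above.

```java
import org.apache.beam.dsls.sql.BeamSql;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class SimpleQuerySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Assumed: an upstream transform producing rows encoded with BeamSqlRowCoder.
    PCollection<BeamSqlRow> input = buildInputTable(p); // hypothetical helper

    // The FROM clause must name PCOLLECTION; validateQuery() throws
    // IllegalStateException for any other table name, and non-SELECT
    // statements fail with UnsupportedOperationException.
    PCollection<BeamSqlRow> result =
        input.apply(BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1 = 1"));

    p.run().waitUntilFinish();
  }

  // Placeholder: supply whatever IO + parsing produces the input table.
  private static PCollection<BeamSqlRow> buildInputTable(Pipeline p) {
    throw new UnsupportedOperationException("wire up a real source here");
  }
}
```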
18 changes: 8 additions & 10 deletions dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -17,8 +17,6 @@
 */
package org.apache.beam.dsls.sql;

-import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
-
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.Pipeline;
@@ -33,35 +31,35 @@
 */
@Experimental
public class BeamSqlCli {
-
  /**
   * Returns a human readable representation of the query execution plan.
   */
-  public static String explainQuery(String sqlString) throws Exception {
-    BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
+  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
    String beamPlan = RelOptUtil.toString(exeTree);
    return beamPlan;
  }

  /**
   * compile SQL, and return a {@link Pipeline}.
   */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement) throws Exception{
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+      throws Exception{
    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
        .as(PipelineOptions.class); // FlinkPipelineOptions.class
    options.setJobName("BeamPlanCreator");
    Pipeline pipeline = Pipeline.create(options);

-    return compilePipeline(sqlStatement, pipeline);
+    return compilePipeline(sqlStatement, pipeline, sqlEnv);
  }

  /**
   * compile SQL, and return a {@link Pipeline}.
   */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
-      throws Exception{
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline
+      , BeamSqlEnv sqlEnv) throws Exception{
    PCollection<BeamSqlRow> resultStream =
-        planner.compileBeamPipeline(sqlStatement, basePipeline);
+        sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
    return resultStream;
  }
}
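
Both CLI helpers now take the `BeamSqlEnv` explicitly instead of reading the old static planner field. A sketch of the updated call pattern; `ordersTable` is a hypothetical `BaseBeamTable` implementation and `ORDER_DETAILS` an illustrative table name:

```java
// Sketch only: ordersTable is assumed to be a BaseBeamTable implementation.
BeamSqlEnv env = new BeamSqlEnv();
env.registerTable("ORDER_DETAILS", ordersTable);

// Explains against this env's registry; other envs are unaffected.
String plan = BeamSqlCli.explainQuery("select * from ORDER_DETAILS", env);
System.out.println(plan);

// Compiles a pipeline bound to the same env.
PCollection<BeamSqlRow> rows =
    BeamSqlCli.compilePipeline("select * from ORDER_DETAILS", env);
```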
12 changes: 6 additions & 6 deletions dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -42,34 +42,34 @@
 * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
 */
public class BeamSqlEnv {
-  static SchemaPlus schema;
-  static BeamQueryPlanner planner;
+  SchemaPlus schema;
+  BeamQueryPlanner planner;

-  static {
+  public BeamSqlEnv() {
    schema = Frameworks.createRootSchema(true);
    planner = new BeamQueryPlanner(schema);
  }

  /**
   * Register a UDF function which can be used in SQL expression.
   */
-  public static void registerUdf(String functionName, Class<?> clazz, String methodName) {
+  public void registerUdf(String functionName, Class<?> clazz, String methodName) {
    schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
  }

  /**
   * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
   *
   */
-  public static void registerTable(String tableName, BaseBeamTable table) {
+  public void registerTable(String tableName, BaseBeamTable table) {
    schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
    planner.getSourceTables().put(tableName, table);
  }

  /**
   * Find {@link BaseBeamTable} by table name.
   */
-  public static BaseBeamTable findTable(String tableName){
+  public BaseBeamTable findTable(String tableName){
    return planner.getSourceTables().get(tableName);
  }
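
Moving `schema` and `planner` from static fields to instance state is the core of this PR: two queries can now register tables under the same name without clobbering each other. A minimal sketch, assuming `tableA` and `tableB` are `BaseBeamTable` implementations defined elsewhere:

```java
BeamSqlEnv envA = new BeamSqlEnv();
envA.registerTable("ORDERS", tableA); // visible only through envA

BeamSqlEnv envB = new BeamSqlEnv();
envB.registerTable("ORDERS", tableB); // same name, separate registry

// Each env resolves its own table; with the old static fields the second
// registration would have replaced the first for every caller.
assert envA.findTable("ORDERS") == tableA;
assert envB.findTable("ORDERS") == tableB;
```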
@@ -63,13 +63,13 @@ public static void main(String[] args) throws Exception {

//Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
PCollection<BeamSqlRow> outputStream = inputTable.apply(
-    BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1"));
+    BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));

//log out the output record;
outputStream.apply("log_result",
    MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
      public Void apply(BeamSqlRow input) {
-        System.out.println("TABLE_A: " + input);
+        System.out.println("PCOLLECTION: " + input);
        return null;
      }
    }));
@@ -57,7 +57,6 @@ public abstract BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive le

/**
 * The method to check whether operands are numeric or not.
- * @param opType
 */
public boolean isOperandNumeric(SqlTypeName opType) {
  return SqlTypeName.NUMERIC_TYPES.contains(opType);
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
@@ -106,12 +107,12 @@ public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
   * which is linked with the given {@code pipeline}. The final output stream is returned as
   * {@code PCollection} so more operations can be applied.
   */
-  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
-      throws Exception {
+  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
+      , BeamSqlEnv sqlEnv) throws Exception {
    BeamRelNode relNode = convertToBeamRel(sqlStatement);

    // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
-    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline));
+    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
  }

  /**
@@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -72,13 +73,13 @@ public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
  }

  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
    RelNode input = getInput();
    String stageName = BeamSqlRelUtils.getStageName(this);

    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
    if (windowFieldIdx != -1) {
      upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
          .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
@@ -17,6 +17,7 @@
 */
package org.apache.beam.dsls.sql.rel;

+import org.apache.beam.dsls.sql.BeamSqlEnv;
import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -49,14 +50,13 @@ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
  }

  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
    RelNode input = getInput();
    String stageName = BeamSqlRelUtils.getStageName(this);

    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);

    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);

@@ -56,18 +56,17 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
   * which is the persisted PCollection.
   */
  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
    RelNode input = getInput();
    String stageName = BeamSqlRelUtils.getStageName(this);

    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);

    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());

-    BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName);
+    BaseBeamTable targetTable = sqlEnv.findTable(sourceName);

    PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());

@@ -40,9 +40,8 @@ public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable
  }

  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());

    String stageName = BeamSqlRelUtils.getStageName(this);
@@ -55,7 +54,7 @@ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollecti
      return sourceStream;
    } else {
      //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
-      BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName);
+      BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
      return sourceTable.buildIOReader(inputPCollections.getPipeline());
    }
  }
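
The same mechanical change repeats across BeamAggregationRel, BeamFilterRel, BeamIOSinkRel, and BeamIOSourceRel: buildBeamPipeline gains a `BeamSqlEnv` parameter and forwards it when recursing into its input, so leaf nodes resolve tables against that env rather than static state. Reduced to a sketch (node-specific transforms elided; this is the shared pattern, not any one node's exact code):

```java
// Common threading pattern shared by the rel nodes in this PR (sketch).
@Override
public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
    BeamSqlEnv sqlEnv) throws Exception {
  // Recurse with the same env so the whole relational tree shares one
  // table registry and one planner.
  PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(getInput())
      .buildBeamPipeline(inputPCollections, sqlEnv);

  // ...apply this node's own transforms (filter, aggregate, sink, ...) ...
  return upstream;
}
```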