Skip to content

Commit

Permalink
remove select * from PCOLLECTION to a separated PR.
Browse files Browse the repository at this point in the history
  • Loading branch information
mingmxu committed Jun 22, 2017
1 parent f7e4a51 commit 8cb47dd
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 18 deletions.
18 changes: 5 additions & 13 deletions dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
.apply(...);
//run a simple query, and register the output as a table in BeamSql;
String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
.withUdf("MY_FUNC", myFunc));
Expand Down Expand Up @@ -151,20 +151,19 @@ private void registerTables(PCollectionTuple input){
*/
private static class SimpleQueryTransform
extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
BeamSqlEnv sqlEnv = new BeamSqlEnv();
private String sqlQuery;

public SimpleQueryTransform(String sqlQuery) {
this.sqlQuery = sqlQuery;
validateQuery();
}

// public SimpleQueryTransform withUdf(String udfName){
// throw new UnsupportedOperationException("Pending for UDF support");
// }

private void validateQuery() {
@Override
public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
SqlNode sqlNode;
try {
sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
Expand All @@ -176,19 +175,12 @@ private void validateQuery() {
if (sqlNode instanceof SqlSelect) {
SqlSelect select = (SqlSelect) sqlNode;
String tableName = select.getFrom().toString();
if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
}
return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
.apply(new QueryTransform(sqlQuery, sqlEnv));
} else {
throw new UnsupportedOperationException(
"Sql operation: " + sqlNode.toString() + " is not supported!");
}
}

@Override
public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
.apply(new QueryTransform(sqlQuery, sqlEnv));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
*/
@Test
public void testAggregationWithoutWindow() throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2";

PCollection<BeamSqlRow> result =
inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testAggregationFunctions() throws Exception{
*/
@Test
public void testDistinct() throws Exception {
String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
String sql = "SELECT distinct f_int, f_long FROM TABLE_A ";

PCollection<BeamSqlRow> result =
inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
Expand Down Expand Up @@ -190,7 +190,7 @@ public void testTumbleWindow() throws Exception {
*/
@Test
public void testHopWindow() throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION "
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+ "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
PCollection<BeamSqlRow> result =
inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
*/
@Test
public void testSingleFilter() throws Exception {
String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
String sql = "SELECT * FROM TABLE_A WHERE f_int = 1";

PCollection<BeamSqlRow> result =
inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
*/
@Test
public void testSelectAll() throws Exception {
String sql = "SELECT * FROM PCOLLECTION";
String sql = "SELECT * FROM TABLE_A";

PCollection<BeamSqlRow> result =
inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));
Expand Down

0 comments on commit 8cb47dd

Please sign in to comment.