Skip to content

Commit

Permalink
support customizing the crossengine namespace (#632)
Browse files Browse the repository at this point in the history
  • Loading branch information
XN137 committed Sep 18, 2024
1 parent faf6d55 commit a16a7ca
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class ITCrossEngine {

private static final String NAMESPACE = System.getProperty("nesqueit.namespace", "db");

private static String sparkTableIdentifier(String tableName) {
return String.format("nessie.%s.%s", NAMESPACE, tableName);
}

@Order(20)
@Test
public void createNamespace(@NessieAPI NessieApiV1 nessie, @NessieDefaultBranch String branch)
throws Exception {
try {
nessie.createNamespace().namespace("db").refName(branch).create();
nessie.createNamespace().namespace(NAMESPACE).refName(branch).create();
} catch (NessieNamespaceAlreadyExistsException ignore) {
// ignore
}
Expand All @@ -64,7 +70,7 @@ public void createNamespace(@NessieAPI NessieApiV1 nessie, @NessieDefaultBranch
@Order(100)
@Test
public void createTables(@Spark SparkSession spark) {
spark.sql("CREATE TABLE nessie.db.from_spark (id int, val string)");
spark.sql(format("CREATE TABLE %s (id int, val string)", sparkTableIdentifier("from_spark")));
}

@Order(101)
Expand All @@ -87,7 +93,7 @@ private static void tableAddRow(String table, Object... values) {
@ParameterizedTest
@MethodSource("tables")
public void insertIntoSpark(String table, @Spark SparkSession spark) {
spark.sql(format("INSERT INTO nessie.db.%s select 100, \"from-spark\"", table));
spark.sql(format("INSERT INTO %s select 100, \"from-spark\"", sparkTableIdentifier(table)));
tableAddRow(table, 100, "from-spark");
}

Expand All @@ -105,7 +111,10 @@ public void insertIntoFlink(String table, @Flink FlinkHelper flink) {
@MethodSource("tables")
public void selectFromSpark(String table, @Spark SparkSession spark) {
assertThat(
spark.sql(format("SELECT id, val FROM nessie.db.%s", table)).collectAsList().stream()
spark
.sql(format("SELECT id, val FROM %s", sparkTableIdentifier(table)))
.collectAsList()
.stream()
.map(r -> asList(r.get(0), r.get(1))))
.containsExactlyInAnyOrderElementsOf(tableRows.get(table));
}
Expand All @@ -123,7 +132,7 @@ public void selectFromFlink(String table, @Flink FlinkHelper flink) {
@ParameterizedTest
@MethodSource("tables")
public void insertIntoSpark2(String table, @Spark SparkSession spark) {
spark.sql(format("INSERT INTO nessie.db.%s select 101, \"from-spark\"", table));
spark.sql(format("INSERT INTO %s select 101, \"from-spark\"", sparkTableIdentifier(table)));
tableAddRow(table, 101, "from-spark");
}

Expand All @@ -141,7 +150,10 @@ public void insertIntoFlink2(String table, @Flink FlinkHelper flink) {
@MethodSource("tables")
public void selectFromSpark2(String table, @Spark SparkSession spark) {
assertThat(
spark.sql(format("SELECT id, val FROM nessie.db.%s", table)).collectAsList().stream()
spark
.sql(format("SELECT id, val FROM %s", sparkTableIdentifier(table)))
.collectAsList()
.stream()
.map(r -> asList(r.get(0), r.get(1))))
.containsExactlyInAnyOrderElementsOf(tableRows.get(table));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
})
public class ITCrossEngineDremio {

private static final String NAMESPACE = "db";
private static final String NAMESPACE = System.getProperty("nesqueit.namespace", "db");
private static final String DREMIO_TABLE = "from_dremio";
private static final String SPARK_TABLE = "from_spark";
private static final String FLINK_TABLE = "from_flink";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
})
public class ITCrossEngineDremioPermissions {

private static final String NAMESPACE = "db";
private static final String NAMESPACE = System.getProperty("nesqueit.namespace", "db");
private static final String READ_ONLY_TABLE = "dremio_readonly"; // pre-created on the outside
private static final String EXPECTED_PERMISSION_ERROR =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Object resolveParameter(
private static class FlinkPerRun implements CloseableResource, FlinkHelper {
private final FlinkHolder flinkHolder;
private final String catalogName;
private final String databaseName = "db";
private final String databaseName = System.getProperty("nesqueit.namespace", "db");

static FlinkPerRun get(ExtensionContext extensionContext, Flink flinkConfig) {
return extensionContext
Expand Down

0 comments on commit a16a7ca

Please sign in to comment.