diff --git a/pom.xml b/pom.xml index 8325bcf9e93bc..bfbd0f63d963e 100644 --- a/pom.xml +++ b/pom.xml @@ -192,6 +192,7 @@ redis-hbo-provider presto-singlestore presto-hana + presto-base-arrow-flight diff --git a/presto-base-arrow-flight/pom.xml b/presto-base-arrow-flight/pom.xml new file mode 100644 index 0000000000000..daa797a2d2fbb --- /dev/null +++ b/presto-base-arrow-flight/pom.xml @@ -0,0 +1,409 @@ + + + 4.0.0 + + com.facebook.presto + presto-root + 0.289-SNAPSHOT + + presto-base-arrow-flight + presto-base-arrow-flight + arrow-flight Connector Plugin for Presto + + + ${project.parent.basedir} + 1.63.0 + 4.10.0 + 17.0.0 + 4.1.100.Final + + + + + + com.facebook.airlift + bootstrap + + + ch.qos.logback + logback-core + + + + + + com.facebook.airlift + log + + + + com.google.guava + guava + + + org.checkerframework + checker-qual + + + com.google.errorprone + error_prone_annotations + + + com.google.j2objc + j2objc-annotations + + + + + + javax.inject + javax.inject + + + + com.facebook.presto + presto-spi + provided + + + + io.airlift + slice + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.facebook.presto + presto-common + provided + + + + io.airlift + units + provided + + + + com.google.code.findbugs + jsr305 + true + + + + org.apache.arrow + arrow-memory-core + ${arrow.version} + + + + com.google.inject + guice + + + + com.facebook.airlift + configuration + + + + io.netty + netty-codec-http2 + ${netty.version} + + + + io.netty + netty-handler-proxy + ${netty.version} + + + io.netty + netty-codec-http + + + + + + io.netty + netty-tcnative-boringssl-static + 2.0.65.Final + + + + org.apache.arrow + flight-core + ${arrow.version} + + + io.netty + netty-codec-http2 + + + + io.netty + netty-handler-proxy + + + + io.netty + netty-common + + + + io.netty + netty-buffer + + + + io.netty + netty-handler + + + + io.netty + netty-codec + + + + io.netty + netty-transport + + + + com.google.j2objc + j2objc-annotations + + + + io.netty + netty-transport-native-unix-common + + + + + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + + + + org.testng + testng + test + + + + com.facebook.airlift + json + test + + + + org.mockito + mockito-core + 5.11.0 + + + org.objenesis + objenesis + + + test + + + + com.facebook.presto + presto-testng-services + test + + + + com.facebook.airlift + testing + test + ${dep.airlift.version} + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + + + + org.codehaus.mojo + animal-sniffer-annotations + 1.23 + + + + com.google.j2objc + j2objc-annotations + 1.3 + + + + com.google.errorprone + error_prone_annotations + 2.14.0 + + + + com.google.protobuf + protobuf-java + 3.25.1 + + + + io.grpc + grpc-protobuf + 1.63.0 + + + + + io.grpc + grpc-stub + 1.63.0 + + + + io.grpc + grpc-core + 1.63.0 + + + + io.grpc + grpc-api + 1.63.0 + + + + io.grpc + grpc-context + 1.63.0 + + + + commons-codec + commons-codec + 1.17.0 + + + + io.netty + netty-transport-native-unix-common + 4.1.100.Final + + + + io.netty + netty-common + 4.1.100.Final + + + + io.netty + netty-buffer + 4.1.100.Final + + + + + io.netty + netty-handler + 4.1.100.Final + + + + io.netty + netty-transport + 4.1.100.Final + + + + io.netty + netty-codec + 4.1.100.Final + + + + org.slf4j + slf4j-api + 2.0.13 + + + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + + + com.google.errorprone:error_prone_annotations + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + io.netty:netty-codec-http2 + io.netty:netty-handler-proxy + io.netty:netty-tcnative-boringssl-static + + + + + org.basepom.maven + duplicate-finder-maven-plugin + 1.2.1 + + + module-info + META-INF.versions.9.module-info + + + arrow-git.properties + + + + + + check + + + + + + + diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java new file mode 100644 index 0000000000000..1336676b1582f --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import java.util.Optional; + +public abstract class ArrowAbstractFlightRequest + implements ArrowFlightRequest +{ + private final String schema; + private final String table; + private final Optional query; + + public ArrowAbstractFlightRequest(String schema) + { + this.schema = schema; + this.query = Optional.empty(); + this.table = null; + } + + public ArrowAbstractFlightRequest(String schema, String table) + { + this.schema = schema; + this.table = table; + query = Optional.empty(); + } + + public ArrowAbstractFlightRequest(String schema, String table, Optional query) + { + this.schema = schema; + this.table = table; + this.query = query; + } + + public String getSchema() + { + return schema; + } + + public String getTable() + { + return table; + } + + public Optional getQuery() + { + return query; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java new file mode 100644 index 0000000000000..2769d057a13b3 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.DateType; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.RealType; +import com.facebook.presto.common.type.SmallintType; +import com.facebook.presto.common.type.TimeType; +import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.TinyintType; +import com.facebook.presto.common.type.VarbinaryType; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.NotFoundException; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR; +import static java.util.Objects.requireNonNull; + +public abstract class ArrowAbstractMetadata + implements ConnectorMetadata +{ + private static final Logger logger = Logger.get(ArrowAbstractMetadata.class); + private final ArrowFlightConfig config; + private final ArrowFlightClientHandler clientHandler; + + public ArrowAbstractMetadata(ArrowFlightConfig config, ArrowFlightClientHandler clientHandler) + { + this.config = config; + this.clientHandler = requireNonNull(clientHandler); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + if (!listSchemaNames(session).contains(tableName.getSchemaName())) { + return null; + } + + if (!listTables(session, Optional.ofNullable(tableName.getSchemaName())).contains(tableName)) { + return null; + } + return new ArrowTableHandle(tableName.getSchemaName(), tableName.getTableName()); + } + + public List getColumnsList(String schema, String table, ConnectorSession connectorSession) + { + try { + String dataSourceSpecificSchemaName = getDataSourceSpecificSchemaName(config, schema); + String dataSourceSpecificTableName = getDataSourceSpecificTableName(config, table); + ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(), Optional.empty(), + dataSourceSpecificSchemaName, dataSourceSpecificTableName); + + FlightInfo flightInfo = clientHandler.getFlightInfo(request, connectorSession); + List fields = flightInfo.getSchema().getFields(); + return fields; + } + catch (Exception e) { + throw new ArrowException(ARROW_FLIGHT_ERROR, "The table columns could not be listed for the table " + table, e); + } + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + Map column = new HashMap<>(); + + String schemaValue = ((ArrowTableHandle) tableHandle).getSchema(); + String tableValue = ((ArrowTableHandle) tableHandle).getTable(); + String dataSourceSpecificSchemaValue = getDataSourceSpecificSchemaName(config, schemaValue); + String dataSourceSpecificTableName = getDataSourceSpecificTableName(config, tableValue); + List columnList = getColumnsList(dataSourceSpecificSchemaValue, dataSourceSpecificTableName, session); + + for (Field field : columnList) { + String columnName = field.getName(); + logger.debug("The value of the flight columnName is:- %s", columnName); + switch (field.getType().getTypeID()) { + case Int: + ArrowType.Int intType = (ArrowType.Int) field.getType(); + switch (intType.getBitWidth()) { + case 64: + column.put(columnName, new ArrowColumnHandle(columnName, BigintType.BIGINT)); + break; + case 32: + column.put(columnName, new ArrowColumnHandle(columnName, IntegerType.INTEGER)); + break; + case 16: + column.put(columnName, new ArrowColumnHandle(columnName, SmallintType.SMALLINT)); + break; + case 8: + column.put(columnName, new ArrowColumnHandle(columnName, TinyintType.TINYINT)); + break; + default: + throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth()); + } + break; + case Binary: + case LargeBinary: + case FixedSizeBinary: + column.put(columnName, new ArrowColumnHandle(columnName, VarbinaryType.VARBINARY)); + break; + case Date: + column.put(columnName, new ArrowColumnHandle(columnName, DateType.DATE)); + break; + case Timestamp: + column.put(columnName, new ArrowColumnHandle(columnName, TimestampType.TIMESTAMP)); + break; + case Utf8: + case LargeUtf8: + column.put(columnName, new ArrowColumnHandle(columnName, VarcharType.VARCHAR)); + break; + case FloatingPoint: + ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType(); + switch (floatingPoint.getPrecision()) { + case SINGLE: + column.put(columnName, new ArrowColumnHandle(columnName, RealType.REAL)); + break; + case DOUBLE: + column.put(columnName, new ArrowColumnHandle(columnName, DoubleType.DOUBLE)); + break; + default: + throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision()); + } + break; + case Decimal: + ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType(); + int precision = decimalType.getPrecision(); + int scale = decimalType.getScale(); + column.put(columnName, new ArrowColumnHandle(columnName, DecimalType.createDecimalType(precision, scale))); + break; + case Bool: + column.put(columnName, new ArrowColumnHandle(columnName, BooleanType.BOOLEAN)); + break; + case Time: + column.put(columnName, new ArrowColumnHandle(columnName, TimeType.TIME)); + break; + default: + throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported."); + } + } + return column; + } + + @Override + public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns) + { + ArrowTableHandle tableHandle = (ArrowTableHandle) table; + + List columns = new ArrayList<>(); + if (desiredColumns.isPresent()) { + List arrowColumns = new ArrayList<>(desiredColumns.get()); + columns = (List) (List) arrowColumns; + } + + ConnectorTableLayout layout = new ConnectorTableLayout(new ArrowTableLayoutHandle(tableHandle, columns, constraint.getSummary(), Optional.empty())); + return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); + } + + @Override + public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) + { + return new ConnectorTableLayout(handle); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + List meta = new ArrayList<>(); + List columnList = getColumnsList(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable(), session); + + for (Field field : columnList) { + String columnName = field.getName(); + switch (field.getType().getTypeID()) { + case Int: + ArrowType.Int intType = (ArrowType.Int) field.getType(); + switch (intType.getBitWidth()) { + case 64: + meta.add(new ColumnMetadata(columnName, BigintType.BIGINT)); + break; + case 32: + meta.add(new ColumnMetadata(columnName, IntegerType.INTEGER)); + break; + case 16: + meta.add(new ColumnMetadata(columnName, SmallintType.SMALLINT)); + break; + case 8: + meta.add(new ColumnMetadata(columnName, TinyintType.TINYINT)); + break; + default: + throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth()); + } + break; + case Binary: + case LargeBinary: + case FixedSizeBinary: + meta.add(new ColumnMetadata(columnName, VarbinaryType.VARBINARY)); + break; + case Date: + meta.add(new ColumnMetadata(columnName, DateType.DATE)); + break; + case Timestamp: + meta.add(new ColumnMetadata(columnName, TimestampType.TIMESTAMP)); + break; + case Utf8: + case LargeUtf8: + meta.add(new ColumnMetadata(columnName, VarcharType.VARCHAR)); + break; + case FloatingPoint: + ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType(); + switch (floatingPoint.getPrecision()) { + case SINGLE: + meta.add(new ColumnMetadata(columnName, RealType.REAL)); + break; + case DOUBLE: + meta.add(new ColumnMetadata(columnName, DoubleType.DOUBLE)); + break; + default: + throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision()); + } + break; + case Decimal: + ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType(); + int precision = decimalType.getPrecision(); + int scale = decimalType.getScale(); + meta.add(new ColumnMetadata(columnName, DecimalType.createDecimalType(precision, scale))); + break; + case Time: + meta.add(new ColumnMetadata(columnName, TimeType.TIME)); + break; + case Bool: + meta.add(new ColumnMetadata(columnName, BooleanType.BOOLEAN)); + break; + default: + throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported."); + } + } + return new ConnectorTableMetadata(new SchemaTableName(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable()), meta); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + return ((ArrowColumnHandle) columnHandle).getColumnMetadata(); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + requireNonNull(prefix, "prefix is null"); + ImmutableMap.Builder> columns = ImmutableMap.builder(); + List tables; + if (prefix.getSchemaName() != null && prefix.getTableName() != null) { + tables = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); + } + else { + tables = listTables(session, Optional.of(prefix.getSchemaName())); + } + + for (SchemaTableName tableName : tables) { + try { + ConnectorTableHandle tableHandle = getTableHandle(session, tableName); + columns.put(tableName, getTableMetadata(session, tableHandle).getColumns()); + } + catch (ClassCastException | NotFoundException e) { + throw new ArrowException(ARROW_FLIGHT_ERROR, "The table columns could not be listed for the table " + tableName, e); + } + catch (Exception e) { + throw new ArrowException(ARROW_FLIGHT_ERROR, e.getMessage(), e); + } + } + return columns.build(); + } + + protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query, String schema, String table); + + protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema); + + protected abstract String getDataSourceSpecificSchemaName(ArrowFlightConfig config, String schemaName); + + protected abstract String getDataSourceSpecificTableName(ArrowFlightConfig config, String tableName); +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java new file mode 100644 index 0000000000000..ddd63a6ba5006 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import org.apache.arrow.flight.FlightInfo; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class ArrowAbstractSplitManager + implements ConnectorSplitManager +{ + private static final Logger logger = Logger.get(ArrowAbstractSplitManager.class); + private final ArrowFlightClientHandler clientHandler; + + public ArrowAbstractSplitManager(ArrowFlightClientHandler client) + { + this.clientHandler = client; + } + + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext) + { + ArrowTableLayoutHandle tableLayoutHandle = (ArrowTableLayoutHandle) layout; + ArrowTableHandle tableHandle = tableLayoutHandle.getTableHandle(); + ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(), + tableLayoutHandle); + + FlightInfo flightInfo = clientHandler.getFlightInfo(request, session); + List splits = flightInfo.getEndpoints() + .stream() + .map(info -> new ArrowSplit( + tableHandle.getSchema(), + tableHandle.getTable(), + info.getTicket().getBytes(), + info.getLocations().stream().map(location -> location.getUri().toString()).collect(Collectors.toList()))) + .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)); + logger.info("created %d splits from arrow tickets", splits.size()); + return new FixedSplitSource(splits); + } + + protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle); +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java new file mode 100644 index 0000000000000..e935d994dccb0 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.common.type.Type; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import static java.util.Objects.requireNonNull; + +public class ArrowColumnHandle + implements ColumnHandle +{ + private final String columnName; + private final Type columnType; + + @JsonCreator + public ArrowColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("columnType") Type columnType) + { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.columnType = requireNonNull(columnType, "type is null"); + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + public ColumnMetadata getColumnMetadata() + { + return new ColumnMetadata(columnName, columnType); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java new file mode 100644 index 0000000000000..9523bd210faa5 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.transaction.IsolationLevel; +import com.google.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class ArrowConnector + implements Connector +{ + private final ConnectorMetadata metadata; + private final ConnectorSplitManager splitManager; + private final ConnectorPageSourceProvider pageSourceProvider; + private final ConnectorHandleResolver handleResolver; + + @Inject + public ArrowConnector(ConnectorMetadata metadata, + ConnectorHandleResolver handleResolver, + ConnectorSplitManager splitManager, + ConnectorPageSourceProvider pageSourceProvider) + { + this.metadata = requireNonNull(metadata, "Metadata is null"); + this.handleResolver = requireNonNull(handleResolver, "Metadata is null"); + this.splitManager = requireNonNull(splitManager, "SplitManager is null"); + this.pageSourceProvider = requireNonNull(pageSourceProvider, "PageSinkProvider is null"); + } + + public Optional getHandleResolver() + { + return Optional.of(handleResolver); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + return ArrowTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorPageSourceProvider getPageSourceProvider() + { + return pageSourceProvider; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java new file mode 100644 index 0000000000000..f17317dd42b3a --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.relation.RowExpressionService; +import com.google.inject.ConfigurationException; +import com.google.inject.Injector; +import com.google.inject.Module; + +import java.util.Map; + +import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static java.util.Objects.requireNonNull; + +public class ArrowConnectorFactory + implements ConnectorFactory +{ + private final String name; + private final Module module; + private final ClassLoader classLoader; + + public ArrowConnectorFactory(String name, Module module, ClassLoader classLoader) + { + checkArgument(!isNullOrEmpty(name), "name is null or empty"); + this.name = name; + this.module = requireNonNull(module, "module is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new ArrowHandleResolver(); + } + + @Override + public Connector create(String catalogName, Map requiredConfig, ConnectorContext context) + { + requireNonNull(requiredConfig, "requiredConfig is null"); + + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + Bootstrap app = new Bootstrap( + binder -> { + binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager()); + binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); + binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService()); + binder.bind(NodeManager.class).toInstance(context.getNodeManager()); + }, + new ArrowModule(catalogName), + module); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties(requiredConfig) + .initialize(); + + return injector.getInstance(ArrowConnector.class); + } + catch (ConfigurationException ex) { + throw new ArrowException(ARROW_FLIGHT_ERROR, "The connector instance could not be created.", ex); + } + catch (Exception e) { + throwIfUnchecked(e); + throw new RuntimeException(e); + } + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java new file mode 100644 index 0000000000000..dce08bac4ac24 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class ArrowConnectorId +{ + private final String id; + + public ArrowConnectorId(String id) + { + this.id = requireNonNull(id, "id is null"); + } + + @Override + public String toString() + { + return id; + } + + @Override + public int hashCode() + { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + ArrowConnectorId other = (ArrowConnectorId) obj; + return Objects.equals(this.id, other.id); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java new file mode 100644 index 0000000000000..2e33f736a62c5 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.common.ErrorCode; +import com.facebook.presto.common.ErrorType; +import com.facebook.presto.spi.ErrorCodeSupplier; + +import static com.facebook.presto.common.ErrorType.EXTERNAL; +import static com.facebook.presto.common.ErrorType.INTERNAL_ERROR; + +public enum ArrowErrorCode + implements ErrorCodeSupplier +{ + ARROW_INVALID_TABLE(0, EXTERNAL), + ARROW_INVALID_CREDENTAILS(1, EXTERNAL), + ARROW_FLIGHT_ERROR(2, EXTERNAL), + ARROW_INTERNAL_ERROR(3, INTERNAL_ERROR); + + private final ErrorCode errorCode; + + ArrowErrorCode(int code, ErrorType type) + { + errorCode = new ErrorCode(code + 0x0509_0000, name(), type); + } + + @Override + public ErrorCode toErrorCode() + { + return errorCode; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java new file mode 100644 index 0000000000000..ba2c6edba589c --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ErrorCodeSupplier; +import com.facebook.presto.spi.PrestoException; + +public class ArrowException + extends PrestoException +{ + public ArrowException(ErrorCodeSupplier errorCode, String message) + { + super(errorCode, message); + } + + public ArrowException(ErrorCodeSupplier errorCode, String message, Throwable cause) + { + super(errorCode, message, cause); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java new file mode 100644 index 0000000000000..b5c30bfcaf4a0 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.relation.ConstantExpression; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class ArrowExpression +{ + private final String expression; + private final List boundConstantValues; + + public ArrowExpression(String expression) + { + this(expression, ImmutableList.of()); + } + + @JsonCreator + public ArrowExpression( + @JsonProperty("translatedString") String expression, + @JsonProperty("boundConstantValues") List constantBindValues) + { + this.expression = requireNonNull(expression, "expression is null"); + this.boundConstantValues = requireNonNull(constantBindValues, "boundConstantValues is null"); + } + + @JsonProperty + public String getExpression() + { + return expression; + } + + /** + * Constant expressions are not added to the expression String. Instead they appear as "?" in the query. + * This is because we would potentially lose precision on double values. Hence when we make a PreparedStatement + * out of the SQL string replacing every "?" by it's corresponding actual bindValue. + * + * @return List of constants to replace in the SQL string. + */ + @JsonProperty + public List getBoundConstantValues() + { + return boundConstantValues; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ArrowExpression that = (ArrowExpression) o; + return expression.equals(that.expression) && + boundConstantValues.equals(that.boundConstantValues); + } + + @Override + public int hashCode() + { + return Objects.hash(expression, boundConstantValues); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java new file mode 100644 index 0000000000000..e6a6b2ec919ff --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.memory.RootAllocator; + +import java.io.InputStream; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class ArrowFlightClient +{ + private final FlightClient flightClient; + private final Optional trustedCertificate; + private RootAllocator allocator; + + public ArrowFlightClient(FlightClient flightClient, Optional trustedCertificate, RootAllocator allocator) + { + this.flightClient = requireNonNull(flightClient, "flightClient cannot be null"); + this.trustedCertificate = trustedCertificate; + this.allocator = allocator; + } + + public FlightClient getFlightClient() + { + return flightClient; + } + + public Optional getTrustedCertificate() + { + return trustedCertificate; + } + + public void close() throws Exception + { + flightClient.close(); + if (trustedCertificate.isPresent()) { + trustedCertificate.get().close(); + } + if (allocator != null) { + allocator.close(); + allocator = null; + } + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java new file mode 100644 index 0000000000000..86671fd6a7090 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java @@ -0,0 +1,168 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.ConnectorSession; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.grpc.CredentialCallOption; +import org.apache.arrow.memory.RootAllocator; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR; + +public abstract class ArrowFlightClientHandler +{ + private static final Logger logger = Logger.get(ArrowFlightClientHandler.class); + private static final int TIMER_DURATION_IN_MINUTES = 30; + private final ArrowFlightConfig config; + private AtomicBoolean isClientClosed = new AtomicBoolean(true); + private Optional trustedCertificate = Optional.empty(); + private ScheduledExecutorService scheduledExecutorService; + private ArrowFlightClient arrowFlightClient; + private RootAllocator allocator; + + public ArrowFlightClientHandler(ArrowFlightConfig config) + { + this.config = config; + } + + public ArrowFlightConfig getConfig() + { + return config; + } + + public synchronized ArrowFlightClient getClient(Optional uri) + { + if (isClientClosed.get()) { + logger.info("Reinitialize the client if closed or not initialized"); + initializeClient(uri); + scheduleCloseTask(); + } + else { + resetTimer(); // Reset timer when client is reused + } + return this.arrowFlightClient; + } + + public FlightInfo getFlightInfo(ArrowFlightRequest request, ConnectorSession connectorSession) + { + try { + ArrowFlightClient client = getClient(Optional.empty()); + CredentialCallOption auth = this.getCallOptions(connectorSession); + FlightDescriptor descriptor = FlightDescriptor.command(request.getCommand()); + logger.debug("Fetching flight info"); + FlightInfo flightInfo = client.getFlightClient().getInfo(descriptor, auth); + logger.debug("got flight info"); + return flightInfo; + } + catch (Exception e) { + throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight information could not be obtained from the flight server." + e.getMessage(), e); + } + } + + public synchronized void close() throws Exception + { + if (arrowFlightClient != null) { + arrowFlightClient.close(); + arrowFlightClient = null; + } + if (trustedCertificate.isPresent()) { + trustedCertificate.get().close(); + } + shutdownTimer(); + isClientClosed.set(true); + } + + public void resetTimer() + { + shutdownTimer(); + scheduleCloseTask(); + } + + public void shutdownTimer() + { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + } + + protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession); + + private void initializeClient(Optional uri) + { + if (!isClientClosed.get()) { + return; + } + try { + allocator = new RootAllocator(Long.MAX_VALUE); + Optional trustedCertificate = Optional.empty(); + + Location location; + if (uri.isPresent()) { + location = new Location(uri.get()); + } + else { + if (config.getArrowFlightServerSslEnabled() != null && !config.getArrowFlightServerSslEnabled()) { + location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort()); + } + else { + location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort()); + } + } + + FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location); + if (config.getVerifyServer() != null && !config.getVerifyServer()) { + flightClientBuilder.verifyServer(false); + } + else if (config.getFlightServerSSLCertificate() != null) { + trustedCertificate = Optional.of(new FileInputStream(config.getFlightServerSSLCertificate())); + flightClientBuilder.trustedCertificates(trustedCertificate.get()).useTls(); + } + + FlightClient flightClient = flightClientBuilder.build(); + this.arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); + isClientClosed.set(false); + } + catch (Exception ex) { + throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight client could not be obtained." + ex.getMessage(), ex); + } + } + + private void scheduleCloseTask() + { + scheduledExecutorService = Executors.newScheduledThreadPool(1); + Runnable closeTask = () -> { + try { + close(); + logger.info("in closeTask"); + } + catch (Exception e) { + logger.error(e); + } + scheduledExecutorService.shutdown(); + }; + scheduledExecutorService.schedule(closeTask, TIMER_DURATION_IN_MINUTES, TimeUnit.MINUTES); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java new file mode 100644 index 0000000000000..a30301dcdc361 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.configuration.Config; + +public class ArrowFlightConfig +{ + private String server; + private Boolean verifyServer; + private String flightServerSSLCertificate; + private Boolean arrowFlightServerSslEnabled; + private Integer arrowFlightPort; + public String getFlightServerName() + { + return server; + } + + public Boolean getVerifyServer() + { + return verifyServer; + } + + public Boolean getArrowFlightServerSslEnabled() + { + return arrowFlightServerSslEnabled; + } + + public String getFlightServerSSLCertificate() + { + return flightServerSSLCertificate; + } + + public Integer getArrowFlightPort() + { + return arrowFlightPort; + } + + @Config("arrow-flight.server") + public ArrowFlightConfig setFlightServerName(String server) + { + this.server = server; + return this; + } + + @Config("arrow-flight.server.verify") + public ArrowFlightConfig setVerifyServer(Boolean verifyServer) + { + this.verifyServer = verifyServer; + return this; + } + + @Config("arrow-flight.server.port") + public ArrowFlightConfig setArrowFlightPort(Integer arrowFlightPort) + { + this.arrowFlightPort = arrowFlightPort; + return this; + } + + @Config("arrow-flight.server-ssl-certificate") + public ArrowFlightConfig setFlightServerSSLCertificate(String flightServerSSLCertificate) + { + this.flightServerSSLCertificate = flightServerSSLCertificate; + return this; + } + + @Config("arrow-flight.server-ssl-enabled") + public ArrowFlightConfig setArrowFlightServerSslEnabled(Boolean arrowFlightServerSslEnabled) + { + this.arrowFlightServerSslEnabled = arrowFlightServerSslEnabled; + return this; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java new file mode 100644 index 0000000000000..7e04e0a6066e3 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +public interface ArrowFlightRequest +{ + byte[] getCommand(); +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java new file mode 100644 index 0000000000000..8b231b98a6ee6 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public class ArrowHandleResolver + implements ConnectorHandleResolver +{ + @Override + public Class getTableHandleClass() + { + return ArrowTableHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() + { + return ArrowTableLayoutHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return ArrowColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return ArrowSplit.class; + } + + @Override + public Class getTransactionHandleClass() + { + return ArrowTransactionHandle.class; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java new file mode 100644 index 0000000000000..b20762f8ad497 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static java.util.Objects.requireNonNull; + +public class ArrowModule + implements Module +{ + protected final String connectorId; + + public ArrowModule(String connectorId) + { + this.connectorId = requireNonNull(connectorId, "connector id is null"); + } + + public void configure(Binder binder) + { + configBinder(binder).bindConfig(ArrowFlightConfig.class); + binder.bind(ArrowConnector.class).in(Scopes.SINGLETON); + binder.bind(ArrowConnectorId.class).toInstance(new ArrowConnectorId(connectorId)); + binder.bind(ConnectorHandleResolver.class).to(ArrowHandleResolver.class).in(Scopes.SINGLETON); + binder.bind(ArrowPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(ConnectorPageSourceProvider.class).to(ArrowPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(Connector.class).to(ArrowConnector.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java new file mode 100644 index 0000000000000..954689c91b3ae --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java @@ -0,0 +1,566 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.type.DateType; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.common.type.TimeType; +import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR; + +public class ArrowPageSource + implements ConnectorPageSource +{ + private static Logger logger = Logger.get(ArrowPageSource.class); + private final ArrowSplit split; + private final List columnHandles; + private final ArrowFlightClientHandler clientHandler; + private boolean completed; + private int currentPosition; + private Optional vectorSchemaRoot = Optional.empty(); + private ArrowFlightClient flightClient; + private FlightStream flightStream; + + public ArrowPageSource(ArrowSplit split, List columnHandles, ArrowFlightClientHandler clientHandler, + ConnectorSession connectorSession) + { + this.columnHandles = columnHandles; + this.split = split; + this.clientHandler = clientHandler; + getFlightStream(clientHandler, split.getTicket(), connectorSession); + } + + @Override + public long getCompletedBytes() + { + return 0; + } + + @Override + public long getCompletedPositions() + { + return currentPosition; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public boolean isFinished() + { + return completed; + } + + @Override + public long getSystemMemoryUsage() + { + return 0; + } + + @Override + public Page getNextPage() + { + if (vectorSchemaRoot.isPresent()) { + vectorSchemaRoot.get().close(); + vectorSchemaRoot = Optional.empty(); + } + + if (flightStream.next()) { + vectorSchemaRoot = Optional.ofNullable(flightStream.getRoot()); + } + + if (!vectorSchemaRoot.isPresent()) { + completed = true; + } + + if (isFinished()) { + return null; + } + + currentPosition++; + + List blocks = new ArrayList<>(); + for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) { + FieldVector vector = vectorSchemaRoot.get().getVector(columnIndex); + Type type = columnHandles.get(columnIndex).getColumnType(); + + Block block = buildBlockFromVector(vector, type); + blocks.add(block); + } + + return new Page(vectorSchemaRoot.get().getRowCount(), blocks.toArray(new Block[0])); + } + + @Override + public void close() throws IOException + { + if (vectorSchemaRoot.isPresent()) { + vectorSchemaRoot.get().close(); + } + if (flightStream != null) { + try { + flightStream.close(); + } + catch (Exception e) { + logger.error(e); + } + } + } + + private void getFlightStream(ArrowFlightClientHandler clientHandler, byte[] ticket, ConnectorSession connectorSession) + { + try { + Optional uri = split.getLocationUrls().isEmpty() ? Optional.empty() : Optional.of(split.getLocationUrls().get(0)); + flightClient = clientHandler.getClient(uri); + flightStream = flightClient.getFlightClient().getStream(new Ticket(ticket), clientHandler.getCallOptions(connectorSession)); + } + catch (FlightRuntimeException e) { + throw new ArrowException(ARROW_FLIGHT_ERROR, e.getMessage(), e); + } + } + + private Block buildBlockFromVector(FieldVector vector, Type type) + { + if (vector instanceof BitVector) { + return buildBlockFromBitVector((BitVector) vector, type); + } + else if (vector instanceof TinyIntVector) { + return buildBlockFromTinyIntVector((TinyIntVector) vector, type); + } + else if (vector instanceof IntVector) { + return buildBlockFromIntVector((IntVector) vector, type); + } + else if (vector instanceof SmallIntVector) { + return buildBlockFromSmallIntVector((SmallIntVector) vector, type); + } + else if (vector instanceof BigIntVector) { + return buildBlockFromBigIntVector((BigIntVector) vector, type); + } + else if (vector instanceof DecimalVector) { + return buildBlockFromDecimalVector((DecimalVector) vector, type); + } + else if (vector instanceof NullVector) { + return buildBlockFromNullVector((NullVector) vector, type); + } + else if (vector instanceof TimeStampMicroVector) { + return buildBlockFromTimeStampMicroVector((TimeStampMicroVector) vector, type); + } + else if (vector instanceof TimeStampMilliVector) { + return buildBlockFromTimeStampMilliVector((TimeStampMilliVector) vector, type); + } + else if (vector instanceof Float4Vector) { + return buildBlockFromFloat4Vector((Float4Vector) vector, type); + } + else if (vector instanceof Float8Vector) { + return buildBlockFromFloat8Vector((Float8Vector) vector, type); + } + else if (vector instanceof VarCharVector) { + return buildBlockFromVarCharVector((VarCharVector) vector, type); + } + else if (vector instanceof VarBinaryVector) { + return buildBlockFromVarBinaryVector((VarBinaryVector) vector, type); + } + else if (vector instanceof DateDayVector) { + return buildBlockFromDateDayVector((DateDayVector) vector, type); + } + else if (vector instanceof DateMilliVector) { + return buildBlockFromDateMilliVector((DateMilliVector) vector, type); + } + else if (vector instanceof TimeMilliVector) { + return buildBlockFromTimeMilliVector((TimeMilliVector) vector, type); + } + else if (vector instanceof TimeSecVector) { + return buildBlockFromTimeSecVector((TimeSecVector) vector, type); + } + else if (vector instanceof TimeStampSecVector) { + return buildBlockFromTimeStampSecVector((TimeStampSecVector) vector, type); + } + else if (vector instanceof TimeMicroVector) { + return buildBlockFromTimeMicroVector((TimeMicroVector) vector, type); + } + throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass().getSimpleName()); + } + + private Block buildBlockFromBitVector(BitVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeBoolean(builder, vector.get(i) == 1); + } + } + return builder.build(); + } + + private Block buildBlockFromIntVector(IntVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeLong(builder, vector.get(i)); + } + } + return builder.build(); + } + + private Block buildBlockFromSmallIntVector(SmallIntVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeLong(builder, vector.get(i)); + } + } + return builder.build(); + } + + private Block buildBlockFromTinyIntVector(TinyIntVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeLong(builder, vector.get(i)); + } + } + return builder.build(); + } + + private Block buildBlockFromBigIntVector(BigIntVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeLong(builder, vector.get(i)); + } + } + return builder.build(); + } + + private Block buildBlockFromDecimalVector(DecimalVector vector, Type type) + { + if (!(type instanceof DecimalType)) { + throw new IllegalArgumentException("Type must be a DecimalType for DecimalVector"); + } + + DecimalType decimalType = (DecimalType) type; + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + BigDecimal decimal = vector.getObject(i); // Get the BigDecimal value + if (decimalType.isShort()) { + builder.writeLong(decimal.unscaledValue().longValue()); + } + else { + Slice slice = Decimals.encodeScaledValue(decimal); + decimalType.writeSlice(builder, slice, 0, slice.length()); + } + } + } + return builder.build(); + } + + private Block buildBlockFromNullVector(NullVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + builder.appendNull(); + } + return builder.build(); + } + + private Block buildBlockFromTimeStampMicroVector(TimeStampMicroVector vector, Type type) + { + if (!(type instanceof TimestampType)) { + throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName()); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + long micros = vector.get(i); + long millis = TimeUnit.MICROSECONDS.toMillis(micros); + type.writeLong(builder, millis); + } + } + return builder.build(); + } + + private Block buildBlockFromTimeStampMilliVector(TimeStampMilliVector vector, Type type) + { + if (!(type instanceof TimestampType)) { + throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName()); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + long millis = vector.get(i); + type.writeLong(builder, millis); + } + } + return builder.build(); + } + + private Block buildBlockFromFloat8Vector(Float8Vector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeDouble(builder, vector.get(i)); + } + } + return builder.build(); + } + + private Block buildBlockFromFloat4Vector(Float4Vector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + int intBits = Float.floatToIntBits(vector.get(i)); + type.writeLong(builder, intBits); + } + } + return builder.build(); + } + + private Block buildBlockFromVarBinaryVector(VarBinaryVector vector, Type type) + { + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + byte[] value = vector.get(i); + type.writeSlice(builder, Slices.wrappedBuffer(value)); + } + } + return builder.build(); + } + + private Block buildBlockFromVarCharVector(VarCharVector vector, Type type) + { + if (!(type instanceof VarcharType)) { + throw new IllegalArgumentException("Expected VarcharType but got " + type.getClass().getName()); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + String value = new String(vector.get(i), StandardCharsets.UTF_8); + type.writeSlice(builder, Slices.utf8Slice(value)); + } + } + return builder.build(); + } + + private Block buildBlockFromDateDayVector(DateDayVector vector, Type type) + { + if (!(type instanceof DateType)) { + throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName()); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + type.writeLong(builder, vector.get(i)); + } + } + return builder.build(); + } + + private Block buildBlockFromDateMilliVector(DateMilliVector vector, Type type) + { + if (!(type instanceof DateType)) { + throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName()); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + DateType dateType = (DateType) type; + long days = TimeUnit.MILLISECONDS.toDays(vector.get(i)); + dateType.writeLong(builder, days); + } + } + return builder.build(); + } + + private Block buildBlockFromTimeSecVector(TimeSecVector vector, Type type) + { + if (!(type instanceof TimeType)) { + throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector"); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + int value = vector.get(i); + long millis = TimeUnit.SECONDS.toMillis(value); + type.writeLong(builder, millis); + } + } + return builder.build(); + } + + private Block buildBlockFromTimeMilliVector(TimeMilliVector vector, Type type) + { + if (!(type instanceof TimeType)) { + throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector"); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + int value = vector.get(i); + long millis = TimeUnit.MILLISECONDS.toMillis(value); + type.writeLong(builder, millis); + } + } + return builder.build(); + } + + private Block buildBlockFromTimeMicroVector(TimeMicroVector vector, Type type) + { + if (!(type instanceof TimeType)) { + throw new IllegalArgumentException("Type must be a TimeType for TimemicroVector"); + } + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + long value = vector.get(i); + long micro = TimeUnit.MICROSECONDS.toMillis(value); + type.writeLong(builder, micro); + } + } + return builder.build(); + } + + private Block buildBlockFromTimeStampSecVector(TimeStampSecVector vector, Type type) + { + if (!(type instanceof TimestampType)) { + throw new IllegalArgumentException("Type must be a TimestampType for TimeStampSecVector"); + } + + BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount()); + for (int i = 0; i < vector.getValueCount(); i++) { + if (vector.isNull(i)) { + builder.appendNull(); + } + else { + long value = vector.get(i); + long millis = TimeUnit.SECONDS.toMillis(value); + type.writeLong(builder, millis); + } + } + return builder.build(); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java new file mode 100644 index 0000000000000..f3bb41c3e35d4 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.SplitContext; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.util.List; + +public class ArrowPageSourceProvider + implements ConnectorPageSourceProvider +{ + private static final Logger logger = Logger.get(ArrowPageSourceProvider.class); + private ArrowFlightClientHandler clientHandler; + @Inject + public ArrowPageSourceProvider(ArrowFlightClientHandler clientHandler) + { + this.clientHandler = clientHandler; + } + + @Override + public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns, SplitContext splitContext) + { + ImmutableList.Builder columnHandles = ImmutableList.builder(); + for (ColumnHandle handle : columns) { + columnHandles.add((ArrowColumnHandle) handle); + } + ArrowSplit arrowSplit = (ArrowSplit) split; + logger.debug("Processing split with flight ticket"); + return new ArrowPageSource(arrowSplit, columnHandles.build(), clientHandler, session); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java new file mode 100644 index 0000000000000..7744af3ecd357 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.common.collect.ImmutableList; +import com.google.inject.Module; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Objects.requireNonNull; + +public class ArrowPlugin + implements Plugin +{ + protected final String name; + protected final Module module; + + public ArrowPlugin(String name, Module module) + { + checkArgument(!isNullOrEmpty(name), "name is null or empty"); + this.name = name; + this.module = requireNonNull(module, "module is null"); + } + + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new ArrowConnectorFactory(name, module, getClassLoader())); + } + + private static ClassLoader getClassLoader() + { + return firstNonNull(Thread.currentThread().getContextClassLoader(), ArrowPlugin.class.getClassLoader()); + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java new file mode 100644 index 0000000000000..db65912de8c58 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; + +public class ArrowSplit + implements ConnectorSplit +{ + private final String schemaName; + private final String tableName; + private final byte[] ticket; + private final List locationUrls; + + @JsonCreator + public ArrowSplit( + @JsonProperty("schemaName") @Nullable String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("ticket") byte[] ticket, + @JsonProperty("locationUrls") List locationUrls) + { + this.schemaName = schemaName; + this.tableName = tableName; + this.ticket = ticket; + this.locationUrls = locationUrls; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return NodeSelectionStrategy.NO_PREFERENCE; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return Collections.emptyList(); + } + + @Override + public Object getInfo() + { + return this.getInfoMap(); + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public byte[] getTicket() + { + return ticket; + } + + @JsonProperty + public List getLocationUrls() + { + return locationUrls; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableHandle.java new file mode 100644 index 0000000000000..e0f8c6586791d --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableHandle.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ArrowTableHandle + implements ConnectorTableHandle +{ + private final String schema; + private final String table; + + @JsonCreator + public ArrowTableHandle( + @JsonProperty("schema") String schema, + @JsonProperty("table") String table) + { + this.schema = schema; + this.table = table; + } + + @JsonProperty("schema") + public String getSchema() + { + return schema; + } + + @JsonProperty("table") + public String getTable() + { + return table; + } + + @Override + public String toString() + { + return schema + ":" + table; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableLayoutHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableLayoutHandle.java new file mode 100644 index 0000000000000..c62aea21ca449 --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableLayoutHandle.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class ArrowTableLayoutHandle + implements ConnectorTableLayoutHandle +{ + private final ArrowTableHandle tableHandle; + private final List columnHandles; + private final TupleDomain tupleDomain; + private final Optional additionalPredicate; + + @JsonCreator + public ArrowTableLayoutHandle(@JsonProperty("table") ArrowTableHandle table, + @JsonProperty("columnHandles") List columnHandles, + @JsonProperty("tupleDomain") TupleDomain domain, + @JsonProperty("additionalPredicate") Optional additionalPredicate) + { + this.tableHandle = requireNonNull(table, "table is null"); + this.columnHandles = requireNonNull(columnHandles, "columns are null"); + this.tupleDomain = requireNonNull(domain, "domain is null"); + this.additionalPredicate = additionalPredicate; + } + + @JsonProperty("table") + public ArrowTableHandle getTableHandle() + { + return tableHandle; + } + + @JsonProperty("tupleDomain") + public TupleDomain getTupleDomain() + { + return tupleDomain; + } + + @JsonProperty("additionalPredicate") + public Optional getAdditionalPredicate() + { + return additionalPredicate; + } + + @JsonProperty("columnHandles") + public List getColumnHandles() + { + return columnHandles; + } +} diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTransactionHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTransactionHandle.java new file mode 100644 index 0000000000000..07eb7385cfbcf --- /dev/null +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTransactionHandle.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public enum ArrowTransactionHandle + implements ConnectorTransactionHandle +{ + INSTANCE +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowServer.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowServer.java new file mode 100644 index 0000000000000..334b744f0e881 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowServer.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.ActionType; +import org.apache.arrow.flight.Criteria; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.Ticket; + +public class ArrowServer + implements FlightProducer +{ + @Override + public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener serverStreamListener) + { + } + + @Override + public void listFlights(CallContext callContext, Criteria criteria, StreamListener streamListener) + { + } + + @Override + public FlightInfo getFlightInfo(CallContext callContext, FlightDescriptor flightDescriptor) + { + return null; + } + + @Override + public Runnable acceptPut(CallContext callContext, FlightStream flightStream, StreamListener streamListener) + { + return null; + } + + @Override + public void doAction(CallContext callContext, Action action, StreamListener streamListener) + { + } + + @Override + public void listActions(CallContext callContext, StreamListener streamListener) + { + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowTableHandleTest.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowTableHandleTest.java new file mode 100644 index 0000000000000..cd8d3984cc533 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowTableHandleTest.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class ArrowTableHandleTest +{ + @Test + public void testConstructorAndGetters() + { + ArrowTableHandle handle = new ArrowTableHandle("test_schema", "test_table"); + assertEquals(handle.getSchema(), "test_schema"); + assertEquals(handle.getTable(), "test_table"); + } + + @Test + public void testToString() + { + ArrowTableHandle handle = new ArrowTableHandle("test_schema", "test_table"); + assertEquals(handle.toString(), "test_schema:test_table"); + } + + @Test + public void testJsonSerialization() throws Exception + { + ObjectMapper objectMapper = new ObjectMapper(); + + ArrowTableHandle handle = new ArrowTableHandle("test_schema", "test_table"); + String json = objectMapper.writeValueAsString(handle); + + assertNotNull(json); + + ArrowTableHandle deserialized = objectMapper.readValue(json, ArrowTableHandle.class); + assertEquals(deserialized.getSchema(), handle.getSchema()); + assertEquals(deserialized.getTable(), handle.getTable()); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractFlightRequest.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractFlightRequest.java new file mode 100644 index 0000000000000..20921f8105099 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractFlightRequest.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +public class TestArrowAbstractFlightRequest +{ + @Test + public void testGetSchema() + { + String schema = "test_schema"; + ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema); + assertEquals(schema, request.getSchema()); + } + + @Test + public void testGetTable() + { + String schema = "test_schema"; + String table = "test_table"; + ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema, table); + assertEquals(table, request.getTable()); + } + + @Test + public void testGetQuery() + { + String schema = "test_schema"; + String table = "test_table"; + String query = "SELECT * FROM test_table"; + ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema, table, query); + assertTrue(request.getQuery().isPresent()); + assertEquals(query, request.getQuery().get()); + } + + @Test + public void testEmptyQuery() + { + String schema = "test_schema"; + String table = "test_table"; + ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema, table); + assertFalse(request.getQuery().isPresent()); + } + + @Test + public void testGetCommand() + { + TestArrowFlightRequest request = new TestArrowFlightRequest("schema"); + + byte[] command = request.getCommand(); + + assertNotNull(command); + assertEquals(0, command.length); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractMetadata.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractMetadata.java new file mode 100644 index 0000000000000..378dc325da791 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractMetadata.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.SchemaTableName; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@Test(singleThreaded = true) +public class TestArrowAbstractMetadata +{ + public TestArrowAbstractMetadata() throws IOException + { + } + + @Test + public void testGetTableMetadata() + { + // Mock dependencies + ArrowAbstractMetadata metadata = mock(ArrowAbstractMetadata.class); + Mockito.doCallRealMethod().when(metadata).getTableMetadata(Mockito.any(ConnectorSession.class), Mockito.any(ConnectorTableHandle.class)); + ConnectorSession session = mock(ConnectorSession.class); + ArrowTableHandle tableHandle = new ArrowTableHandle("testSchema", "testTable"); + + // Mock the behavior of getColumnsList + List columnList = Arrays.asList( + new Field("column1", FieldType.notNullable(new ArrowType.Int(32, true)), null), + new Field("column2", FieldType.notNullable(new ArrowType.Decimal(10, 2)), null), + new Field("column3", FieldType.notNullable(new ArrowType.Bool()), null)); + + when(metadata.getColumnsList("testSchema", "testTable", session)).thenReturn(columnList); + // Call the method under test + ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle); + + // Verify the result + assertNotNull(tableMetadata); + assertEquals(tableMetadata.getTable(), new SchemaTableName("testSchema", "testTable")); + List columns = tableMetadata.getColumns(); + assertEquals(columns.size(), 3); + assertEquals(columns.get(0), new ColumnMetadata("column1", IntegerType.INTEGER)); + assertEquals(columns.get(1), new ColumnMetadata("column2", DecimalType.createDecimalType(10, 2))); + assertEquals(columns.get(2), new ColumnMetadata("column3", BooleanType.BOOLEAN)); + } + + @Test + public void testGetTableLayouts() + { + // Mock dependencies + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableHandle tableHandle = new ArrowTableHandle("testSchema", "testTable"); + + // Mock the constraint + Constraint trueConstraint = Constraint.alwaysTrue(); + + Set desiredColumns = new HashSet<>(); + desiredColumns.add(new ArrowColumnHandle("column1", IntegerType.INTEGER)); + desiredColumns.add(new ArrowColumnHandle("column2", VarcharType.VARCHAR)); + + // Call the method under test + ArrowAbstractMetadata metadata = mock(ArrowAbstractMetadata.class); + Mockito.doCallRealMethod().when(metadata).getTableLayouts(Mockito.any(ConnectorSession.class), Mockito.any(ConnectorTableHandle.class), Mockito.any(Constraint.class), Mockito.any(Optional.class)); + + List tableLayouts = metadata.getTableLayouts(session, tableHandle, trueConstraint, Optional.of(desiredColumns)); + + // Verify the result + assertNotNull(tableLayouts); + assertEquals(tableLayouts.size(), 1); + ConnectorTableLayoutResult layoutResult = tableLayouts.get(0); + assertNotNull(layoutResult); + assertNotNull(layoutResult.getTableLayout()); + assertEquals(((ArrowTableLayoutHandle) layoutResult.getTableLayout().getHandle()).getTableHandle(), tableHandle); + assertEquals(((ArrowTableLayoutHandle) layoutResult.getTableLayout().getHandle()).getColumnHandles(), desiredColumns); + assertEquals(layoutResult.getTableLayout().getPredicate(), trueConstraint.getSummary()); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowColumnHandle.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowColumnHandle.java new file mode 100644 index 0000000000000..6eca6de0a85c5 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowColumnHandle.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.ColumnMetadata; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class TestArrowColumnHandle +{ + @Test + public void testConstructorAndGetters() + { + String columnName = "TestColumn"; + Type columnType = VarcharType.createUnboundedVarcharType(); + + ArrowColumnHandle columnHandle = new ArrowColumnHandle(columnName, columnType); + + assertEquals(columnHandle.getColumnName(), columnName); + assertEquals(columnHandle.getColumnType(), columnType); + } + + @Test + public void testGetColumnMetadata() + { + String columnName = "testcolumn"; + Type columnType = VarcharType.createUnboundedVarcharType(); + + ArrowColumnHandle columnHandle = new ArrowColumnHandle(columnName, columnType); + ColumnMetadata columnMetadata = columnHandle.getColumnMetadata(); + + assertEquals(columnMetadata.getName(), columnName); + assertEquals(columnMetadata.getType(), columnType); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightClient.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightClient.java new file mode 100644 index 0000000000000..ce5039c3e7186 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightClient.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.memory.RootAllocator; +import org.testng.annotations.Test; + +import java.io.InputStream; +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class TestArrowFlightClient +{ + @Test + public void testArrowFlightClient() + { + FlightClient flightClient = mock(FlightClient.class); + InputStream certificateStream = mock(InputStream.class); + Optional trustedCertificate = Optional.of(certificateStream); + RootAllocator allocator = mock(RootAllocator.class); + + ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); + + assertEquals(arrowFlightClient.getFlightClient(), flightClient); + assertTrue(arrowFlightClient.getTrustedCertificate().isPresent()); + assertEquals(arrowFlightClient.getTrustedCertificate().get(), certificateStream); + } + + @Test + public void testArrowFlightClientWithoutCertificate() + { + FlightClient flightClient = mock(FlightClient.class); + Optional trustedCertificate = Optional.empty(); + RootAllocator allocator = mock(RootAllocator.class); + ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); + + assertEquals(arrowFlightClient.getFlightClient(), flightClient); + assertFalse(arrowFlightClient.getTrustedCertificate().isPresent()); + } + + @Test + public void testClose() throws Exception + { + FlightClient flightClient = mock(FlightClient.class); + InputStream certificateStream = mock(InputStream.class); + Optional trustedCertificate = Optional.of(certificateStream); + RootAllocator allocator = mock(RootAllocator.class); + + ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); + + arrowFlightClient.close(); + + verify(flightClient, times(1)).close(); + verify(certificateStream, times(1)).close(); + } + + @Test + public void testCloseWithoutCertificate() throws Exception + { + FlightClient flightClient = mock(FlightClient.class); + Optional trustedCertificate = Optional.empty(); + RootAllocator allocator = mock(RootAllocator.class); + + ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); + + arrowFlightClient.close(); + + verify(flightClient, times(1)).close(); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightConfig.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightConfig.java new file mode 100644 index 0000000000000..3e9e643ebdd29 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.configuration.testing.ConfigAssertions; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +public class TestArrowFlightConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(ArrowFlightConfig.class) + .setFlightServerName(null) + .setVerifyServer(null) + .setFlightServerSSLCertificate(null) + .setArrowFlightServerSslEnabled(null) + .setArrowFlightPort(null)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("arrow-flight.server", "127.0.0.1") + .put("arrow-flight.server.verify", "true") + .put("arrow-flight.server-ssl-certificate", "cert") + .put("arrow-flight.server-ssl-enabled", "true") + .put("arrow-flight.server.port", "443") + .build(); + + ArrowFlightConfig expected = new ArrowFlightConfig() + .setFlightServerName("127.0.0.1") + .setVerifyServer(true) + .setFlightServerSSLCertificate("cert") + .setArrowFlightServerSslEnabled(true) + .setArrowFlightPort(443); + + ConfigAssertions.assertFullMapping(properties, expected); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightRequest.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightRequest.java new file mode 100644 index 0000000000000..0d6d444ff1ec8 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightRequest.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import java.util.Optional; +public class TestArrowFlightRequest + extends ArrowAbstractFlightRequest +{ + public TestArrowFlightRequest(String schema) + { + super(schema); + } + + public TestArrowFlightRequest(String schema, String table) + { + super(schema, table); + } + + public TestArrowFlightRequest(String schema, String table, String query) + { + super(schema, table, Optional.of(query)); + } + + @Override + public byte[] getCommand() + { + return new byte[0]; + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSource.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSource.java new file mode 100644 index 0000000000000..2e245ecb7b46f --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSource.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ConnectorSession; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +public class TestArrowPageSource +{ + @Mock + private ArrowSplit mockSplit; + + @Mock + private List mockColumnHandles; + + @Mock + private ArrowFlightClientHandler mockClientHandler; + + @Mock + private ConnectorSession mockSession; + + @Mock + private FlightClient mockFlightClient; + + @Mock + private FlightStream mockFlightStream; + + @Mock + private VectorSchemaRoot mockVectorSchemaRoot; + + @BeforeClass + public void setUp() + { + MockitoAnnotations.openMocks(this); + ArrowFlightClient mockArrowFlightClient = mock(ArrowFlightClient.class); + when(mockClientHandler.getClient(any(Optional.class))).thenReturn(mockArrowFlightClient); + when(mockArrowFlightClient.getFlightClient()).thenReturn(mockFlightClient); + when(mockFlightClient.getStream(any(Ticket.class), any())).thenReturn(mockFlightStream); + } + + @Test + public void testInitialization() + { + ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession); + assertNotNull(arrowPageSource); + } + + @Test + public void testGetNextPageWithEmptyFlightStream() + { + when(mockFlightStream.next()).thenReturn(false); + ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession); + assertNull(arrowPageSource.getNextPage()); + } + + @Test + public void testGetNextPageWithNonEmptyFlightStream() + { + when(mockFlightStream.next()).thenReturn(true); + when(mockFlightStream.getRoot()).thenReturn(mockVectorSchemaRoot); + when(mockVectorSchemaRoot.getRowCount()).thenReturn(1); + ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession); + assertNotNull(arrowPageSource.getNextPage()); + } + + @Test + public void testCloseResources() throws Exception + { + ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession); + arrowPageSource.close(); + verify(mockFlightStream).close(); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSourceProvider.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSourceProvider.java new file mode 100644 index 0000000000000..77b89cf5d17cb --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSourceProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SplitContext; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.common.collect.ImmutableList; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Ticket; +import org.testng.annotations.Test; + +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +public class TestArrowPageSourceProvider +{ + @Test + public void testCreatePageSourceWithValidParameters() + { + ArrowFlightClientHandler clientHandler = mock(ArrowFlightClientHandler.class); + ArrowFlightClient flightClient = mock(ArrowFlightClient.class); + FlightClient flightClientInstance = mock(FlightClient.class); + FlightStream flightStream = mock(FlightStream.class); + when(clientHandler.getClient(any())).thenReturn(flightClient); + when(flightClient.getFlightClient()).thenReturn(flightClientInstance); + when(flightClientInstance.getStream(any(Ticket.class), any(), any())).thenReturn(flightStream); + ArrowPageSourceProvider arrowPageSourceProvider = new ArrowPageSourceProvider(clientHandler); + ConnectorTransactionHandle transactionHandle = mock(ConnectorTransactionHandle.class); + ConnectorSession session = mock(ConnectorSession.class); + ArrowSplit split = mock(ArrowSplit.class); + List columns = ImmutableList.of(mock(ArrowColumnHandle.class)); + SplitContext splitContext = mock(SplitContext.class); + when(split.getTicket()).thenReturn(new byte[0]); + ConnectorPageSource pageSource = arrowPageSourceProvider.createPageSource(transactionHandle, session, split, columns, splitContext); + assertNotNull(pageSource); + assertTrue(pageSource instanceof ArrowPageSource); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPlugin.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPlugin.java new file mode 100644 index 0000000000000..2cd4ee099ddb1 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPlugin.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.connector.ConnectorFactory; +import org.testng.annotations.Test; + +import static com.facebook.airlift.testing.Assertions.assertInstanceOf; +import static com.google.common.collect.Iterables.getOnlyElement; + +public class TestArrowPlugin +{ + @Test + public void testStartup() + { + ArrowModule testModule = new ArrowModule("arrow-flight"); + ArrowPlugin plugin = new ArrowPlugin("arrow-flight", testModule); + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + assertInstanceOf(factory, ArrowConnectorFactory.class); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java new file mode 100644 index 0000000000000..83b0cbf1cc7b5 --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.airlift.json.JsonCodec; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static com.facebook.airlift.json.JsonCodec.jsonCodec; +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; +import static org.testng.Assert.assertEquals; + +public class TestArrowSplit +{ + private final ArrowSplit split = new ArrowSplit("schemaName", "tableName", + "ticket".getBytes(), Arrays.asList("http://host")); + + @Test + public void testNodes() + { + assertEquals(split.getNodeSelectionStrategy(), NO_PREFERENCE); + assertEquals(split.getPreferredNodes(null), Collections.emptyList()); + } + + @Test + public void testJsonRoundTrip() + { + JsonCodec codec = jsonCodec(ArrowSplit.class); + String json = codec.toJson(split); + ArrowSplit copy = codec.fromJson(json); + assertEquals(copy.getSchemaName(), split.getSchemaName()); + assertEquals(copy.getTableName(), split.getTableName()); + assertEquals(copy.getTicket(), split.getTicket()); + assertEquals(copy.getLocationUrls(), split.getLocationUrls()); + } +} diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestFlightClient.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestFlightClient.java new file mode 100644 index 0000000000000..5b2e3eea9cfaf --- /dev/null +++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestFlightClient.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.plugin.arrow; + +import com.facebook.presto.spi.ConnectorSession; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.grpc.CredentialCallOption; +import org.apache.arrow.memory.RootAllocator; +import org.mockito.Mockito; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class TestFlightClient +{ + private static RootAllocator allocator; + private static FlightServer server; + + private static Location serverLocation; + + @BeforeClass + public void setup() throws Exception + { + allocator = new RootAllocator(Long.MAX_VALUE); + serverLocation = Location.forGrpcInsecure("127.0.0.1", 9443); + server = FlightServer.builder(allocator, serverLocation, new ArrowServer()) + .build(); + server.start(); + System.out.println("Server listening on port " + server.getPort()); + } + + @AfterClass + public void tearDown() throws Exception + { + server.close(); + allocator.close(); + } + + @Test + public void testInitializeClient() + { + ArrowFlightConfig config = mock(ArrowFlightConfig.class); + when(config.getFlightServerSSLCertificate()).thenReturn(null); + when(config.getVerifyServer()).thenReturn(true); + ConnectorSession connectorSession = mock(ConnectorSession.class); + ArrowFlightClientHandler handler = new ArrowFlightClientHandler(config) { + @Override + protected CredentialCallOption getCallOptions(ConnectorSession connectorSession) + { + return null; + } + }; + Optional uri = Optional.of("grpc://127.0.0.1:9443"); + handler.getClient(uri); + assertNotNull(handler.getClient(uri)); + } + + @Test + public void testGetFlightInfo() + { + ArrowFlightConfig config = Mockito.mock(ArrowFlightConfig.class); + FlightInfo mockFlightInfo = mock(FlightInfo.class); + ArrowFlightClientHandler clientHandler = Mockito.mock(ArrowFlightClientHandler.class); + ArrowFlightRequest request = new TestArrowFlightRequest("schema", "table", "query"); + ConnectorSession session = Mockito.mock(ConnectorSession.class); + when(clientHandler.getFlightInfo(request, session)).thenReturn(mockFlightInfo); + FlightInfo result = clientHandler.getFlightInfo(request, session); + assertEquals(mockFlightInfo, result); + } +} diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst index f07506b460cab..d337fe4ed12d1 100644 --- a/presto-docs/src/main/sphinx/connector.rst +++ b/presto-docs/src/main/sphinx/connector.rst @@ -9,6 +9,7 @@ from different data sources. :maxdepth: 1 connector/accumulo + connector/base-arrow-flight connector/bigquery connector/blackhole connector/cassandra diff --git a/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst b/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst new file mode 100644 index 0000000000000..5a310cecdd4c3 --- /dev/null +++ b/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst @@ -0,0 +1,96 @@ + +====================== +Arrow-flight Connector +====================== +This connector allows querying multiple data sources that are supported by an Arrow Flight server. +Apache Arrow enhances performance and efficiency in data-intensive applications through its columnar memory layout, zero-copy reads, vectorized execution, cross-language interoperability, rich data type support, and optimization for modern hardware. These features collectively reduce overhead, improve data processing speeds, and facilitate seamless data exchange between different systems and languages. + +Getting Started with base-arrow-module: Essential Abstract Methods for Developers +--------------------------------------------------------------------------------- +To use the base-arrow-module, you need to implement certain abstract methods that are specific to your use case. Below are the required classes and their purposes: + +* ``ArrowFlightClientHandler.java`` + This class is responsible for initializing the Flight client and retrieving Flight information from the Flight server. To authenticate the Flight server, you must implement the abstract method ``getCallOptions`` in ArrowFlightClientHandler, which returns the ``CredentialCallOption`` specific to your Flight server. + +* ``ArrowAbstractFlightRequest.java`` + Implement this class to define the request data, including the data source type, connection properties, the number of partitions and other data required to interact with database. + +* ``ArrowAbstractMetadata.java`` + To retrieve metadata (schema and table information), implement the abstract methods in the ArrowAbstractMetadata class. + +* ``ArrowAbstractSplitManager.java`` + Extend the ArrowAbstractSplitManager class to implement the Arrow flight request, defining the Arrow split. + +* ``ArrowPlugin.java`` + Register your connector name by extending the ArrowPlugin class. + +* ``ArrowFlightRequest`` + The ``getCommand`` method in the ``ArrowFlightRequest`` interface should return a byte array for the flight request. + + +Configuration +------------- +To configure the Arrow connector, create a catalog file +in ``etc/catalog`` named, for example, ``arrowmariadb.properties``, to +mount the Arrow-flight connector as the ``arrowmariadb`` catalog. +Create the file with the following contents, replacing the +connection properties as appropriate for your setup: + + +.. code-block:: none + + + connector.name= + arrow-flight.server= + arrow-flight.server.port= + + + +Add other properties that are required for your Flight server to connect. + +========================================== ============================================================== +Property Name Description +========================================== ============================================================== +``arrow-flight.server`` Endpoint of arrow-flight server +``arrow-flight.server.port`` Flight server port +``arrow-flight.server-ssl-certificate`` Pass ssl certificate +``arrow-flight.server.verify`` To verify server +``arrow-flight.server-ssl-enabled`` Port is ssl enabled +========================================== ============================================================== + +Querying Arrow-Flight +--------------------- + +The Arrow-Flight connector provides schema for each supported *databases*. +Example for MariaDB is shown below. +To see the available schemas, run ``SHOW SCHEMAS``:: + + SHOW SCHEMAS FROM arrowmariadb; + +To view the tables in the MariaDB database named ``user``, +run ``SHOW TABLES``:: + + SHOW TABLES FROM arrowmariadb.user; + +To see a list of the columns in the ``admin`` table in the ``user`` database, +use either of the following commands:: + + DESCRIBE arrowmariadb.user.admin; + SHOW COLUMNS FROM arrowmariadb.user.admin; + +Finally, you can access the ``admin`` table in the ``user`` database:: + + SELECT * FROM arrowmariadb.user.admin; + +If you used a different name for your catalog properties file, use +that catalog name instead of ``arrowmariadb`` in the above examples. + + +Arrow-Flight Connector Limitations +---------------------------------- + +* SELECT and DESCRIBE queries are supported by this connector template. Implementing modules can add support for additional features. + +* Arrow-flight connector can query against only those datasources which are supported by Flight server. + +* The user should have the flight server running for the arrow-flight connector to work.