diff --git a/pom.xml b/pom.xml index 9d986b41b05f..e242b179ee5c 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ 3.3.0 2.6.0 2.2.0 + 2.11.0 org.openjdk.jmh @@ -581,6 +594,40 @@ presto-jmx test + + + org.apache.iceberg + iceberg-core + 1.5.0 + tests + test + + + org.apache.avro + avro + + + org.apache.parquet + parquet-column + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-jdk14 + + + io.airlift + aircompressor + + + org.roaringbitmap + RoaringBitmap + + + @@ -590,10 +637,15 @@ org.apache.maven.plugins maven-dependency-plugin - + org.glassfish.jersey.core:jersey-common:jar org.eclipse.jetty:jetty-server:jar + + com.facebook.airlift:http-server:jar + com.facebook.airlift:node:jar + javax.servlet:javax.servlet-api:jar + org.apache.httpcomponents.core5:httpcore5:jar diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java index 9035d91ad906..c7a54e829cd6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CatalogType.java @@ -16,13 +16,14 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.nessie.NessieCatalog; +import org.apache.iceberg.rest.RESTCatalog; public enum CatalogType { HADOOP(HadoopCatalog.class.getName()), HIVE(HiveCatalog.class.getName()), NESSIE(NessieCatalog.class.getName()), - + REST(RESTCatalog.class.getName()) /**/; private final String catalogImpl; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 6d1476433e5f..39dbcbc01a34 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -39,6 +39,7 @@ import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; import com.facebook.presto.iceberg.nessie.NessieConfig; import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider; +import com.facebook.presto.iceberg.rest.IcebergRestConfig; import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure; import com.facebook.presto.iceberg.procedure.RegisterTableProcedure; import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure; @@ -128,6 +129,7 @@ public void setup(Binder binder) configBinder(binder).bindConfig(IcebergConfig.class); configBinder(binder).bindConfig(NessieConfig.class); + configBinder(binder).bindConfig(IcebergRestConfig.class); binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON); binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java index 8b1c8911d8ef..fb84477265df 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergResourceFactory.java @@ -13,22 +13,27 @@ */ package com.facebook.presto.iceberg; +import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.hive.gcs.GcsConfigurationInitializer; import com.facebook.presto.hive.s3.S3ConfigurationUpdater; import com.facebook.presto.iceberg.nessie.NessieConfig; +import com.facebook.presto.iceberg.rest.IcebergRestConfig; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.UncheckedExecutionException; +import io.jsonwebtoken.Jwts; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.rest.auth.OAuth2Properties; import javax.inject.Inject; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,12 +45,18 @@ import static com.facebook.presto.iceberg.IcebergUtil.loadCachingProperties; import static com.facebook.presto.iceberg.nessie.AuthenticationType.BASIC; import static com.facebook.presto.iceberg.nessie.AuthenticationType.BEARER; +import static com.facebook.presto.iceberg.rest.AuthenticationType.OAUTH2; +import static com.facebook.presto.iceberg.rest.SessionType.USER; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.base.Throwables.throwIfUnchecked; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.CatalogProperties.FILE_IO_IMPL; +import static org.apache.iceberg.CatalogProperties.URI; import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; +import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; +import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; /** * Factory for loading Iceberg resources such as Catalog. @@ -63,9 +74,18 @@ public class IcebergResourceFactory private final GcsConfigurationInitializer gcsConfigurationInitialize; private final IcebergConfig icebergConfig; + private final IcebergRestConfig restConfig; + private final NodeVersion nodeVersion; @Inject - public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogName, NessieConfig nessieConfig, S3ConfigurationUpdater s3ConfigurationUpdater, GcsConfigurationInitializer gcsConfigurationInitialize) + public IcebergResourceFactory( + IcebergConfig config, + IcebergCatalogName catalogName, + NessieConfig nessieConfig, + IcebergRestConfig restConfig, + S3ConfigurationUpdater s3ConfigurationUpdater, + GcsConfigurationInitializer gcsConfigurationInitialize, + NodeVersion nodeVersion) { this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName(); this.icebergConfig = requireNonNull(config, "config is null"); @@ -73,8 +93,10 @@ public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogNa this.catalogWarehouse = config.getCatalogWarehouse(); this.hadoopConfigResources = config.getHadoopConfigResources(); this.nessieConfig = requireNonNull(nessieConfig, "nessieConfig is null"); + this.restConfig = requireNonNull(restConfig, "restConfig is null"); this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null"); this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null"); + this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null"); catalogCache = CacheBuilder.newBuilder() .maximumSize(config.getCatalogCacheSize()) .build(); @@ -150,31 +172,63 @@ public Map getCatalogProperties(ConnectorSession session) if (catalogWarehouse != null) { properties.put(WAREHOUSE_LOCATION, catalogWarehouse); } - if (catalogType == NESSIE) { - properties.put("ref", getNessieReferenceName(session)); - properties.put("uri", nessieConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie"))); - String hash = getNessieReferenceHash(session); - if (hash != null) { - properties.put("ref.hash", hash); - } - nessieConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString())); - nessieConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString())); - nessieConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val)); - nessieConfig.getAuthenticationType().ifPresent(type -> { - if (type == BASIC) { - properties.put("authentication.username", nessieConfig.getUsername() - .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication"))); - properties.put("authentication.password", nessieConfig.getPassword() - .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication"))); + switch (catalogType) { + case NESSIE: + properties.put("ref", getNessieReferenceName(session)); + properties.put("uri", nessieConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie"))); + String hash = getNessieReferenceHash(session); + if (hash != null) { + properties.put("ref.hash", hash); } - else if (type == BEARER) { - properties.put("authentication.token", nessieConfig.getBearerToken() - .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication"))); + nessieConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString())); + nessieConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString())); + nessieConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val)); + nessieConfig.getAuthenticationType().ifPresent(type -> { + if (type == BASIC) { + properties.put("authentication.username", nessieConfig.getUsername() + .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication"))); + properties.put("authentication.password", nessieConfig.getPassword() + .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication"))); + } + else if (type == BEARER) { + properties.put("authentication.token", nessieConfig.getBearerToken() + .orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication"))); + } + }); + if (!nessieConfig.isCompressionEnabled()) { + properties.put("transport.disable-compression", "true"); } - }); - if (!nessieConfig.isCompressionEnabled()) { - properties.put("transport.disable-compression", "true"); - } + break; + case REST: + properties.put(URI, restConfig.getServerUri().orElseThrow( + () -> new IllegalStateException("iceberg.rest.uri must be set for REST catalog"))); + restConfig.getAuthenticationType().ifPresent(type -> { + if (type == OAUTH2) { + if (!restConfig.credentialOrTokenExists()) { + throw new IllegalStateException("iceberg.rest.auth.oauth2 requires either a credential or a token"); + } + restConfig.getCredential().ifPresent(credential -> properties.put(CREDENTIAL, credential)); + restConfig.getToken().ifPresent(token -> properties.put(TOKEN, token)); + } + }); + restConfig.getSessionType().ifPresent(type -> { + if (type == USER) { + properties.putAll(session.getIdentity().getExtraCredentials()); + + String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default")); + String jwt = Jwts.builder() + .setId(sessionId) + .setSubject(session.getUser()) + .setIssuedAt(new Date()) + .setIssuer(nodeVersion.toString()) + .claim("user", session.getUser()) + .claim("source", session.getSource().orElse("")) + .compact(); + + properties.put(OAuth2Properties.JWT_TOKEN_TYPE, jwt); + } + }); + break; } return properties; } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/AuthenticationType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/AuthenticationType.java new file mode 100644 index 000000000000..95bd3d4f9955 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/AuthenticationType.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +public enum AuthenticationType +{ + NONE, + OAUTH2 +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestConfig.java new file mode 100644 index 000000000000..b00c68f09aca --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/IcebergRestConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.NotNull; + +import java.util.Optional; + +public class IcebergRestConfig +{ + private String serverUri; + private SessionType sessionType; + private AuthenticationType authenticationType; + private String credential; + private String token; + + @NotNull + public Optional getServerUri() + { + return Optional.ofNullable(serverUri); + } + + @Config("iceberg.rest.uri") + @ConfigDescription("The URI to connect to the REST server") + public IcebergRestConfig setServerUri(String serverUri) + { + this.serverUri = serverUri; + return this; + } + + public Optional getSessionType() + { + return Optional.ofNullable(sessionType); + } + + @Config("iceberg.rest.session.type") + @ConfigDescription("The session type to use for communicating with REST catalog server (NONE | USER)") + public IcebergRestConfig setSessionType(SessionType sessionType) + { + this.sessionType = sessionType; + return this; + } + + public Optional getAuthenticationType() + { + return Optional.ofNullable(authenticationType); + } + + @Config("iceberg.rest.auth.type") + @ConfigDescription("The authentication type to use for communicating with REST catalog server (NONE | OAUTH2)") + public IcebergRestConfig setAuthenticationType(AuthenticationType authenticationType) + { + this.authenticationType = authenticationType; + return this; + } + + public Optional getCredential() + { + return Optional.ofNullable(credential); + } + + @Config("iceberg.rest.auth.oauth2.credential") + @ConfigDescription("The credential to use for OAUTH2 authentication") + public IcebergRestConfig setCredential(String credential) + { + this.credential = credential; + return this; + } + + public Optional getToken() + { + return Optional.ofNullable(token); + } + + @Config("iceberg.rest.auth.oauth2.token") + @ConfigDescription("The Bearer token to use for OAUTH2 authentication") + public IcebergRestConfig setToken(String token) + { + this.token = token; + return this; + } + + public boolean credentialOrTokenExists() + { + return credential != null || token != null; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/SessionType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/SessionType.java new file mode 100644 index 000000000000..87e1fbee61cd --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/rest/SessionType.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +public enum SessionType +{ + NONE, + USER +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 3dac9dc34e35..8a8972fd8b2d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -765,7 +765,7 @@ public void testCreateTableWithFormatVersion() testWithAllFormatVersions(this::testCreateTableWithFormatVersion); } - private void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode) + protected void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode) { @Language("SQL") String createTable = "" + "CREATE TABLE test_create_table_with_format_version_" + formatVersion + " " + diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 824a519ef686..f96c395a3534 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1385,7 +1385,7 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map getConnectorProperties(CatalogType icebergCat Path testDataDirectory = icebergDataDirectory.resolve(TEST_DATA_DIRECTORY); switch (icebergCatalogType) { case HADOOP: + case REST: case NESSIE: return ImmutableMap.of("iceberg.catalog.warehouse", testDataDirectory.getParent().toFile().toURI().toString()); case HIVE: diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java index ed10e1468189..f55e5d46d92c 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.iceberg.hadoop; +import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.hive.gcs.HiveGcsConfig; import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; import com.facebook.presto.hive.s3.HiveS3Config; @@ -23,6 +24,7 @@ import com.facebook.presto.iceberg.IcebergResourceFactory; import com.facebook.presto.iceberg.IcebergUtil; import com.facebook.presto.iceberg.nessie.NessieConfig; +import com.facebook.presto.iceberg.rest.IcebergRestConfig; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.tests.DistributedQueryRunner; @@ -68,8 +70,10 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String IcebergResourceFactory resourceFactory = new IcebergResourceFactory(icebergConfig, new IcebergCatalogName(ICEBERG_CATALOG), new NessieConfig(), + new IcebergRestConfig(), new PrestoS3ConfigurationUpdater(new HiveS3Config()), - new HiveGcsConfigurationInitializer(new HiveGcsConfig())); + new HiveGcsConfigurationInitializer(new HiveGcsConfig()), + new NodeVersion("test_version")); return IcebergUtil.getNativeIcebergTable(resourceFactory, session, diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java index 91e3c2214d43..3dbd44f018dc 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.iceberg.nessie; +import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.hive.gcs.HiveGcsConfig; import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; import com.facebook.presto.hive.s3.HiveS3Config; @@ -23,6 +24,7 @@ import com.facebook.presto.iceberg.IcebergQueryRunner; import com.facebook.presto.iceberg.IcebergResourceFactory; import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.iceberg.rest.IcebergRestConfig; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.testing.QueryRunner; @@ -107,8 +109,10 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String IcebergResourceFactory resourceFactory = new IcebergResourceFactory(icebergConfig, new IcebergCatalogName(ICEBERG_CATALOG), nessieConfig, + new IcebergRestConfig(), new PrestoS3ConfigurationUpdater(new HiveS3Config()), - new HiveGcsConfigurationInitializer(new HiveGcsConfig())); + new HiveGcsConfigurationInitializer(new HiveGcsConfig()), + new NodeVersion("test_version")); return IcebergUtil.getNativeIcebergTable(resourceFactory, session, diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java new file mode 100644 index 000000000000..d9edd97ad81d --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.http.server.TheServlet; +import com.facebook.airlift.http.server.testing.TestingHttpServer; +import com.facebook.airlift.http.server.testing.TestingHttpServerModule; +import com.facebook.airlift.node.NodeInfo; +import com.facebook.presto.hive.HdfsContext; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.testing.TestingConnectorSession; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.TypeLiteral; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.rest.IcebergRestCatalogServlet; +import org.apache.iceberg.rest.RESTCatalogAdapter; +import org.apache.iceberg.rest.RESTSessionCatalog; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static com.facebook.presto.iceberg.CatalogType.REST; +import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogProperties.URI; +import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; + +public class IcebergRestTestUtil +{ + public static final ConnectorSession SESSION = new TestingConnectorSession(ImmutableList.of()); + + private IcebergRestTestUtil() + { + } + + public static Map restConnectorProperties(String serverUri) + { + return ImmutableMap.of("iceberg.catalog.type", REST.name(), "iceberg.rest.uri", serverUri); + } + + public static TestingHttpServer getRestServer(String location) + { + JdbcCatalog backingCatalog = new JdbcCatalog(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(); + backingCatalog.setConf(hdfsEnvironment.getConfiguration(new HdfsContext(SESSION), new Path(location))); + + Map properties = ImmutableMap.builder() + .put(URI, "jdbc:h2:mem:test_" + System.nanoTime() + "_" + ThreadLocalRandom.current().nextInt()) + .put(WAREHOUSE_LOCATION, location) + .put("jdbc.username", "user") + .put("jdbc.password", "password") + .build(); + backingCatalog.initialize("rest_jdbc_backend", properties); + + DelegateRestSessionCatalog delegate = new DelegateRestSessionCatalog(new RESTCatalogAdapter(backingCatalog), backingCatalog); + return delegate.getServerInstance(); + } + + public static class DelegateRestSessionCatalog + extends RESTSessionCatalog + { + public RESTCatalogAdapter adapter; + private final Catalog delegate; + + public DelegateRestSessionCatalog(RESTCatalogAdapter adapter, Catalog delegate) + { + super(properties -> adapter, null); + this.adapter = requireNonNull(adapter, "adapter is null"); + this.delegate = requireNonNull(delegate, "delegate catalog is null"); + } + + @Override + public void close() + throws IOException + { + super.close(); + adapter.close(); + + if (delegate instanceof Closeable) { + ((Closeable) delegate).close(); + } + } + + public TestingHttpServer getServerInstance() + { + Bootstrap app = new Bootstrap( + new TestingHttpServerModule(), + new RestHttpServerModule()); + + Injector injector = app + .doNotInitializeLogging() + .initialize(); + + return injector.getInstance(TestingHttpServer.class); + } + + private class RestHttpServerModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(new TypeLiteral>() {}).annotatedWith(TheServlet.class).toInstance(ImmutableMap.of()); + binder.bind(javax.servlet.Servlet.class).annotatedWith(TheServlet.class).toInstance(new IcebergRestCatalogServlet(adapter)); + binder.bind(NodeInfo.class).toInstance(new NodeInfo("test")); + } + } + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergDistributedRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergDistributedRest.java new file mode 100644 index 000000000000..ff8474bf7394 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergDistributedRest.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.http.server.testing.TestingHttpServer; +import com.facebook.presto.iceberg.IcebergDistributedTestBase; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import org.assertj.core.util.Files; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.facebook.presto.iceberg.CatalogType.REST; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.getRestServer; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.restConnectorProperties; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; + +@Test +public class TestIcebergDistributedRest + extends IcebergDistributedTestBase +{ + private TestingHttpServer restServer; + private String serverUri; + private File warehouseLocation; + + protected TestIcebergDistributedRest() + { + super(REST); + } + + @Override + protected boolean supportsViews() + { + return false; + } + + @Override + protected Map getProperties() + { + return ImmutableMap.of("uri", serverUri); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + warehouseLocation = Files.newTemporaryFolder(); + + restServer = getRestServer(warehouseLocation.getAbsolutePath()); + restServer.start(); + + serverUri = restServer.getBaseUrl().toString(); + super.init(); + } + + @AfterClass + public void tearDown() + throws Exception + { + if (restServer != null) { + restServer.stop(); + } + deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createIcebergQueryRunner( + ImmutableMap.of(), + restConnectorProperties(serverUri), + PARQUET, + true, + false, + OptionalInt.empty(), + Optional.of(warehouseLocation.toPath())); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestConfig.java new file mode 100644 index 000000000000..730a58d862c5 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.configuration.testing.ConfigAssertions; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.presto.iceberg.rest.AuthenticationType.OAUTH2; +import static com.facebook.presto.iceberg.rest.SessionType.USER; + +public class TestIcebergRestConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(ConfigAssertions.recordDefaults(IcebergRestConfig.class) + .setServerUri(null) + .setAuthenticationType(null) + .setCredential(null) + .setToken(null) + .setSessionType(null)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("iceberg.rest.uri", "http://localhost:xxx") + .put("iceberg.rest.auth.type", "OAUTH2") + .put("iceberg.rest.auth.oauth2.credential", "key:secret") + .put("iceberg.rest.auth.oauth2.token", "SXVLUXUhIExFQ0tFUiEK") + .put("iceberg.rest.session.type", "USER") + .build(); + + IcebergRestConfig expected = new IcebergRestConfig() + .setServerUri("http://localhost:xxx") + .setAuthenticationType(OAUTH2) + .setCredential("key:secret") + .setToken("SXVLUXUhIExFQ0tFUiEK") + .setSessionType(USER); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java new file mode 100644 index 000000000000..2483dd5b9cf5 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java @@ -0,0 +1,168 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.rest; + +import com.facebook.airlift.http.server.testing.TestingHttpServer; +import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.hive.gcs.HiveGcsConfig; +import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.iceberg.IcebergCatalogName; +import com.facebook.presto.iceberg.IcebergConfig; +import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase; +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.iceberg.IcebergResourceFactory; +import com.facebook.presto.iceberg.nessie.NessieConfig; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.Table; +import org.assertj.core.util.Files; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.facebook.presto.iceberg.CatalogType.REST; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.getRestServer; +import static com.facebook.presto.iceberg.rest.IcebergRestTestUtil.restConnectorProperties; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@Test +public class TestIcebergSmokeRest + extends IcebergDistributedSmokeTestBase +{ + private File warehouseLocation; + private TestingHttpServer restServer; + private String serverUri; + + public TestIcebergSmokeRest() + { + super(REST); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + warehouseLocation = Files.newTemporaryFolder(); + + restServer = getRestServer(warehouseLocation.getAbsolutePath()); + restServer.start(); + + serverUri = restServer.getBaseUrl().toString(); + super.init(); + } + + @AfterClass + public void tearDown() + throws Exception + { + if (restServer != null) { + restServer.stop(); + } + deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE); + } + + @Override + protected String getLocation(String schema, String table) + { + return format("%s/%s/%s", warehouseLocation, schema, table); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner( + ImmutableMap.of(), + restConnectorProperties(serverUri), + PARQUET, + true, + false, + OptionalInt.empty(), + Optional.of(warehouseLocation.toPath())); + } + + protected IcebergResourceFactory getResourceFactory() + { + IcebergConfig icebergConfig = new IcebergConfig() + .setCatalogType(REST) + .setCatalogWarehouse(warehouseLocation.getAbsolutePath().toString()); + IcebergRestConfig restConfig = new IcebergRestConfig().setServerUri(serverUri); + + return new IcebergResourceFactory(icebergConfig, + new IcebergCatalogName(ICEBERG_CATALOG), + new NessieConfig(), + restConfig, + new PrestoS3ConfigurationUpdater(new HiveS3Config()), + new HiveGcsConfigurationInitializer(new HiveGcsConfig()), + new NodeVersion("test_version")); + } + + @Override + protected Table getIcebergTable(ConnectorSession session, String schema, String tableName) + { + return getNativeIcebergTable(getResourceFactory(), + session, + SchemaTableName.valueOf(schema + "." + tableName)); + } + + @Test + public void testDeleteOnPartitionedV1Table() + { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(super::testDeleteOnPartitionedV1Table) + .isInstanceOf(RuntimeException.class) + .hasMessageMatching("Cannot downgrade v2 table to v1"); + } + + @Test + public void testCreateTableWithFormatVersion() + { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(() -> super.testMetadataDeleteOnNonIdentityPartitionColumn("1", "copy-on-write")) + .isInstanceOf(RuntimeException.class) + .hasMessageMatching("Cannot downgrade v2 table to v1"); + + // v2 succeeds + super.testCreateTableWithFormatVersion("2", "merge-on-read"); + } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnNonIdentityPartitionColumn(String version, String mode) + { + if (version.equals("1")) { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(() -> super.testMetadataDeleteOnNonIdentityPartitionColumn(version, mode)) + .isInstanceOf(RuntimeException.class); + } + else { + // v2 succeeds + super.testMetadataDeleteOnNonIdentityPartitionColumn(version, mode); + } + } +} diff --git a/presto-iceberg/src/test/java/org/apache/iceberg/rest/IcebergRestCatalogServlet.java b/presto-iceberg/src/test/java/org/apache/iceberg/rest/IcebergRestCatalogServlet.java new file mode 100644 index 000000000000..e9aa46a58968 --- /dev/null +++ b/presto-iceberg/src/test/java/org/apache/iceberg/rest/IcebergRestCatalogServlet.java @@ -0,0 +1,252 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.rest; + +import com.facebook.airlift.log.Logger; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.io.CharStreams; +import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod; +import org.apache.iceberg.rest.RESTCatalogAdapter.Route; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.util.Pair; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; + +/** + * The IcebergRestCatalogServlet provides a servlet implementation used in combination with a + * RESTCatalogAdaptor to proxy the REST Spec to any Catalog implementation. + */ +public class IcebergRestCatalogServlet + extends HttpServlet +{ + private static final Logger LOG = Logger.get(IcebergRestCatalogServlet.class); + + private final RESTCatalogAdapter restCatalogAdapter; + private final Map responseHeaders = + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + + public IcebergRestCatalogServlet(RESTCatalogAdapter restCatalogAdapter) + { + this.restCatalogAdapter = restCatalogAdapter; + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + @Override + protected void doHead(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + @Override + protected void doDelete(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + execute(ServletRequestContext.from(request), response); + } + + protected void execute(ServletRequestContext context, HttpServletResponse response) + throws IOException + { + response.setStatus(HttpServletResponse.SC_OK); + responseHeaders.forEach(response::setHeader); + + if (context.error().isPresent()) { + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + RESTObjectMapper.mapper().writeValue(response.getWriter(), context.error().get()); + return; + } + + try { + Object responseBody = + restCatalogAdapter.execute( + context.method(), + context.path(), + context.queryParams(), + context.body(), + context.route().responseClass(), + context.headers(), + handle(response)); + + if (responseBody != null) { + RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody); + } + } + catch (RESTException e) { + if (context.route() == Route.LOAD_TABLE && e.getLocalizedMessage().contains("NoSuchTableException")) { + // Suppress stack trace for load_table requests, most of which occur immediately + // preceding a create_table request + LOG.warn("Table at endpoint %s does not exist", context.path()); + } + else { + LOG.error(e, "Error processing REST request at endpoint %s", context.path()); + } + response.setStatus(SC_INTERNAL_SERVER_ERROR); + } + catch (Exception e) { + LOG.error(e, "Unexpected exception when processing REST request"); + response.setStatus(SC_INTERNAL_SERVER_ERROR); + } + } + + protected Consumer handle(HttpServletResponse response) + { + return (errorResponse) -> { + response.setStatus(errorResponse.code()); + try { + RESTObjectMapper.mapper().writeValue(response.getWriter(), errorResponse); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + } + + public static class ServletRequestContext + { + private HTTPMethod method; + private Route route; + private String path; + private Map headers; + private Map queryParams; + private Object body; + private ErrorResponse errorResponse; + + private ServletRequestContext(ErrorResponse errorResponse) + { + this.errorResponse = errorResponse; + } + + private ServletRequestContext( + HTTPMethod method, + Route route, + String path, + Map headers, + Map queryParams, + Object body) + { + this.method = method; + this.route = route; + this.path = path; + this.headers = headers; + this.queryParams = queryParams; + this.body = body; + } + + static ServletRequestContext from(HttpServletRequest request) + throws IOException + { + HTTPMethod method = HTTPMethod.valueOf(request.getMethod()); + String path = request.getRequestURI().substring(1); + Pair> routeContext = Route.from(method, path); + + if (routeContext == null) { + return new ServletRequestContext( + ErrorResponse.builder() + .responseCode(400) + .withType("BadRequestException") + .withMessage(format("No route for request: %s %s", method, path)) + .build()); + } + + Route route = routeContext.first(); + Object requestBody = null; + if (route.requestClass() != null) { + requestBody = + RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass()); + } + else if (route == Route.TOKENS) { + try (Reader reader = new InputStreamReader(request.getInputStream())) { + requestBody = RESTUtil.decodeFormData(CharStreams.toString(reader)); + } + } + + Map queryParams = + request.getParameterMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()[0])); + Map headers = + Collections.list(request.getHeaderNames()).stream() + .collect(Collectors.toMap(Function.identity(), request::getHeader)); + + return new ServletRequestContext(method, route, path, headers, queryParams, requestBody); + } + + public HTTPMethod method() + { + return method; + } + + public Route route() + { + return route; + } + + public String path() + { + return path; + } + + public Map headers() + { + return headers; + } + + public Map queryParams() + { + return queryParams; + } + + public Object body() + { + return body; + } + + public Optional error() + { + return Optional.ofNullable(errorResponse); + } + } +}