Add support for REST catalog in Iceberg connector
kiersten-stokes authored and tdcmeehan committed May 24, 2024
1 parent 865a6c9 commit 7db640f
Showing 19 changed files with 1,047 additions and 35 deletions.
1 change: 1 addition & 0 deletions pom.xml
@@ -74,6 +74,7 @@
<dep.docker-java.version>3.3.0</dep.docker-java.version>
<dep.jayway.version>2.6.0</dep.jayway.version>
<dep.ratis.version>2.2.0</dep.ratis.version>
<dep.jackson.version>2.11.0</dep.jackson.version>
<!--
America/Bahia_Banderas has:
- offset change since 1970 (offset Jan 1970: -08:00, offset Jan 2018: -06:00)
48 changes: 45 additions & 3 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -13,7 +13,7 @@ Iceberg tables store most of the metadata in the metadata files, along with the
filesystem, but it still requires a central place to find the current location of the
current metadata pointer for a table. This central place is called the ``Iceberg Catalog``.
The Presto Iceberg connector supports different types of Iceberg Catalogs : ``Hive Metastore``,
``GLUE``, ``NESSIE``, and ``HADOOP``.
``GLUE``, ``NESSIE``, ``REST`` and ``HADOOP``.

To configure the Iceberg connector, create a catalog properties file
``etc/catalog/iceberg.properties``. To define the catalog type, ``iceberg.catalog.type`` property
@@ -144,12 +144,49 @@ If an error similar to the following example is displayed, this is probably beca
at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.executeRequest(UrlConnectionRequest.java:71)
... 42 more
REST catalog
^^^^^^^^^^^^

To use a REST catalog, configure the catalog type as
``iceberg.catalog.type=rest``. A minimal configuration includes:

.. code-block:: none

    connector.name=iceberg
    iceberg.catalog.type=rest
    iceberg.rest.uri=https://localhost:8181

Additional supported properties for the REST catalog:

==================================================== ============================================================
Property Name                                        Description
==================================================== ============================================================
``iceberg.rest.uri``                                 REST API endpoint URI (required).
                                                     Example: ``https://localhost:8181``

``iceberg.rest.auth.type``                           The authentication type to use.
                                                     Available values are ``NONE`` or ``OAUTH2`` (default: ``NONE``).
                                                     ``OAUTH2`` requires either a credential or token.

``iceberg.rest.auth.oauth2.credential``              The credential to use for OAUTH2 authentication.
                                                     Example: ``key:secret``

``iceberg.rest.auth.oauth2.token``                   The Bearer token to use for OAUTH2 authentication.
                                                     Example: ``SXVLUXUhIExFQ0tFUiEK``

``iceberg.rest.session.type``                        The session type to use when communicating with the REST catalog.
                                                     Available values are ``NONE`` or ``USER`` (default: ``NONE``).

``iceberg.catalog.warehouse``                        A catalog warehouse root path for Iceberg tables (optional).
                                                     Example: ``s3://warehouse/``

==================================================== ============================================================

Hadoop catalog
^^^^^^^^^^^^^^

To use a Hadoop catalog, configure the catalog type as
``iceberg.catalog.type=hadoop``
``iceberg.catalog.type=hadoop``. A minimal configuration includes:

.. code-block:: none
@@ -190,7 +227,7 @@ Property Name Description

Example: ``hdfs://nn:8020/warehouse/path``
This property is required if the ``iceberg.catalog.type`` is
``hadoop``. Otherwise, it will be ignored.
``hadoop``.

``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
required if the ``iceberg.catalog.type`` is ``hadoop``.
@@ -898,6 +935,11 @@ in the case where a specific metadata file contains the targeted table state::

CALL iceberg.system.register_table('schema_name', 'table_name', 'hdfs://localhost:9000/path/to/iceberg/table/metadata/dir', '00000-35a08aed-f4b0-4010-95d2-9d73ef4be01c.metadata.json')

.. note::

The Iceberg REST catalog may not support table registration, depending on the
type of the backing catalog.

.. note::

When registering a table with the Hive metastore, the user calling the
58 changes: 55 additions & 3 deletions presto-iceberg/pom.xml
@@ -197,14 +197,12 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.2</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.2</version>
<scope>compile</scope>
</dependency>

@@ -480,6 +478,21 @@
<artifactId>datasketches-memory</artifactId>
</dependency>

<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
<scope>runtime</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
@@ -581,6 +594,40 @@
<artifactId>presto-jmx</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>1.5.0</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</exclusion>
<exclusion>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
@@ -590,10 +637,15 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<!-- Ignore these because they are picked up as false-positives when configuring logging in the IcebergQueryRunner-->
<ignoredUsedUndeclaredDependencies>
<!-- Ignore these because they are picked up as false-positives when configuring logging in the IcebergQueryRunner-->
<dependency>org.glassfish.jersey.core:jersey-common:jar</dependency>
<dependency>org.eclipse.jetty:jetty-server:jar</dependency>
<!-- Ignore these because they are picked up as false-positives when configuring REST catalog testing http server -->
<dependency>com.facebook.airlift:http-server:jar</dependency>
<dependency>com.facebook.airlift:node:jar</dependency>
<dependency>javax.servlet:javax.servlet-api:jar</dependency>
<dependency>org.apache.httpcomponents.core5:httpcore5:jar</dependency>
</ignoredUsedUndeclaredDependencies>
</configuration>
</plugin>
@@ -16,13 +16,14 @@
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.nessie.NessieCatalog;
import org.apache.iceberg.rest.RESTCatalog;

public enum CatalogType
{
HADOOP(HadoopCatalog.class.getName()),
HIVE(HiveCatalog.class.getName()),
NESSIE(NessieCatalog.class.getName()),

REST(RESTCatalog.class.getName())
/**/;

private final String catalogImpl;
@@ -39,6 +39,7 @@
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.rest.IcebergRestConfig;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
@@ -128,6 +129,7 @@ public void setup(Binder binder)

configBinder(binder).bindConfig(IcebergConfig.class);
configBinder(binder).bindConfig(NessieConfig.class);
configBinder(binder).bindConfig(IcebergRestConfig.class);

binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON);
@@ -13,22 +13,27 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.gcs.GcsConfigurationInitializer;
import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.rest.IcebergRestConfig;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.jsonwebtoken.Jwts;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.rest.auth.OAuth2Properties;

import javax.inject.Inject;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,12 +45,18 @@
import static com.facebook.presto.iceberg.IcebergUtil.loadCachingProperties;
import static com.facebook.presto.iceberg.nessie.AuthenticationType.BASIC;
import static com.facebook.presto.iceberg.nessie.AuthenticationType.BEARER;
import static com.facebook.presto.iceberg.rest.AuthenticationType.OAUTH2;
import static com.facebook.presto.iceberg.rest.SessionType.USER;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.CatalogProperties.FILE_IO_IMPL;
import static org.apache.iceberg.CatalogProperties.URI;
import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION;
import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL;
import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN;

/**
* Factory for loading Iceberg resources such as Catalog.
@@ -63,18 +74,29 @@ public class IcebergResourceFactory
private final GcsConfigurationInitializer gcsConfigurationInitialize;

private final IcebergConfig icebergConfig;
private final IcebergRestConfig restConfig;
private final NodeVersion nodeVersion;

@Inject
public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogName, NessieConfig nessieConfig, S3ConfigurationUpdater s3ConfigurationUpdater, GcsConfigurationInitializer gcsConfigurationInitialize)
public IcebergResourceFactory(
IcebergConfig config,
IcebergCatalogName catalogName,
NessieConfig nessieConfig,
IcebergRestConfig restConfig,
S3ConfigurationUpdater s3ConfigurationUpdater,
GcsConfigurationInitializer gcsConfigurationInitialize,
NodeVersion nodeVersion)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName();
this.icebergConfig = requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.hadoopConfigResources = config.getHadoopConfigResources();
this.nessieConfig = requireNonNull(nessieConfig, "nessieConfig is null");
this.restConfig = requireNonNull(restConfig, "restConfig is null");
this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null");
this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
catalogCache = CacheBuilder.newBuilder()
.maximumSize(config.getCatalogCacheSize())
.build();
@@ -150,31 +172,63 @@ public Map<String, String> getCatalogProperties(ConnectorSession session)
if (catalogWarehouse != null) {
properties.put(WAREHOUSE_LOCATION, catalogWarehouse);
}
if (catalogType == NESSIE) {
properties.put("ref", getNessieReferenceName(session));
properties.put("uri", nessieConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie")));
String hash = getNessieReferenceHash(session);
if (hash != null) {
properties.put("ref.hash", hash);
}
nessieConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString()));
nessieConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString()));
nessieConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val));
nessieConfig.getAuthenticationType().ifPresent(type -> {
if (type == BASIC) {
properties.put("authentication.username", nessieConfig.getUsername()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication")));
properties.put("authentication.password", nessieConfig.getPassword()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication")));
switch (catalogType) {
case NESSIE:
properties.put("ref", getNessieReferenceName(session));
properties.put("uri", nessieConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie")));
String hash = getNessieReferenceHash(session);
if (hash != null) {
properties.put("ref.hash", hash);
}
else if (type == BEARER) {
properties.put("authentication.token", nessieConfig.getBearerToken()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication")));
nessieConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString()));
nessieConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString()));
nessieConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val));
nessieConfig.getAuthenticationType().ifPresent(type -> {
if (type == BASIC) {
properties.put("authentication.username", nessieConfig.getUsername()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication")));
properties.put("authentication.password", nessieConfig.getPassword()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication")));
}
else if (type == BEARER) {
properties.put("authentication.token", nessieConfig.getBearerToken()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication")));
}
});
if (!nessieConfig.isCompressionEnabled()) {
properties.put("transport.disable-compression", "true");
}
});
if (!nessieConfig.isCompressionEnabled()) {
properties.put("transport.disable-compression", "true");
}
break;
case REST:
properties.put(URI, restConfig.getServerUri().orElseThrow(
() -> new IllegalStateException("iceberg.rest.uri must be set for REST catalog")));
restConfig.getAuthenticationType().ifPresent(type -> {
if (type == OAUTH2) {
if (!restConfig.credentialOrTokenExists()) {
throw new IllegalStateException("iceberg.rest.auth.oauth2 requires either a credential or a token");
}
restConfig.getCredential().ifPresent(credential -> properties.put(CREDENTIAL, credential));
restConfig.getToken().ifPresent(token -> properties.put(TOKEN, token));
}
});
restConfig.getSessionType().ifPresent(type -> {
if (type == USER) {
properties.putAll(session.getIdentity().getExtraCredentials());

String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default"));
String jwt = Jwts.builder()
.setId(sessionId)
.setSubject(session.getUser())
.setIssuedAt(new Date())
.setIssuer(nodeVersion.toString())
.claim("user", session.getUser())
.claim("source", session.getSource().orElse(""))
.compact();

properties.put(OAuth2Properties.JWT_TOKEN_TYPE, jwt);
}
});
break;
}
return properties;
}
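
The REST branch of getCatalogProperties above can be read as a small mapping from connector configuration to Iceberg catalog properties. The following is a minimal, dependency-free sketch of that mapping, not the code from this commit. The class name RestCatalogPropertiesSketch and the method build are hypothetical. The string keys "uri", "credential", and "token" are assumed to be the values behind CatalogProperties.URI, OAuth2Properties.CREDENTIAL, and OAuth2Properties.TOKEN. The credential-or-token check is assumed to behave like restConfig.credentialOrTokenExists(), whose implementation is not shown in this diff. The USER session type and its JWT are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: mirrors the shape of the REST case, without Presto or Iceberg on the classpath.
final class RestCatalogPropertiesSketch
{
    // Assumed values of the Iceberg constants used in the diff.
    static final String URI = "uri";
    static final String CREDENTIAL = "credential";
    static final String TOKEN = "token";

    private RestCatalogPropertiesSketch() {}

    static Map<String, String> build(
            Optional<String> uri,
            String authType,
            Optional<String> credential,
            Optional<String> token)
    {
        Map<String, String> properties = new HashMap<>();

        // The endpoint is mandatory for the REST catalog.
        properties.put(URI, uri.orElseThrow(
                () -> new IllegalStateException("iceberg.rest.uri must be set for REST catalog")));

        // OAUTH2 needs at least one of credential or token; both are forwarded when present.
        if ("OAUTH2".equals(authType)) {
            if (!credential.isPresent() && !token.isPresent()) {
                throw new IllegalStateException("iceberg.rest.auth.oauth2 requires either a credential or a token");
            }
            credential.ifPresent(value -> properties.put(CREDENTIAL, value));
            token.ifPresent(value -> properties.put(TOKEN, value));
        }
        return properties;
    }

    public static void main(String[] args)
    {
        Map<String, String> properties = build(
                Optional.of("https://localhost:8181"),
                "OAUTH2",
                Optional.of("key:secret"),
                Optional.<String>empty());
        System.out.println(properties);
    }
}
```

With auth type NONE, only the URI entry is produced and any credential or token is ignored, which matches the diff: both are only read inside the OAUTH2 check.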