Skip to content

Commit

Permalink
v2 changes for timestream (#2239)
Browse files Browse the repository at this point in the history
  • Loading branch information
Trianz-Akshay committed Sep 13, 2024
1 parent a6fc14d commit b70cbd5
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 234 deletions.
18 changes: 12 additions & 6 deletions athena-timestream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
<version>${slf4j-log4j.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-timestreamwrite</artifactId>
<version>${aws-sdk.version}</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>timestreamwrite</artifactId>
<version>${aws-sdk-v2.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-timestreamquery</artifactId>
<version>${aws-sdk.version}</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>timestreamquery</artifactId>
<version>${aws-sdk-v2.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down Expand Up @@ -85,6 +85,12 @@
<version>${log4j2Version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,42 @@
*/
package com.amazonaws.athena.connectors.timestream;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.timestreamquery.AmazonTimestreamQuery;
import com.amazonaws.services.timestreamquery.AmazonTimestreamQueryClientBuilder;
import com.amazonaws.services.timestreamwrite.AmazonTimestreamWrite;
import com.amazonaws.services.timestreamwrite.AmazonTimestreamWriteClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;

/**
 * Static factory for the Timestream query and write clients used by the connector, plus the
 * shared client configuration that stamps a connector-specific user-agent prefix on requests.
 *
 * NOTE(review): this listing is a rendered diff, so each superseded SDK v1 line is still shown
 * directly above the SDK v2 line that replaces it.
 */
public class TimestreamClientBuilder
{
private static final Logger logger = LoggerFactory.getLogger(TimestreamClientBuilder.class);

// Region obtained from the SDK's default region provider chain in a static initializer, i.e. once
// when this class is initialized; both client factories below reuse it.
// NOTE(review): the field is package-private and non-final — presumably so tests can override it; confirm.
static Region defaultRegion = DefaultAwsRegionProviderChain.builder().build().getRegion();
private TimestreamClientBuilder()
{
// prevent instantiation with private constructor
}

/**
 * Builds a Timestream query client bound to {@code defaultRegion}, using
 * {@code DefaultCredentialsProvider} and the override configuration from
 * {@link #buildClientConfiguration(String)}.
 *
 * @param sourceType connector source type; it is embedded in the user-agent prefix
 * @return a newly built query client
 */
public static AmazonTimestreamQuery buildQueryClient(String sourceType)
public static TimestreamQueryClient buildQueryClient(String sourceType)
{
return AmazonTimestreamQueryClientBuilder.standard().withClientConfiguration(buildClientConfiguration(sourceType)).build();
return TimestreamQueryClient.builder().region(defaultRegion).credentialsProvider(DefaultCredentialsProvider.create())
.overrideConfiguration(buildClientConfiguration(sourceType)).build();
}

/**
 * Builds a Timestream write client bound to {@code defaultRegion}, using
 * {@code DefaultCredentialsProvider} and the override configuration from
 * {@link #buildClientConfiguration(String)}.
 *
 * @param sourceType connector source type; it is embedded in the user-agent prefix
 * @return a newly built write client
 */
public static AmazonTimestreamWrite buildWriteClient(String sourceType)
public static TimestreamWriteClient buildWriteClient(String sourceType)
{
return AmazonTimestreamWriteClientBuilder.standard().withClientConfiguration(buildClientConfiguration(sourceType)).build();
return TimestreamWriteClient.builder().region(defaultRegion).credentialsProvider(DefaultCredentialsProvider.create())
.overrideConfiguration(buildClientConfiguration(sourceType)).build();
}

/**
 * Creates the client configuration whose user-agent prefix is
 * {@code "aws-athena-" + sourceType + "-connector"}.
 *
 * @param sourceType connector source type inserted into the user-agent prefix
 * @return the configuration carrying that prefix
 */
static ClientConfiguration buildClientConfiguration(String sourceType)
static ClientOverrideConfiguration buildClientConfiguration(String sourceType)
{
String userAgent = "aws-athena-" + sourceType + "-connector";
ClientConfiguration clientConfiguration = new ClientConfiguration().withUserAgentPrefix(userAgent);
logger.info("Created client configuration with user agent {} for Timestream SDK", clientConfiguration.getUserAgentPrefix());
// The v2 configuration stores the prefix as the USER_AGENT_PREFIX advanced option.
ClientOverrideConfiguration clientConfiguration = ClientOverrideConfiguration.builder().putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, userAgent).build();
// NOTE(review): the placeholder below receives the result of isPresent(), so the message prints
// that value instead of the user-agent prefix the v1 line logged; passing userAgent would
// restore the previous log output.
logger.info("Created client configuration with user agent {} for Timestream SDK is present", clientConfiguration.advancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX).isPresent());
return clientConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@
import com.amazonaws.athena.connector.util.PaginatedRequestIterator;
import com.amazonaws.athena.connectors.timestream.qpt.TimestreamQueryPassthrough;
import com.amazonaws.athena.connectors.timestream.query.QueryFactory;
import com.amazonaws.services.timestreamquery.AmazonTimestreamQuery;
import com.amazonaws.services.timestreamquery.model.ColumnInfo;
import com.amazonaws.services.timestreamquery.model.Datum;
import com.amazonaws.services.timestreamquery.model.QueryRequest;
import com.amazonaws.services.timestreamquery.model.QueryResult;
import com.amazonaws.services.timestreamquery.model.Row;
import com.amazonaws.services.timestreamwrite.AmazonTimestreamWrite;
import com.amazonaws.services.timestreamwrite.model.ListDatabasesRequest;
import com.amazonaws.services.timestreamwrite.model.ListDatabasesResult;
import com.amazonaws.services.timestreamwrite.model.ListTablesResult;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.types.pojo.Field;
Expand All @@ -62,6 +52,16 @@
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.timestreamquery.TimestreamQueryClient;
import software.amazon.awssdk.services.timestreamquery.model.ColumnInfo;
import software.amazon.awssdk.services.timestreamquery.model.Datum;
import software.amazon.awssdk.services.timestreamquery.model.QueryRequest;
import software.amazon.awssdk.services.timestreamquery.model.QueryResponse;
import software.amazon.awssdk.services.timestreamquery.model.Row;
import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient;
import software.amazon.awssdk.services.timestreamwrite.model.Database;
import software.amazon.awssdk.services.timestreamwrite.model.ListDatabasesRequest;
import software.amazon.awssdk.services.timestreamwrite.model.ListDatabasesResponse;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -90,8 +90,8 @@ public class TimestreamMetadataHandler
private final QueryFactory queryFactory = new QueryFactory();

private final GlueClient glue;
private final AmazonTimestreamQuery tsQuery;
private final AmazonTimestreamWrite tsMeta;
private final TimestreamQueryClient tsQuery;
private final TimestreamWriteClient tsMeta;

private final TimestreamQueryPassthrough queryPassthrough;

Expand All @@ -106,8 +106,8 @@ public TimestreamMetadataHandler(java.util.Map<String, String> configOptions)

@VisibleForTesting
protected TimestreamMetadataHandler(
AmazonTimestreamQuery tsQuery,
AmazonTimestreamWrite tsMeta,
TimestreamQueryClient tsQuery,
TimestreamWriteClient tsMeta,
GlueClient glue,
EncryptionKeyFactory keyFactory,
SecretsManagerClient secretsManager,
Expand Down Expand Up @@ -136,19 +136,19 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca
/**
 * Lists Timestream database names as the schema names of this catalog.
 *
 * @param blockAllocator not used by this method
 * @param request supplies the catalog name echoed back in the response
 * @return a response holding the request's catalog name and the collected database names
 * @throws Exception declared by the signature; nothing in this body throws a checked exception directly
 */
public ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, ListSchemasRequest request)
throws Exception
{
// Pages are produced by doListSchemaNamesOnePage; the second argument tells the iterator how to
// read the next-page token from each response.
// NOTE(review): presumably the iterator keeps requesting pages until that token is null — confirm
// against PaginatedRequestIterator.
List<String> schemas = PaginatedRequestIterator.stream(this::doListSchemaNamesOnePage, ListDatabasesResult::getNextToken)
.flatMap(result -> result.getDatabases().stream())
.map(db -> db.getDatabaseName())
List<String> schemas = PaginatedRequestIterator.stream(this::doListSchemaNamesOnePage, ListDatabasesResponse::nextToken)
.flatMap(result -> result.databases().stream())
.map(Database::databaseName)
.collect(Collectors.toList());

return new ListSchemasResponse(
request.getCatalogName(),
schemas);
}

/**
 * Requests a single page of databases from the Timestream write client.
 *
 * @param nextToken pagination token placed on the ListDatabases request
 *                  (presumably null for the first page — confirm against the caller)
 * @return the response of {@code tsMeta.listDatabases} for that token
 */
private ListDatabasesResult doListSchemaNamesOnePage(String nextToken)
private ListDatabasesResponse doListSchemaNamesOnePage(String nextToken)
{
return tsMeta.listDatabases(new ListDatabasesRequest().withNextToken(nextToken));
return tsMeta.listDatabases(ListDatabasesRequest.builder().nextToken(nextToken).build());
}

@Override
Expand All @@ -159,7 +159,7 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables
try {
return doListTablesInternal(blockAllocator, request);
}
catch (com.amazonaws.services.timestreamwrite.model.ResourceNotFoundException ex) {
catch (software.amazon.awssdk.services.timestreamwrite.model.ResourceNotFoundException ex) {
// If it fails then we will retry after resolving the schema name by ignoring the casing
String resolvedSchemaName = findSchemaNameIgnoringCase(request.getSchemaName());
request = new ListTablesRequest(request.getIdentity(), request.getQueryId(), request.getCatalogName(), resolvedSchemaName, request.getNextToken(), request.getPageSize());
Expand Down Expand Up @@ -191,43 +191,43 @@ private ListTablesResponse doListTablesInternal(BlockAllocator blockAllocator, L
}

// Otherwise don't retrieve all pages, just pass through the page token.
ListTablesResult timestreamResults = doListTablesOnePage(request.getSchemaName(), request.getNextToken());
List<TableName> tableNames = timestreamResults.getTables()
software.amazon.awssdk.services.timestreamwrite.model.ListTablesResponse timestreamResults = doListTablesOnePage(request.getSchemaName(), request.getNextToken());
List<TableName> tableNames = timestreamResults.tables()
.stream()
.map(table -> new TableName(request.getSchemaName(), table.getTableName()))
.map(table -> new TableName(request.getSchemaName(), table.tableName()))
.collect(Collectors.toList());

// Pass through whatever token we got from Timestream to the user
ListTablesResponse result = new ListTablesResponse(
request.getCatalogName(),
tableNames,
timestreamResults.getNextToken());
timestreamResults.nextToken());
logger.debug("doListTables [paginated] result: {}", result);
return result;
}

/**
 * Requests a single page of tables for one Timestream database.
 *
 * @param schemaName database name placed on the ListTables request
 * @param nextToken pagination token placed on the ListTables request
 * @return the response of {@code tsMeta.listTables} for that request
 */
private ListTablesResult doListTablesOnePage(String schemaName, String nextToken)
private software.amazon.awssdk.services.timestreamwrite.model.ListTablesResponse doListTablesOnePage(String schemaName, String nextToken)
{
// TODO: We should pass through the pageSize as withMaxResults(pageSize)
// NOTE(review): withMaxResults is the v1 spelling; on the v2 request builder used below the
// setter is presumably maxResults(pageSize) — confirm before implementing the TODO.
// The Timestream request type is written fully qualified because this file also uses an
// unqualified ListTablesRequest for the connector's own request type.
com.amazonaws.services.timestreamwrite.model.ListTablesRequest listTablesRequest =
new com.amazonaws.services.timestreamwrite.model.ListTablesRequest()
.withDatabaseName(schemaName)
.withNextToken(nextToken);
software.amazon.awssdk.services.timestreamwrite.model.ListTablesRequest listTablesRequest = software.amazon.awssdk.services.timestreamwrite.model.ListTablesRequest.builder()
.databaseName(schemaName)
.nextToken(nextToken)
.build();
return tsMeta.listTables(listTablesRequest);
}

/**
 * Streams the tables of one Timestream database as {@code TableName} values.
 *
 * @param schemaName database whose tables are listed; also used as the schema part of every
 *                   returned {@code TableName}
 * @return a stream of (schemaName, tableName) pairs built from the pages returned by
 *         {@code doListTablesOnePage}
 */
private Stream<TableName> getTableNamesInSchema(String schemaName)
{
// Each page is fetched through doListTablesOnePage; the second argument tells the iterator how
// to read the next-page token from a response.
return PaginatedRequestIterator.stream((pageToken) -> doListTablesOnePage(schemaName, pageToken), ListTablesResult::getNextToken)
.flatMap(currResult -> currResult.getTables().stream())
.map(table -> new TableName(schemaName, table.getTableName()));
return PaginatedRequestIterator.stream((pageToken) -> doListTablesOnePage(schemaName, pageToken), software.amazon.awssdk.services.timestreamwrite.model.ListTablesResponse::nextToken)
.flatMap(currResult -> currResult.tables().stream())
.map(table -> new TableName(schemaName, table.tableName()));
}

private String findSchemaNameIgnoringCase(String schemaNameInsensitive)
{
return PaginatedRequestIterator.stream(this::doListSchemaNamesOnePage, ListDatabasesResult::getNextToken)
.flatMap(result -> result.getDatabases().stream())
.map(db -> db.getDatabaseName())
return PaginatedRequestIterator.stream(this::doListSchemaNamesOnePage, ListDatabasesResponse::nextToken)
.flatMap(result -> result.databases().stream())
.map(Database::databaseName)
.filter(name -> name.equalsIgnoreCase(schemaNameInsensitive))
.findAny()
.orElseThrow(() -> new RuntimeException(String.format("Could not find a case-insensitive match for schema name %s", schemaNameInsensitive)));
Expand All @@ -238,9 +238,9 @@ private TableName findTableNameIgnoringCase(BlockAllocator blockAllocator, GetTa
String caseInsenstiveSchemaNameMatch = findSchemaNameIgnoringCase(getTableRequest.getTableName().getSchemaName());

// based on AmazonMskMetadataHandler::findGlueRegistryNameIgnoringCasing
return PaginatedRequestIterator.stream((pageToken) -> doListTablesOnePage(caseInsenstiveSchemaNameMatch, pageToken), ListTablesResult::getNextToken)
.flatMap(result -> result.getTables().stream())
.map(tbl -> new TableName(caseInsenstiveSchemaNameMatch, tbl.getTableName()))
return PaginatedRequestIterator.stream((pageToken) -> doListTablesOnePage(caseInsenstiveSchemaNameMatch, pageToken), software.amazon.awssdk.services.timestreamwrite.model.ListTablesResponse::nextToken)
.flatMap(result -> result.tables().stream())
.map(tbl -> new TableName(caseInsenstiveSchemaNameMatch, tbl.tableName()))
.filter(tbl -> tbl.getTableName().equalsIgnoreCase(getTableRequest.getTableName().getTableName()))
.findAny()
.orElseThrow(() -> new RuntimeException(String.format("Could not find a case-insensitive match for table name %s", getTableRequest.getTableName().getTableName())));
Expand All @@ -256,24 +256,24 @@ private Schema inferSchemaForTable(TableName tableName)
logger.info("doGetTable: Retrieving schema for table[{}] from TimeStream using describeQuery[{}].",
tableName, describeQuery);

QueryRequest queryRequest = new QueryRequest().withQueryString(describeQuery);
QueryRequest queryRequest = QueryRequest.builder().queryString(describeQuery).build();
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
do {
QueryResult queryResult = tsQuery.query(queryRequest);
for (Row next : queryResult.getRows()) {
List<Datum> datum = next.getData();
QueryResponse queryResult = tsQuery.query(queryRequest);
for (Row next : queryResult.rows()) {
List<Datum> datum = next.data();

if (datum.size() != 3) {
throw new RuntimeException("Unexpected datum size " + datum.size() +
" while getting schema from datum[" + datum.toString() + "]");
}

Field nextField = TimestreamSchemaUtils.makeField(datum.get(0).getScalarValue(), datum.get(1).getScalarValue());
Field nextField = TimestreamSchemaUtils.makeField(datum.get(0).scalarValue(), datum.get(1).scalarValue());
schemaBuilder.addField(nextField);
}
queryRequest = new QueryRequest().withNextToken(queryResult.getNextToken());
queryRequest = QueryRequest.builder().nextToken(queryResult.nextToken()).build();
}
while (queryRequest.getNextToken() != null);
while (queryRequest.nextToken() != null);

return schemaBuilder.build();
}
Expand All @@ -300,7 +300,7 @@ public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableReques
Schema schema = inferSchemaForTable(request.getTableName());
return new GetTableResponse(request.getCatalogName(), request.getTableName(), schema);
}
catch (com.amazonaws.services.timestreamquery.model.ValidationException ex) {
catch (software.amazon.awssdk.services.timestreamquery.model.ValidationException ex) {
logger.debug("Could not find table name matching {} in database {}. Falling back to case-insensitive lookup.", request.getTableName().getTableName(), request.getTableName().getSchemaName());

TableName resolvedTableName = findTableNameIgnoringCase(blockAllocator, request);
Expand All @@ -319,13 +319,13 @@ public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, Ge

queryPassthrough.verify(request.getQueryPassthroughArguments());
String customerPassedQuery = request.getQueryPassthroughArguments().get(TimestreamQueryPassthrough.QUERY);
QueryRequest queryRequest = new QueryRequest().withQueryString(customerPassedQuery).withMaxRows(1);
QueryRequest queryRequest = QueryRequest.builder().queryString(customerPassedQuery).maxRows(1).build();
// Timestream Query does not provide a way to conduct a dry run or retrieve metadata results without execution. Therefore, we need to "seek" at least once before obtaining metadata.
QueryResult queryResult = tsQuery.query(queryRequest);
List<ColumnInfo> columnInfo = queryResult.getColumnInfo();
QueryResponse queryResult = tsQuery.query(queryRequest);
List<ColumnInfo> columnInfo = queryResult.columnInfo();
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
for (ColumnInfo column : columnInfo) {
Field nextField = TimestreamSchemaUtils.makeField(column.getName(), column.getType().getScalarType().toLowerCase());
Field nextField = TimestreamSchemaUtils.makeField(column.name(), column.type().scalarTypeAsString().toLowerCase());
schemaBuilder.addField(nextField);
}

Expand Down
Loading

0 comments on commit b70cbd5

Please sign in to comment.