migrate awslogs to cloudwatchlogs (v1 to v2)
aimethed committed Sep 18, 2024
1 parent b70cbd5 commit 60a8bbf
Showing 9 changed files with 248 additions and 223 deletions.
26 changes: 23 additions & 3 deletions athena-cloudwatch/pom.xml
@@ -29,15 +29,35 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-logs</artifactId>
<version>${aws-sdk.version}</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatchlogs</artifactId>
<version>2.28.2</version>
<exclusions>
<!-- replaced with jcl-over-slf4j -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatch</artifactId>
<version>${aws-sdk-v2.version}</version>
<exclusions>
<!-- replaced with jcl-over-slf4j -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
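The dependency swap above is the core of the migration: v1's com.amazonaws:aws-java-sdk-logs is replaced by the v2 software.amazon.awssdk:cloudwatchlogs module, commons-logging is excluded in favor of jcl-over-slf4j, and netty-nio-client is excluded because only the synchronous client is used. A minimal sketch of building that client with an explicit HTTP implementation (SyncClientFactory is a hypothetical name, and the sketch assumes software.amazon.awssdk:apache-client is also on the classpath, which this hunk does not show):

import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

public final class SyncClientFactory
{
    // With netty-nio-client excluded, only the synchronous client can be built;
    // pinning the HTTP implementation to Apache makes that explicit.
    public static CloudWatchLogsClient create()
    {
        return CloudWatchLogsClient.builder()
                .httpClientBuilder(ApacheHttpClient.builder())
                .build();
    }
}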
athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchExceptionFilter.java
@@ -20,8 +20,8 @@
package com.amazonaws.athena.connectors.cloudwatch;

import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.services.logs.model.AWSLogsException;
import com.amazonaws.services.logs.model.LimitExceededException;
import software.amazon.awssdk.services.cloudwatch.model.LimitExceededException;
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;

/**
* Used to identify Exceptions that are related to Cloudwatch Logs throttling events.
@@ -36,7 +36,7 @@ private CloudwatchExceptionFilter() {}
@Override
public boolean isMatch(Exception ex)
{
if (ex instanceof AWSLogsException && ex.getMessage().startsWith("Rate exceeded")) {
if (ex instanceof CloudWatchLogsException && ex.getMessage().startsWith("Rate exceeded")) {
return true;
}

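The filter preserves the v1 behavior of matching on the "Rate exceeded" message prefix, now against the v2 CloudWatchLogsException. The v2 exception hierarchy also exposes structured throttling metadata through its AwsServiceException base class; a sketch of an alternative check built on that, which is not what the commit does:

import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;

final class ThrottleCheck
{
    // Sketch only: isThrottlingException() is inherited from the v2
    // AwsServiceException base class; the commit keeps the prefix match.
    static boolean isThrottle(Exception ex)
    {
        if (ex instanceof CloudWatchLogsException) {
            CloudWatchLogsException cwe = (CloudWatchLogsException) ex;
            return cwe.isThrottlingException() || cwe.getMessage().startsWith("Rate exceeded");
        }
        return false;
    }
}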
athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandler.java
@@ -43,15 +43,6 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.cloudwatch.qpt.CloudwatchQueryPassthrough;
import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.AWSLogsClientBuilder;
import com.amazonaws.services.logs.model.DescribeLogGroupsRequest;
import com.amazonaws.services.logs.model.DescribeLogGroupsResult;
import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
import com.amazonaws.services.logs.model.GetQueryResultsResult;
import com.amazonaws.services.logs.model.LogStream;
import com.amazonaws.services.logs.model.ResultField;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -61,6 +52,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.ArrayList;
@@ -123,22 +122,22 @@ public class CloudwatchMetadataHandler
.build();
}

private final AWSLogs awsLogs;
private final CloudWatchLogsClient awsLogs;
private final ThrottlingInvoker invoker;
private final CloudwatchTableResolver tableResolver;
private final CloudwatchQueryPassthrough queryPassthrough = new CloudwatchQueryPassthrough();

public CloudwatchMetadataHandler(java.util.Map<String, String> configOptions)
{
super(SOURCE_TYPE, configOptions);
this.awsLogs = AWSLogsClientBuilder.standard().build();
this.awsLogs = CloudWatchLogsClient.create();
this.invoker = ThrottlingInvoker.newDefaultBuilder(EXCEPTION_FILTER, configOptions).build();
this.tableResolver = new CloudwatchTableResolver(this.invoker, awsLogs, MAX_RESULTS, MAX_RESULTS);
}

@VisibleForTesting
protected CloudwatchMetadataHandler(
AWSLogs awsLogs,
CloudWatchLogsClient awsLogs,
EncryptionKeyFactory keyFactory,
SecretsManagerClient secretsManager,
AthenaClient athena,
@@ -161,19 +160,19 @@ protected CloudwatchMetadataHandler(
public ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, ListSchemasRequest listSchemasRequest)
throws TimeoutException
{
DescribeLogGroupsRequest request = new DescribeLogGroupsRequest();
DescribeLogGroupsResult result;
DescribeLogGroupsRequest.Builder requestBuilder = DescribeLogGroupsRequest.builder();
DescribeLogGroupsResponse response;
List<String> schemas = new ArrayList<>();
do {
if (schemas.size() > MAX_RESULTS) {
throw new RuntimeException("Too many log groups, exceeded max metadata results for schema count.");
}
result = invoker.invoke(() -> awsLogs.describeLogGroups(request));
result.getLogGroups().forEach(next -> schemas.add(next.getLogGroupName()));
request.setNextToken(result.getNextToken());
logger.info("doListSchemaNames: Listing log groups {} {}", result.getNextToken(), schemas.size());
response = invoker.invoke(() -> awsLogs.describeLogGroups(requestBuilder.build()));
response.logGroups().forEach(next -> schemas.add(next.logGroupName()));
requestBuilder.nextToken(response.nextToken());
logger.info("doListSchemaNames: Listing log groups {} {}", response.nextToken(), schemas.size());
}
while (result.getNextToken() != null);
while (response.nextToken() != null);

return new ListSchemasResponse(listSchemasRequest.getCatalogName(), schemas);
}
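The manual nextToken loop is a one-for-one translation of the v1 code, with the v2 builder mutated each pass and rebuilt, where v1 mutated the request in place. The v2 SDK also generates paginators that hide the token plumbing; a sketch of the same listing via describeLogGroupsPaginator(), shown for comparison only since it bypasses the ThrottlingInvoker and the MAX_RESULTS guard:

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.LogGroup;

final class ListSchemasSketch
{
    // The paginator issues DescribeLogGroups calls lazily as the stream is
    // consumed, following nextToken internally.
    static List<String> logGroupNames(CloudWatchLogsClient awsLogs)
    {
        return awsLogs.describeLogGroupsPaginator()
                .logGroups()
                .stream()
                .map(LogGroup::logGroupName)
                .collect(Collectors.toList());
    }
}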
@@ -189,28 +188,28 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables
{
String nextToken = null;
String logGroupName = tableResolver.validateSchema(listTablesRequest.getSchemaName());
DescribeLogStreamsRequest request = new DescribeLogStreamsRequest(logGroupName);
DescribeLogStreamsResult result;
DescribeLogStreamsRequest.Builder requestBuilder = DescribeLogStreamsRequest.builder().logGroupName(logGroupName);
DescribeLogStreamsResponse response;
List<TableName> tables = new ArrayList<>();
if (listTablesRequest.getPageSize() == UNLIMITED_PAGE_SIZE_VALUE) {
do {
if (tables.size() > MAX_RESULTS) {
throw new RuntimeException("Too many log streams, exceeded max metadata results for table count.");
}
result = invoker.invoke(() -> awsLogs.describeLogStreams(request));
result.getLogStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
request.setNextToken(result.getNextToken());
logger.info("doListTables: Listing log streams with token {} and size {}", result.getNextToken(), tables.size());
response = invoker.invoke(() -> awsLogs.describeLogStreams(requestBuilder.build()));
response.logStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
requestBuilder.nextToken(response.nextToken());
logger.info("doListTables: Listing log streams with token {} and size {}", response.nextToken(), tables.size());
}
while (result.getNextToken() != null);
while (response.nextToken() != null);
}
else {
request.setNextToken(listTablesRequest.getNextToken());
request.setLimit(listTablesRequest.getPageSize());
result = invoker.invoke(() -> awsLogs.describeLogStreams(request));
result.getLogStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
nextToken = result.getNextToken();
logger.info("doListTables: Listing log streams with token {} and size {}", result.getNextToken(), tables.size());
requestBuilder.nextToken(listTablesRequest.getNextToken());
requestBuilder.limit(listTablesRequest.getPageSize());
response = invoker.invoke(() -> awsLogs.describeLogStreams(requestBuilder.build()));
response.logStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
nextToken = response.nextToken();
logger.info("doListTables: Listing log streams with token {} and size {}", response.nextToken(), tables.size());
}

// Don't add the ALL_LOG_STREAMS_TABLE unless we're at the end of listing out all the tables.
@@ -276,26 +275,26 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request

CloudwatchTableName cwTableName = tableResolver.validateTable(request.getTableName());

DescribeLogStreamsRequest cwRequest = new DescribeLogStreamsRequest(cwTableName.getLogGroupName());
DescribeLogStreamsRequest.Builder cwRequestBuilder = DescribeLogStreamsRequest.builder().logGroupName(cwTableName.getLogGroupName());
if (!ALL_LOG_STREAMS_TABLE.equals(cwTableName.getLogStreamName())) {
cwRequest.setLogStreamNamePrefix(cwTableName.getLogStreamName());
cwRequestBuilder.logStreamNamePrefix(cwTableName.getLogStreamName());
}

DescribeLogStreamsResult result;
DescribeLogStreamsResponse response;
do {
result = invoker.invoke(() -> awsLogs.describeLogStreams(cwRequest));
for (LogStream next : result.getLogStreams()) {
response = invoker.invoke(() -> awsLogs.describeLogStreams(cwRequestBuilder.build()));
for (LogStream next : response.logStreams()) {
//Each log stream that matches any possible partition pruning should be added to the partition list.
blockWriter.writeRows((Block block, int rowNum) -> {
boolean matched = block.setValue(LOG_GROUP_FIELD, rowNum, cwRequest.getLogGroupName());
matched &= block.setValue(LOG_STREAM_FIELD, rowNum, next.getLogStreamName());
matched &= block.setValue(LOG_STREAM_SIZE_FIELD, rowNum, next.getStoredBytes());
boolean matched = block.setValue(LOG_GROUP_FIELD, rowNum, cwRequestBuilder.build().logGroupName());
matched &= block.setValue(LOG_STREAM_FIELD, rowNum, next.logStreamName());
matched &= block.setValue(LOG_STREAM_SIZE_FIELD, rowNum, next.storedBytes());
return matched ? 1 : 0;
});
}
cwRequest.setNextToken(result.getNextToken());
cwRequestBuilder.nextToken(response.nextToken());
}
while (result.getNextToken() != null && queryStatusChecker.isQueryRunning());
while (response.nextToken() != null && queryStatusChecker.isQueryRunning());
}
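One subtlety in the translated loop: cwRequestBuilder.build().logGroupName() rebuilds the request object on every row just to read back the group name. A sketch of the same row writer with the name hoisted to a local; this is a fragment of the loop above, not standalone code:

// Reading the group name once is behaviorally identical to calling
// cwRequestBuilder.build().logGroupName() inside the row writer.
String logGroupName = cwTableName.getLogGroupName();
blockWriter.writeRows((Block block, int rowNum) -> {
    boolean matched = block.setValue(LOG_GROUP_FIELD, rowNum, logGroupName);
    matched &= block.setValue(LOG_STREAM_FIELD, rowNum, next.logStreamName());
    matched &= block.setValue(LOG_STREAM_SIZE_FIELD, rowNum, next.storedBytes());
    return matched ? 1 : 0;
});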

/**
@@ -367,11 +366,11 @@ public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, Ge
throw new IllegalArgumentException("No Query passed through [{}]" + request);
}
// to get column names with limit 1
GetQueryResultsResult getQueryResultsResult = getResult(invoker, awsLogs, request.getQueryPassthroughArguments(), 1);
GetQueryResultsResponse getQueryResultsResponse = getResult(invoker, awsLogs, request.getQueryPassthroughArguments(), 1);
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
if (!getQueryResultsResult.getResults().isEmpty()) {
for (ResultField field : getQueryResultsResult.getResults().get(0)) {
schemaBuilder.addField(field.getField(), Types.MinorType.VARCHAR.getType());
if (!getQueryResultsResponse.results().isEmpty()) {
for (ResultField field : getQueryResultsResponse.results().get(0)) {
schemaBuilder.addField(field.field(), Types.MinorType.VARCHAR.getType());
}
}

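getResult(...) is a private helper updated elsewhere in this commit; its body is not shown in this hunk. In v2 terms, the call it wraps presumably has the shape below, where queryId would come from an earlier StartQuery call; the names and structure here are assumptions, not the commit's code:

import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse;

final class QueryResultsSketch
{
    // Assumed shape of the v2 call behind getResult(); the real helper
    // presumably also polls for query completion and applies the limit.
    static GetQueryResultsResponse fetch(CloudWatchLogsClient awsLogs, String queryId)
    {
        return awsLogs.getQueryResults(GetQueryResultsRequest.builder()
                .queryId(queryId)
                .build());
    }
}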
@@ -415,6 +414,6 @@ private String encodeContinuationToken(int partition)
*/
private TableName toTableName(ListTablesRequest request, LogStream logStream)
{
return new TableName(request.getSchemaName(), logStream.getLogStreamName());
return new TableName(request.getSchemaName(), logStream.logStreamName());
}
}
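The rest of the churn in this file follows one mechanical rule: v1's mutable model objects with get*/set* accessors become immutable v2 objects with builders and fluent accessors, and *Result response types are renamed *Response. A side-by-side sketch with fully qualified names, since the simple names collide; it assumes both SDKs are on the classpath at once, which is true only mid-migration:

final class MigrationShapes
{
    static void sketch(String token)
    {
        // v1 (com.amazonaws): mutable request, JavaBean get*/set* accessors.
        com.amazonaws.services.logs.model.DescribeLogGroupsRequest v1Request =
                new com.amazonaws.services.logs.model.DescribeLogGroupsRequest();
        v1Request.setNextToken(token);

        // v2 (software.amazon.awssdk): immutable request built fluently;
        // DescribeLogGroupsResult becomes DescribeLogGroupsResponse.
        software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest v2Request =
                software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest.builder()
                        .nextToken(token)
                        .build();
    }
}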