diff --git a/athena-cloudwatch/pom.xml b/athena-cloudwatch/pom.xml
index bd2dad00d8..95a34d1d37 100644
--- a/athena-cloudwatch/pom.xml
+++ b/athena-cloudwatch/pom.xml
@@ -29,15 +29,35 @@
test
- com.amazonaws
- aws-java-sdk-logs
- ${aws-sdk.version}
+ software.amazon.awssdk
+ cloudwatchlogs
+ 2.28.2
+
+
+
+ commons-logging
+ commons-logging
+
+
+ software.amazon.awssdk
+ netty-nio-client
+
+
+
+
+ software.amazon.awssdk
+ cloudwatch
+ ${aws-sdk-v2.version}
commons-logging
commons-logging
+
+ software.amazon.awssdk
+ netty-nio-client
+
diff --git a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchExceptionFilter.java b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchExceptionFilter.java
index c71db552cf..093aeedd7e 100644
--- a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchExceptionFilter.java
+++ b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchExceptionFilter.java
@@ -20,8 +20,8 @@
package com.amazonaws.athena.connectors.cloudwatch;
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
-import com.amazonaws.services.logs.model.AWSLogsException;
-import com.amazonaws.services.logs.model.LimitExceededException;
+import software.amazon.awssdk.services.cloudwatch.model.LimitExceededException;
+import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
/**
* Used to identify Exceptions that are related to Cloudwatch Logs throttling events.
@@ -36,7 +36,7 @@ private CloudwatchExceptionFilter() {}
@Override
public boolean isMatch(Exception ex)
{
- if (ex instanceof AWSLogsException && ex.getMessage().startsWith("Rate exceeded")) {
+ if (ex instanceof CloudWatchLogsException && ex.getMessage().startsWith("Rate exceeded")) {
return true;
}
diff --git a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandler.java b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandler.java
index e07c6f5422..e62ca50477 100644
--- a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandler.java
+++ b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandler.java
@@ -43,15 +43,6 @@
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.cloudwatch.qpt.CloudwatchQueryPassthrough;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.AWSLogsClientBuilder;
-import com.amazonaws.services.logs.model.DescribeLogGroupsRequest;
-import com.amazonaws.services.logs.model.DescribeLogGroupsResult;
-import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
-import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
-import com.amazonaws.services.logs.model.GetQueryResultsResult;
-import com.amazonaws.services.logs.model.LogStream;
-import com.amazonaws.services.logs.model.ResultField;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -61,6 +52,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
+import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import java.util.ArrayList;
@@ -123,7 +122,7 @@ public class CloudwatchMetadataHandler
.build();
}
- private final AWSLogs awsLogs;
+ private final CloudWatchLogsClient awsLogs;
private final ThrottlingInvoker invoker;
private final CloudwatchTableResolver tableResolver;
private final CloudwatchQueryPassthrough queryPassthrough = new CloudwatchQueryPassthrough();
@@ -131,14 +130,14 @@ public class CloudwatchMetadataHandler
public CloudwatchMetadataHandler(java.util.Map configOptions)
{
super(SOURCE_TYPE, configOptions);
- this.awsLogs = AWSLogsClientBuilder.standard().build();
+ this.awsLogs = CloudWatchLogsClient.create();
this.invoker = ThrottlingInvoker.newDefaultBuilder(EXCEPTION_FILTER, configOptions).build();
- this.tableResolver = new CloudwatchTableResolver(this.invoker, awsLogs, MAX_RESULTS, MAX_RESULTS);
+ this.tableResolver = new CloudwatchTableResolver(this.invoker, awsLogs, MAX_RESULTS, MAX_RESULTS);
}
@VisibleForTesting
protected CloudwatchMetadataHandler(
- AWSLogs awsLogs,
+ CloudWatchLogsClient awsLogs,
EncryptionKeyFactory keyFactory,
SecretsManagerClient secretsManager,
AthenaClient athena,
@@ -161,19 +160,19 @@ protected CloudwatchMetadataHandler(
public ListSchemasResponse doListSchemaNames(BlockAllocator blockAllocator, ListSchemasRequest listSchemasRequest)
throws TimeoutException
{
- DescribeLogGroupsRequest request = new DescribeLogGroupsRequest();
- DescribeLogGroupsResult result;
+ DescribeLogGroupsRequest.Builder requestBuilder = DescribeLogGroupsRequest.builder();
+ DescribeLogGroupsResponse response;
List schemas = new ArrayList<>();
do {
if (schemas.size() > MAX_RESULTS) {
throw new RuntimeException("Too many log groups, exceeded max metadata results for schema count.");
}
- result = invoker.invoke(() -> awsLogs.describeLogGroups(request));
- result.getLogGroups().forEach(next -> schemas.add(next.getLogGroupName()));
- request.setNextToken(result.getNextToken());
- logger.info("doListSchemaNames: Listing log groups {} {}", result.getNextToken(), schemas.size());
+ response = invoker.invoke(() -> awsLogs.describeLogGroups(requestBuilder.build()));
+ response.logGroups().forEach(next -> schemas.add(next.logGroupName()));
+ requestBuilder.nextToken(response.nextToken());
+ logger.info("doListSchemaNames: Listing log groups {} {}", response.nextToken(), schemas.size());
}
- while (result.getNextToken() != null);
+ while (response.nextToken() != null);
return new ListSchemasResponse(listSchemasRequest.getCatalogName(), schemas);
}
@@ -189,28 +188,28 @@ public ListTablesResponse doListTables(BlockAllocator blockAllocator, ListTables
{
String nextToken = null;
String logGroupName = tableResolver.validateSchema(listTablesRequest.getSchemaName());
- DescribeLogStreamsRequest request = new DescribeLogStreamsRequest(logGroupName);
- DescribeLogStreamsResult result;
+ DescribeLogStreamsRequest.Builder requestBuilder = DescribeLogStreamsRequest.builder().logGroupName(logGroupName);
+ DescribeLogStreamsResponse response;
List tables = new ArrayList<>();
if (listTablesRequest.getPageSize() == UNLIMITED_PAGE_SIZE_VALUE) {
do {
if (tables.size() > MAX_RESULTS) {
throw new RuntimeException("Too many log streams, exceeded max metadata results for table count.");
}
- result = invoker.invoke(() -> awsLogs.describeLogStreams(request));
- result.getLogStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
- request.setNextToken(result.getNextToken());
- logger.info("doListTables: Listing log streams with token {} and size {}", result.getNextToken(), tables.size());
+ response = invoker.invoke(() -> awsLogs.describeLogStreams(requestBuilder.build()));
+ response.logStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
+ requestBuilder.nextToken(response.nextToken());
+ logger.info("doListTables: Listing log streams with token {} and size {}", response.nextToken(), tables.size());
}
- while (result.getNextToken() != null);
+ while (response.nextToken() != null);
}
else {
- request.setNextToken(listTablesRequest.getNextToken());
- request.setLimit(listTablesRequest.getPageSize());
- result = invoker.invoke(() -> awsLogs.describeLogStreams(request));
- result.getLogStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
- nextToken = result.getNextToken();
- logger.info("doListTables: Listing log streams with token {} and size {}", result.getNextToken(), tables.size());
+ requestBuilder.nextToken(listTablesRequest.getNextToken());
+ requestBuilder.limit(listTablesRequest.getPageSize());
+ response = invoker.invoke(() -> awsLogs.describeLogStreams(requestBuilder.build()));
+ response.logStreams().forEach(next -> tables.add(toTableName(listTablesRequest, next)));
+ nextToken = response.nextToken();
+ logger.info("doListTables: Listing log streams with token {} and size {}", response.nextToken(), tables.size());
}
// Don't add the ALL_LOG_STREAMS_TABLE unless we're at the end of listing out all the tables.
@@ -276,26 +275,26 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request
CloudwatchTableName cwTableName = tableResolver.validateTable(request.getTableName());
- DescribeLogStreamsRequest cwRequest = new DescribeLogStreamsRequest(cwTableName.getLogGroupName());
+ DescribeLogStreamsRequest.Builder cwRequestBuilder = DescribeLogStreamsRequest.builder().logGroupName(cwTableName.getLogGroupName());
if (!ALL_LOG_STREAMS_TABLE.equals(cwTableName.getLogStreamName())) {
- cwRequest.setLogStreamNamePrefix(cwTableName.getLogStreamName());
+ cwRequestBuilder.logStreamNamePrefix(cwTableName.getLogStreamName());
}
- DescribeLogStreamsResult result;
+ DescribeLogStreamsResponse response;
do {
- result = invoker.invoke(() -> awsLogs.describeLogStreams(cwRequest));
- for (LogStream next : result.getLogStreams()) {
+ response = invoker.invoke(() -> awsLogs.describeLogStreams(cwRequestBuilder.build()));
+ for (LogStream next : response.logStreams()) {
//Each log stream that matches any possible partition pruning should be added to the partition list.
blockWriter.writeRows((Block block, int rowNum) -> {
- boolean matched = block.setValue(LOG_GROUP_FIELD, rowNum, cwRequest.getLogGroupName());
- matched &= block.setValue(LOG_STREAM_FIELD, rowNum, next.getLogStreamName());
- matched &= block.setValue(LOG_STREAM_SIZE_FIELD, rowNum, next.getStoredBytes());
+ boolean matched = block.setValue(LOG_GROUP_FIELD, rowNum, cwRequestBuilder.build().logGroupName());
+ matched &= block.setValue(LOG_STREAM_FIELD, rowNum, next.logStreamName());
+ matched &= block.setValue(LOG_STREAM_SIZE_FIELD, rowNum, next.storedBytes());
return matched ? 1 : 0;
});
}
- cwRequest.setNextToken(result.getNextToken());
+ cwRequestBuilder.nextToken(response.nextToken());
}
- while (result.getNextToken() != null && queryStatusChecker.isQueryRunning());
+ while (response.nextToken() != null && queryStatusChecker.isQueryRunning());
}
/**
@@ -367,11 +366,11 @@ public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, Ge
throw new IllegalArgumentException("No Query passed through [{}]" + request);
}
// to get column names with limit 1
- GetQueryResultsResult getQueryResultsResult = getResult(invoker, awsLogs, request.getQueryPassthroughArguments(), 1);
+ GetQueryResultsResponse getQueryResultsResponse = getResult(invoker, awsLogs, request.getQueryPassthroughArguments(), 1);
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
- if (!getQueryResultsResult.getResults().isEmpty()) {
- for (ResultField field : getQueryResultsResult.getResults().get(0)) {
- schemaBuilder.addField(field.getField(), Types.MinorType.VARCHAR.getType());
+ if (!getQueryResultsResponse.results().isEmpty()) {
+ for (ResultField field : getQueryResultsResponse.results().get(0)) {
+ schemaBuilder.addField(field.field(), Types.MinorType.VARCHAR.getType());
}
}
@@ -415,6 +414,6 @@ private String encodeContinuationToken(int partition)
*/
private TableName toTableName(ListTablesRequest request, LogStream logStream)
{
- return new TableName(request.getSchemaName(), logStream.getLogStreamName());
+ return new TableName(request.getSchemaName(), logStream.logStreamName());
}
}
diff --git a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandler.java b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandler.java
index 7b4aa47596..912b94d218 100644
--- a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandler.java
+++ b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandler.java
@@ -32,17 +32,16 @@
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.cloudwatch.qpt.CloudwatchQueryPassthrough;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.AWSLogsClientBuilder;
-import com.amazonaws.services.logs.model.GetLogEventsRequest;
-import com.amazonaws.services.logs.model.GetLogEventsResult;
-import com.amazonaws.services.logs.model.GetQueryResultsResult;
-import com.amazonaws.services.logs.model.OutputLogEvent;
-import com.amazonaws.services.logs.model.ResultField;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent;
+import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
@@ -75,7 +74,7 @@ public class CloudwatchRecordHandler
//Used to handle Throttling events and apply AIMD congestion control
private final ThrottlingInvoker invoker;
private final AtomicLong count = new AtomicLong(0);
- private final AWSLogs awsLogs;
+ private final CloudWatchLogsClient awsLogs;
private final CloudwatchQueryPassthrough queryPassthrough = new CloudwatchQueryPassthrough();
public CloudwatchRecordHandler(java.util.Map configOptions)
@@ -84,12 +83,12 @@ public CloudwatchRecordHandler(java.util.Map configOptions)
S3Client.create(),
SecretsManagerClient.create(),
AthenaClient.create(),
- AWSLogsClientBuilder.defaultClient(),
+ CloudWatchLogsClient.create(),
configOptions);
}
@VisibleForTesting
- protected CloudwatchRecordHandler(S3Client amazonS3, SecretsManagerClient secretsManager, AthenaClient athena, AWSLogs awsLogs, java.util.Map configOptions)
+ protected CloudwatchRecordHandler(S3Client amazonS3, SecretsManagerClient secretsManager, AthenaClient athena, CloudWatchLogsClient awsLogs, java.util.Map configOptions)
{
super(amazonS3, secretsManager, athena, SOURCE_TYPE, configOptions);
this.awsLogs = awsLogs;
@@ -115,37 +114,38 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor
invoker.setBlockSpiller(spiller);
do {
final String actualContinuationToken = continuationToken;
- GetLogEventsResult logEventsResult = invoker.invoke(() -> awsLogs.getLogEvents(
+ GetLogEventsResponse logEventsResponse = invoker.invoke(() -> awsLogs.getLogEvents(
pushDownConstraints(recordsRequest.getConstraints(),
- new GetLogEventsRequest()
- .withLogGroupName(split.getProperty(LOG_GROUP_FIELD))
+ GetLogEventsRequest.builder()
+ .logGroupName(split.getProperty(LOG_GROUP_FIELD))
//We use the property instead of the table name because of the special all_streams table
- .withLogStreamName(split.getProperty(LOG_STREAM_FIELD))
- .withNextToken(actualContinuationToken)
+ .logStreamName(split.getProperty(LOG_STREAM_FIELD))
+ .nextToken(actualContinuationToken)
// must be set to use nextToken correctly
- .withStartFromHead(true)
+ .startFromHead(true)
+ .build()
)));
- if (continuationToken == null || !continuationToken.equals(logEventsResult.getNextForwardToken())) {
- continuationToken = logEventsResult.getNextForwardToken();
+ if (continuationToken == null || !continuationToken.equals(logEventsResponse.nextForwardToken())) {
+ continuationToken = logEventsResponse.nextForwardToken();
}
else {
continuationToken = null;
}
- for (OutputLogEvent ole : logEventsResult.getEvents()) {
+ for (OutputLogEvent ole : logEventsResponse.events()) {
spiller.writeRows((Block block, int rowNum) -> {
boolean matched = true;
matched &= block.offerValue(LOG_STREAM_FIELD, rowNum, split.getProperty(LOG_STREAM_FIELD));
- matched &= block.offerValue(LOG_TIME_FIELD, rowNum, ole.getTimestamp());
- matched &= block.offerValue(LOG_MSG_FIELD, rowNum, ole.getMessage());
+ matched &= block.offerValue(LOG_TIME_FIELD, rowNum, ole.timestamp());
+ matched &= block.offerValue(LOG_MSG_FIELD, rowNum, ole.message());
return matched ? 1 : 0;
});
}
logger.info("readWithConstraint: LogGroup[{}] LogStream[{}] Continuation[{}] rows[{}]",
tableName.getSchemaName(), tableName.getTableName(), continuationToken,
- logEventsResult.getEvents().size());
+ logEventsResponse.events().size());
}
while (continuationToken != null && queryStatusChecker.isQueryRunning());
}
@@ -155,13 +155,13 @@ private void getQueryPassthreoughResults(BlockSpiller spiller, ReadRecordsReques
{
Map qptArguments = recordsRequest.getConstraints().getQueryPassthroughArguments();
queryPassthrough.verify(qptArguments);
- GetQueryResultsResult getQueryResultsResult = getResult(invoker, awsLogs, qptArguments, Integer.parseInt(qptArguments.get(CloudwatchQueryPassthrough.LIMIT)));
+ GetQueryResultsResponse getQueryResultsResponse = getResult(invoker, awsLogs, qptArguments, Integer.parseInt(qptArguments.get(CloudwatchQueryPassthrough.LIMIT)));
- for (List resultList : getQueryResultsResult.getResults()) {
+ for (List resultList : getQueryResultsResponse.results()) {
spiller.writeRows((Block block, int rowNum) -> {
for (ResultField resultField : resultList) {
boolean matched = true;
- matched &= block.offerValue(resultField.getField(), rowNum, resultField.getValue());
+ matched &= block.offerValue(resultField.field(), rowNum, resultField.value());
if (!matched) {
return 0;
}
@@ -181,6 +181,7 @@ private void getQueryPassthreoughResults(BlockSpiller spiller, ReadRecordsReques
*/
private GetLogEventsRequest pushDownConstraints(Constraints constraints, GetLogEventsRequest request)
{
+ GetLogEventsRequest.Builder requestBuilder = request.toBuilder();
ValueSet timeConstraint = constraints.getSummary().get(LOG_TIME_FIELD);
if (timeConstraint instanceof SortedRangeSet && !timeConstraint.isNullAllowed()) {
//SortedRangeSet is how >, <, between is represented which are easiest and most common when
@@ -192,15 +193,15 @@ private GetLogEventsRequest pushDownConstraints(Constraints constraints, GetLogE
if (!basicPredicate.getLow().isNullValue()) {
Long lowerBound = (Long) basicPredicate.getLow().getValue();
- request.setStartTime(lowerBound);
+ requestBuilder.startTime(lowerBound);
}
if (!basicPredicate.getHigh().isNullValue()) {
Long upperBound = (Long) basicPredicate.getHigh().getValue();
- request.setEndTime(upperBound);
+ requestBuilder.endTime(upperBound);
}
}
- return request;
+ return requestBuilder.build();
}
}
diff --git a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchTableResolver.java b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchTableResolver.java
index 4c7f25ec7e..d4059b0438 100644
--- a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchTableResolver.java
+++ b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchTableResolver.java
@@ -21,18 +21,18 @@
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.athena.connector.lambda.domain.TableName;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.model.DescribeLogGroupsRequest;
-import com.amazonaws.services.logs.model.DescribeLogGroupsResult;
-import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
-import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
-import com.amazonaws.services.logs.model.LogGroup;
-import com.amazonaws.services.logs.model.LogStream;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogGroup;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -51,7 +51,7 @@ public class CloudwatchTableResolver
{
private static final Logger logger = LoggerFactory.getLogger(CloudwatchTableResolver.class);
- private AWSLogs awsLogs;
+ private CloudWatchLogsClient logsClient;
//Used to handle Throttling events using an AIMD strategy for congestion control.
private ThrottlingInvoker invoker;
//The LogStream pattern that is capitalized by LAMBDA
@@ -67,14 +67,14 @@ public class CloudwatchTableResolver
* Constructs an instance of the table resolver.
*
* @param invoker The ThrottlingInvoker to use to handle throttling events.
- * @param awsLogs The AWSLogs client to use for cache misses.
+ * @param logsClient The AWSLogs client to use for cache misses.
* @param maxSchemaCacheSize The max number of schemas to cache.
* @param maxTableCacheSize The max tables to cache.
*/
- public CloudwatchTableResolver(ThrottlingInvoker invoker, AWSLogs awsLogs, long maxSchemaCacheSize, long maxTableCacheSize)
+ public CloudwatchTableResolver(ThrottlingInvoker invoker, CloudWatchLogsClient logsClient, long maxSchemaCacheSize, long maxTableCacheSize)
{
this.invoker = invoker;
- this.awsLogs = awsLogs;
+ this.logsClient = logsClient;
this.tableCache = CacheBuilder.newBuilder()
.maximumSize(maxTableCacheSize)
.build(
@@ -119,12 +119,12 @@ private CloudwatchTableName loadLogStreams(String logGroup, String logStream)
logger.info("loadLogStreams: Did not find a match for the table, falling back to LogGroup scan for {}:{}",
logGroup, logStream);
- DescribeLogStreamsRequest validateTableRequest = new DescribeLogStreamsRequest(logGroup);
- DescribeLogStreamsResult validateTableResult;
+ DescribeLogStreamsRequest.Builder validateTableRequestBuilder = DescribeLogStreamsRequest.builder().logGroupName(logGroup);
+ DescribeLogStreamsResponse validateTableResponse;
do {
- validateTableResult = invoker.invoke(() -> awsLogs.describeLogStreams(validateTableRequest));
- for (LogStream nextStream : validateTableResult.getLogStreams()) {
- String logStreamName = nextStream.getLogStreamName();
+ validateTableResponse = invoker.invoke(() -> logsClient.describeLogStreams(validateTableRequestBuilder.build()));
+ for (LogStream nextStream : validateTableResponse.logStreams()) {
+ String logStreamName = nextStream.logStreamName();
CloudwatchTableName nextCloudwatch = new CloudwatchTableName(logGroup, logStreamName);
tableCache.put(nextCloudwatch.toTableName(), nextCloudwatch);
if (nextCloudwatch.getLogStreamName().equalsIgnoreCase(logStream)) {
@@ -134,9 +134,9 @@ private CloudwatchTableName loadLogStreams(String logGroup, String logStream)
return nextCloudwatch;
}
}
- validateTableRequest.setNextToken(validateTableResult.getNextToken());
+ validateTableRequestBuilder.nextToken(validateTableResponse.nextToken());
}
- while (validateTableResult.getNextToken() != null);
+ while (validateTableResponse.nextToken() != null);
//We could not find a match
throw new IllegalArgumentException("No such table " + logGroup + " " + logStream);
@@ -163,11 +163,11 @@ private CloudwatchTableName loadLogStream(String logGroup, String logStream)
LAMBDA_PATTERN, effectiveTableName);
effectiveTableName = effectiveTableName.replace(LAMBDA_PATTERN, LAMBDA_ACTUAL_PATTERN);
}
- DescribeLogStreamsRequest request = new DescribeLogStreamsRequest(logGroup)
- .withLogStreamNamePrefix(effectiveTableName);
- DescribeLogStreamsResult result = invoker.invoke(() -> awsLogs.describeLogStreams(request));
- for (LogStream nextStream : result.getLogStreams()) {
- String logStreamName = nextStream.getLogStreamName();
+ DescribeLogStreamsRequest request = DescribeLogStreamsRequest.builder().logGroupName(logGroup)
+ .logStreamNamePrefix(effectiveTableName).build();
+ DescribeLogStreamsResponse response = invoker.invoke(() -> logsClient.describeLogStreams(request));
+ for (LogStream nextStream : response.logStreams()) {
+ String logStreamName = nextStream.logStreamName();
CloudwatchTableName nextCloudwatch = new CloudwatchTableName(logGroup, logStreamName);
if (nextCloudwatch.getLogStreamName().equalsIgnoreCase(logStream)) {
logger.info("loadLogStream: Matched {} for {}:{}", nextCloudwatch, logGroup, logStream);
@@ -195,21 +195,21 @@ private String loadLogGroups(String schemaName)
}
logger.info("loadLogGroups: Did not find a match for the schema, falling back to LogGroup scan for {}", schemaName);
- DescribeLogGroupsRequest validateSchemaRequest = new DescribeLogGroupsRequest();
- DescribeLogGroupsResult validateSchemaResult;
+ DescribeLogGroupsRequest.Builder validateSchemaRequestBuilder = DescribeLogGroupsRequest.builder();
+ DescribeLogGroupsResponse validateSchemaResponse;
do {
- validateSchemaResult = invoker.invoke(() -> awsLogs.describeLogGroups(validateSchemaRequest));
- for (LogGroup next : validateSchemaResult.getLogGroups()) {
- String nextLogGroupName = next.getLogGroupName();
+ validateSchemaResponse = invoker.invoke(() -> logsClient.describeLogGroups(validateSchemaRequestBuilder.build()));
+ for (LogGroup next : validateSchemaResponse.logGroups()) {
+ String nextLogGroupName = next.logGroupName();
schemaCache.put(schemaName, nextLogGroupName);
if (nextLogGroupName.equalsIgnoreCase(schemaName)) {
logger.info("loadLogGroups: Matched {} for {}", nextLogGroupName, schemaName);
return nextLogGroupName;
}
}
- validateSchemaRequest.setNextToken(validateSchemaResult.getNextToken());
+ validateSchemaRequestBuilder.nextToken(validateSchemaResponse.nextToken());
}
- while (validateSchemaResult.getNextToken() != null);
+ while (validateSchemaResponse.nextToken() != null);
//We could not find a match
throw new IllegalArgumentException("No such schema " + schemaName);
@@ -224,10 +224,10 @@ private String loadLogGroups(String schemaName)
private String loadLogGroup(String schemaName)
throws TimeoutException
{
- DescribeLogGroupsRequest request = new DescribeLogGroupsRequest().withLogGroupNamePrefix(schemaName);
- DescribeLogGroupsResult result = invoker.invoke(() -> awsLogs.describeLogGroups(request));
- for (LogGroup next : result.getLogGroups()) {
- String nextLogGroupName = next.getLogGroupName();
+ DescribeLogGroupsRequest request = DescribeLogGroupsRequest.builder().logGroupNamePrefix(schemaName).build();
+ DescribeLogGroupsResponse response = invoker.invoke(() -> logsClient.describeLogGroups(request));
+ for (LogGroup next : response.logGroups()) {
+ String nextLogGroupName = next.logGroupName();
if (nextLogGroupName.equalsIgnoreCase(schemaName)) {
logger.info("loadLogGroup: Matched {} for {}", nextLogGroupName, schemaName);
return nextLogGroupName;
diff --git a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchUtils.java b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchUtils.java
index 5c19ec17ee..bb8a209d47 100644
--- a/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchUtils.java
+++ b/athena-cloudwatch/src/main/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchUtils.java
@@ -21,13 +21,14 @@
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.athena.connectors.cloudwatch.qpt.CloudwatchQueryPassthrough;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.model.GetQueryResultsRequest;
-import com.amazonaws.services.logs.model.GetQueryResultsResult;
-import com.amazonaws.services.logs.model.StartQueryRequest;
-import com.amazonaws.services.logs.model.StartQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.QueryStatus;
+import software.amazon.awssdk.services.cloudwatchlogs.model.StartQueryRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.StartQueryResponse;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
@@ -41,8 +42,8 @@ public final class CloudwatchUtils
private CloudwatchUtils() {}
public static StartQueryRequest startQueryRequest(Map qptArguments)
{
- return new StartQueryRequest().withEndTime(Long.valueOf(qptArguments.get(CloudwatchQueryPassthrough.ENDTIME))).withStartTime(Long.valueOf(qptArguments.get(CloudwatchQueryPassthrough.STARTTIME)))
- .withQueryString(qptArguments.get(CloudwatchQueryPassthrough.QUERYSTRING)).withLogGroupNames(getLogGroupNames(qptArguments));
+ return StartQueryRequest.builder().endTime(Long.valueOf(qptArguments.get(CloudwatchQueryPassthrough.ENDTIME))).startTime(Long.valueOf(qptArguments.get(CloudwatchQueryPassthrough.STARTTIME)))
+ .queryString(qptArguments.get(CloudwatchQueryPassthrough.QUERYSTRING)).logGroupNames(getLogGroupNames(qptArguments)).build();
}
private static String[] getLogGroupNames(Map qptArguments)
@@ -55,25 +56,25 @@ private static String[] getLogGroupNames(Map qptArguments)
return logGroupNames;
}
- public static StartQueryResult getQueryResult(AWSLogs awsLogs, StartQueryRequest startQueryRequest)
+ public static StartQueryResponse getQueryResult(CloudWatchLogsClient awsLogs, StartQueryRequest startQueryRequest)
{
return awsLogs.startQuery(startQueryRequest);
}
- public static GetQueryResultsResult getQueryResults(AWSLogs awsLogs, StartQueryResult startQueryResult)
+ public static GetQueryResultsResponse getQueryResults(CloudWatchLogsClient awsLogs, StartQueryResponse startQueryResponse)
{
- return awsLogs.getQueryResults(new GetQueryResultsRequest().withQueryId(startQueryResult.getQueryId()));
+ return awsLogs.getQueryResults(GetQueryResultsRequest.builder().queryId(startQueryResponse.queryId()).build());
}
- public static GetQueryResultsResult getResult(ThrottlingInvoker invoker, AWSLogs awsLogs, Map qptArguments, int limit) throws TimeoutException, InterruptedException
+ public static GetQueryResultsResponse getResult(ThrottlingInvoker invoker, CloudWatchLogsClient awsLogs, Map qptArguments, int limit) throws TimeoutException, InterruptedException
{
- StartQueryResult startQueryResult = invoker.invoke(() -> getQueryResult(awsLogs, startQueryRequest(qptArguments).withLimit(limit)));
- String status = null;
- GetQueryResultsResult getQueryResultsResult;
+ StartQueryResponse startQueryResponse = invoker.invoke(() -> getQueryResult(awsLogs, startQueryRequest(qptArguments).toBuilder().limit(limit).build()));
+ QueryStatus status = null;
+ GetQueryResultsResponse getQueryResultsResponse;
Instant startTime = Instant.now(); // Record the start time
do {
- getQueryResultsResult = invoker.invoke(() -> getQueryResults(awsLogs, startQueryResult));
- status = getQueryResultsResult.getStatus();
+ getQueryResultsResponse = invoker.invoke(() -> getQueryResults(awsLogs, startQueryResponse));
+ status = getQueryResultsResponse.status();
Thread.sleep(1000);
// Check if 10 minutes have passed
@@ -82,8 +83,8 @@ public static GetQueryResultsResult getResult(ThrottlingInvoker invoker, AWSLogs
if (elapsedMinutes >= RESULT_TIMEOUT) {
throw new RuntimeException("Query execution timeout exceeded.");
}
- } while (!status.equalsIgnoreCase("Complete"));
+ } while (!status.equals(QueryStatus.COMPLETE));
- return getQueryResultsResult;
+ return getQueryResultsResponse;
}
}
diff --git a/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandlerTest.java b/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandlerTest.java
index cc2ce27fb8..f615b3c7b1 100644
--- a/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandlerTest.java
+++ b/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchMetadataHandlerTest.java
@@ -43,13 +43,6 @@
import com.amazonaws.athena.connector.lambda.metadata.MetadataResponse;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.model.DescribeLogGroupsRequest;
-import com.amazonaws.services.logs.model.DescribeLogGroupsResult;
-import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
-import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
-import com.amazonaws.services.logs.model.LogGroup;
-import com.amazonaws.services.logs.model.LogStream;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -64,6 +57,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogGroup;
+import software.amazon.awssdk.services.cloudwatchlogs.model.LogStream;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import java.util.ArrayList;
@@ -92,7 +92,7 @@ public class CloudwatchMetadataHandlerTest
private BlockAllocator allocator;
@Mock
- private AWSLogs mockAwsLogs;
+ private CloudWatchLogsClient mockAwsLogs;
@Mock
private SecretsManagerClient mockSecretsManager;
@@ -105,13 +105,19 @@ public void setUp()
throws Exception
{
Mockito.lenient().when(mockAwsLogs.describeLogStreams(nullable(DescribeLogStreamsRequest.class))).thenAnswer((InvocationOnMock invocationOnMock) -> {
- return new DescribeLogStreamsResult().withLogStreams(new LogStream().withLogStreamName("table-9"),
- new LogStream().withLogStreamName("table-10"));
+ return DescribeLogStreamsResponse.builder()
+ .logStreams(
+ LogStream.builder().logStreamName("table-9").build(),
+ LogStream.builder().logStreamName("table-10").build())
+ .build();
});
when(mockAwsLogs.describeLogGroups(nullable(DescribeLogGroupsRequest.class))).thenAnswer((InvocationOnMock invocationOnMock) -> {
- return new DescribeLogGroupsResult().withLogGroups(new LogGroup().withLogGroupName("schema-1"),
- new LogGroup().withLogGroupName("schema-20"));
+ return DescribeLogGroupsResponse.builder()
+ .logGroups(
+ LogGroup.builder().logGroupName("schema-1").build(),
+ LogGroup.builder().logGroupName("schema-20").build())
+ .build();
});
handler = new CloudwatchMetadataHandler(mockAwsLogs, new LocalKeyFactory(), mockSecretsManager, mockAthena, "spillBucket", "spillPrefix", com.google.common.collect.ImmutableMap.of());
allocator = new BlockAllocatorImpl();
@@ -133,34 +139,33 @@ public void doListSchemaNames()
when(mockAwsLogs.describeLogGroups(nullable(DescribeLogGroupsRequest.class))).thenAnswer((InvocationOnMock invocationOnMock) -> {
DescribeLogGroupsRequest request = (DescribeLogGroupsRequest) invocationOnMock.getArguments()[0];
- DescribeLogGroupsResult result = new DescribeLogGroupsResult();
+ DescribeLogGroupsResponse.Builder responseBuilder = DescribeLogGroupsResponse.builder();
Integer nextToken;
- if (request.getNextToken() == null) {
+ if (request.nextToken() == null) {
nextToken = 1;
}
- else if (Integer.valueOf(request.getNextToken()) < 3) {
- nextToken = Integer.valueOf(request.getNextToken()) + 1;
+ else if (Integer.valueOf(request.nextToken()) < 3) {
+ nextToken = Integer.valueOf(request.nextToken()) + 1;
}
else {
nextToken = null;
}
List logGroups = new ArrayList<>();
- if (request.getNextToken() == null || Integer.valueOf(request.getNextToken()) < 3) {
+ if (request.nextToken() == null || Integer.valueOf(request.nextToken()) < 3) {
for (int i = 0; i < 10; i++) {
- LogGroup nextLogGroup = new LogGroup();
- nextLogGroup.setLogGroupName("schema-" + String.valueOf(i));
+ LogGroup nextLogGroup = LogGroup.builder().logGroupName("schema-" + String.valueOf(i)).build();
logGroups.add(nextLogGroup);
}
}
- result.withLogGroups(logGroups);
+ responseBuilder.logGroups(logGroups);
if (nextToken != null) {
- result.setNextToken(String.valueOf(nextToken));
+ responseBuilder.nextToken(String.valueOf(nextToken));
}
- return result;
+ return responseBuilder.build();
});
ListSchemasRequest req = new ListSchemasRequest(identity, "queryId", "default");
@@ -183,34 +188,33 @@ public void doListTables()
when(mockAwsLogs.describeLogStreams(nullable(DescribeLogStreamsRequest.class))).thenAnswer((InvocationOnMock invocationOnMock) -> {
DescribeLogStreamsRequest request = (DescribeLogStreamsRequest) invocationOnMock.getArguments()[0];
- DescribeLogStreamsResult result = new DescribeLogStreamsResult();
+ DescribeLogStreamsResponse.Builder responseBuilder = DescribeLogStreamsResponse.builder();
Integer nextToken;
- if (request.getNextToken() == null) {
+ if (request.nextToken() == null) {
nextToken = 1;
}
- else if (Integer.valueOf(request.getNextToken()) < 3) {
- nextToken = Integer.valueOf(request.getNextToken()) + 1;
+ else if (Integer.valueOf(request.nextToken()) < 3) {
+ nextToken = Integer.valueOf(request.nextToken()) + 1;
}
else {
nextToken = null;
}
List logStreams = new ArrayList<>();
- if (request.getNextToken() == null || Integer.valueOf(request.getNextToken()) < 3) {
+ if (request.nextToken() == null || Integer.valueOf(request.nextToken()) < 3) {
for (int i = 0; i < 10; i++) {
- LogStream nextLogStream = new LogStream();
- nextLogStream.setLogStreamName("table-" + String.valueOf(i));
+ LogStream nextLogStream = LogStream.builder().logStreamName("table-" + String.valueOf(i)).build();
logStreams.add(nextLogStream);
}
}
- result.withLogStreams(logStreams);
+ responseBuilder.logStreams(logStreams);
if (nextToken != null) {
- result.setNextToken(String.valueOf(nextToken));
+ responseBuilder.nextToken(String.valueOf(nextToken));
}
- return result;
+ return responseBuilder.build();
});
ListTablesRequest req = new ListTablesRequest(identity, "queryId", "default",
@@ -238,35 +242,34 @@ public void doGetTable()
when(mockAwsLogs.describeLogStreams(nullable(DescribeLogStreamsRequest.class))).thenAnswer((InvocationOnMock invocationOnMock) -> {
DescribeLogStreamsRequest request = (DescribeLogStreamsRequest) invocationOnMock.getArguments()[0];
- assertTrue(request.getLogGroupName().equals(expectedSchema));
- DescribeLogStreamsResult result = new DescribeLogStreamsResult();
+ assertTrue(request.logGroupName().equals(expectedSchema));
+ DescribeLogStreamsResponse.Builder responseBuilder = DescribeLogStreamsResponse.builder();
Integer nextToken;
- if (request.getNextToken() == null) {
+ if (request.nextToken() == null) {
nextToken = 1;
}
- else if (Integer.valueOf(request.getNextToken()) < 3) {
- nextToken = Integer.valueOf(request.getNextToken()) + 1;
+ else if (Integer.valueOf(request.nextToken()) < 3) {
+ nextToken = Integer.valueOf(request.nextToken()) + 1;
}
else {
nextToken = null;
}
List logStreams = new ArrayList<>();
- if (request.getNextToken() == null || Integer.valueOf(request.getNextToken()) < 3) {
+ if (request.nextToken() == null || Integer.valueOf(request.nextToken()) < 3) {
for (int i = 0; i < 10; i++) {
- LogStream nextLogStream = new LogStream();
- nextLogStream.setLogStreamName("table-" + String.valueOf(i));
+ LogStream nextLogStream = LogStream.builder().logStreamName("table-" + String.valueOf(i)).build();
logStreams.add(nextLogStream);
}
}
- result.withLogStreams(logStreams);
+ responseBuilder.logStreams(logStreams);
if (nextToken != null) {
- result.setNextToken(String.valueOf(nextToken));
+ responseBuilder.nextToken(String.valueOf(nextToken));
}
- return result;
+ return responseBuilder.build();
});
GetTableRequest req = new GetTableRequest(identity, "queryId", "default", new TableName(expectedSchema, "table-9"), Collections.emptyMap());
@@ -290,36 +293,37 @@ public void doGetTableLayout()
when(mockAwsLogs.describeLogStreams(nullable(DescribeLogStreamsRequest.class))).thenAnswer((InvocationOnMock invocationOnMock) -> {
DescribeLogStreamsRequest request = (DescribeLogStreamsRequest) invocationOnMock.getArguments()[0];
- DescribeLogStreamsResult result = new DescribeLogStreamsResult();
+ DescribeLogStreamsResponse.Builder responseBuilder = DescribeLogStreamsResponse.builder();
Integer nextToken;
- if (request.getNextToken() == null) {
+ if (request.nextToken() == null) {
nextToken = 1;
}
- else if (Integer.valueOf(request.getNextToken()) < 3) {
- nextToken = Integer.valueOf(request.getNextToken()) + 1;
+ else if (Integer.valueOf(request.nextToken()) < 3) {
+ nextToken = Integer.valueOf(request.nextToken()) + 1;
}
else {
nextToken = null;
}
List logStreams = new ArrayList<>();
- if (request.getNextToken() == null || Integer.valueOf(request.getNextToken()) < 3) {
- int continuation = request.getNextToken() == null ? 0 : Integer.valueOf(request.getNextToken());
+ if (request.nextToken() == null || Integer.valueOf(request.nextToken()) < 3) {
+ int continuation = request.nextToken() == null ? 0 : Integer.valueOf(request.nextToken());
for (int i = 0 + continuation * 100; i < 300; i++) {
- LogStream nextLogStream = new LogStream();
- nextLogStream.setLogStreamName("table-" + String.valueOf(i));
- nextLogStream.setStoredBytes(i * 1000L);
+ LogStream nextLogStream = LogStream.builder()
+ .logStreamName("table-" + String.valueOf(i))
+ .storedBytes(i * 1000L)
+ .build();
logStreams.add(nextLogStream);
}
}
- result.withLogStreams(logStreams);
+ responseBuilder.logStreams(logStreams);
if (nextToken != null) {
- result.setNextToken(String.valueOf(nextToken));
+ responseBuilder.nextToken(String.valueOf(nextToken));
}
- return result;
+ return responseBuilder.build();
});
Map constraintsMap = new HashMap<>();
diff --git a/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandlerTest.java b/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandlerTest.java
index 758deacb50..f8b95fdafc 100644
--- a/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandlerTest.java
+++ b/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/CloudwatchRecordHandlerTest.java
@@ -39,10 +39,6 @@
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.model.GetLogEventsRequest;
-import com.amazonaws.services.logs.model.GetLogEventsResult;
-import com.amazonaws.services.logs.model.OutputLogEvent;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import org.apache.arrow.vector.types.Types;
@@ -59,6 +55,10 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.athena.AthenaClient;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
+import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
+import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -95,7 +95,7 @@ public class CloudwatchRecordHandlerTest
private EncryptionKeyFactory keyFactory = new LocalKeyFactory();
@Mock
- private AWSLogs mockAwsLogs;
+ private CloudWatchLogsClient mockAwsLogs;
@Mock
private S3Client mockS3;
@@ -144,39 +144,40 @@ public void setUp()
GetLogEventsRequest request = (GetLogEventsRequest) invocationOnMock.getArguments()[0];
//Check that predicate pushdown was propagated to cloudwatch
- assertNotNull(request.getStartTime());
- assertNotNull(request.getEndTime());
+ assertNotNull(request.startTime());
+ assertNotNull(request.endTime());
- GetLogEventsResult result = new GetLogEventsResult();
+ GetLogEventsResponse.Builder responseBuilder = GetLogEventsResponse.builder();
Integer nextToken;
- if (request.getNextToken() == null) {
+ if (request.nextToken() == null) {
nextToken = 1;
}
- else if (Integer.valueOf(request.getNextToken()) < 3) {
- nextToken = Integer.valueOf(request.getNextToken()) + 1;
+ else if (Integer.valueOf(request.nextToken()) < 3) {
+ nextToken = Integer.valueOf(request.nextToken()) + 1;
}
else {
nextToken = null;
}
List logEvents = new ArrayList<>();
- if (request.getNextToken() == null || Integer.valueOf(request.getNextToken()) < 3) {
- long continuation = request.getNextToken() == null ? 0 : Integer.valueOf(request.getNextToken());
+ if (request.nextToken() == null || Integer.valueOf(request.nextToken()) < 3) {
+ long continuation = request.nextToken() == null ? 0 : Integer.valueOf(request.nextToken());
for (int i = 0; i < 100_000; i++) {
- OutputLogEvent outputLogEvent = new OutputLogEvent();
- outputLogEvent.setMessage("message-" + (continuation * i));
- outputLogEvent.setTimestamp(i * 100L);
+ OutputLogEvent outputLogEvent = OutputLogEvent.builder()
+ .message("message-" + (continuation * i))
+ .timestamp(i * 100L)
+ .build();
logEvents.add(outputLogEvent);
}
}
- result.withEvents(logEvents);
+ responseBuilder.events(logEvents);
if (nextToken != null) {
- result.setNextForwardToken(String.valueOf(nextToken));
+ responseBuilder.nextForwardToken(String.valueOf(nextToken));
}
- return result;
+ return responseBuilder.build();
});
}
diff --git a/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/integ/CloudwatchIntegTest.java b/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/integ/CloudwatchIntegTest.java
index 9c2c9cd839..c9d1dd9f73 100644
--- a/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/integ/CloudwatchIntegTest.java
+++ b/athena-cloudwatch/src/test/java/com/amazonaws/athena/connectors/cloudwatch/integ/CloudwatchIntegTest.java
@@ -20,11 +20,6 @@
package com.amazonaws.athena.connectors.cloudwatch.integ;
import com.amazonaws.athena.connector.integ.IntegrationTestBase;
-import com.amazonaws.services.logs.AWSLogs;
-import com.amazonaws.services.logs.AWSLogsClientBuilder;
-import com.amazonaws.services.logs.model.DeleteLogGroupRequest;
-import com.amazonaws.services.logs.model.InputLogEvent;
-import com.amazonaws.services.logs.model.PutLogEventsRequest;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +33,9 @@
import software.amazon.awscdk.services.logs.LogGroup;
import software.amazon.awscdk.services.logs.LogStream;
import software.amazon.awssdk.services.athena.model.Row;
+import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
+import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
+import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
import java.util.ArrayList;
import java.util.List;
@@ -134,20 +132,21 @@ protected void setUpTableData()
logger.info("Setting up Log Group: {}, Log Stream: {}", logGroupName, logStreamName);
logger.info("----------------------------------------------------");
- AWSLogs logsClient = AWSLogsClientBuilder.defaultClient();
+ CloudWatchLogsClient logsClient = CloudWatchLogsClient.create();
try {
- logsClient.putLogEvents(new PutLogEventsRequest()
- .withLogGroupName(logGroupName)
- .withLogStreamName(logStreamName)
- .withLogEvents(
- new InputLogEvent().withTimestamp(currentTimeMillis).withMessage("Space, the final frontier."),
- new InputLogEvent().withTimestamp(fromTimeMillis).withMessage(logMessage),
- new InputLogEvent().withTimestamp(toTimeMillis + 5000)
- .withMessage("To boldly go where no man has gone before!")));
+ logsClient.putLogEvents(PutLogEventsRequest.builder()
+ .logGroupName(logGroupName)
+ .logStreamName(logStreamName)
+ .logEvents(
+ InputLogEvent.builder().timestamp(currentTimeMillis).message("Space, the final frontier.").build(),
+ InputLogEvent.builder().timestamp(fromTimeMillis).message(logMessage).build(),
+ InputLogEvent.builder().timestamp(toTimeMillis + 5000)
+ .message("To boldly go where no man has gone before!").build())
+ .build());
}
finally {
- logsClient.shutdown();
+ logsClient.close();
}
}