Skip to content

Commit

Permalink
[FEATURE] Add Support for querying Avro/Protobuf Data in athena-msk C…
Browse files Browse the repository at this point in the history
…onnector (#2075)
  • Loading branch information
Jithendar12 committed Aug 7, 2024
1 parent 1d405ca commit 5752309
Show file tree
Hide file tree
Showing 16 changed files with 893 additions and 178 deletions.
105 changes: 103 additions & 2 deletions athena-msk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,63 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>1.9.10</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>
<version>1.9.10</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>1.9.10</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
<version>1.9.10</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
<version>4.9.0</version>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema-jvm</artifactId>
<version>4.9.0</version>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime-jvm</artifactId>
<version>4.9.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-compiler</artifactId>
<version>4.9.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.12.660</version>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<!-- Only use the simple logger for testing so that we can see the output -->
Expand All @@ -26,6 +83,46 @@
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -82,12 +179,10 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.2.0</version>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
Expand Down Expand Up @@ -206,4 +301,10 @@
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class AmazonMskConstants

public static final int MAX_RECORDS_IN_SPLIT = 10_000;

public static final String AVRO_DATA_FORMAT = "avro";
public static final String PROTOBUF_DATA_FORMAT = "protobuf";

private AmazonMskConstants()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.util.PaginatedRequestIterator;
import com.amazonaws.athena.connectors.msk.dto.AvroTopicSchema;
import com.amazonaws.athena.connectors.msk.dto.SplitParameters;
import com.amazonaws.athena.connectors.msk.dto.TopicPartitionPiece;
import com.amazonaws.athena.connectors.msk.dto.TopicSchema;
Expand All @@ -50,6 +51,8 @@
import com.amazonaws.services.glue.model.RegistryId;
import com.amazonaws.services.glue.model.RegistryListItem;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -68,7 +71,9 @@
import java.util.stream.Stream;

import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static com.amazonaws.athena.connectors.msk.AmazonMskConstants.AVRO_DATA_FORMAT;
import static com.amazonaws.athena.connectors.msk.AmazonMskConstants.MAX_RECORDS_IN_SPLIT;
import static com.amazonaws.athena.connectors.msk.AmazonMskConstants.PROTOBUF_DATA_FORMAT;

public class AmazonMskMetadataHandler extends MetadataHandler
{
Expand Down Expand Up @@ -256,7 +261,7 @@ private String findGlueSchemaNameIgnoringCasing(String glueRegistryNameIn, Strin
public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableRequest getTableRequest) throws Exception
{
LOGGER.info("doGetTable request: {}", getTableRequest);
Schema tableSchema = null;
Schema tableSchema;
try {
tableSchema = getSchema(getTableRequest.getTableName().getSchemaName(), getTableRequest.getTableName().getTableName());
}
Expand Down Expand Up @@ -323,9 +328,17 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest
// returning a single partition.
String glueRegistryName = request.getTableName().getSchemaName();
String glueSchemaName = request.getTableName().getTableName();
String topic;
GlueRegistryReader registryReader = new GlueRegistryReader();
TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
String topic = topicSchema.getTopicName();
String dataFormat = registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName);
if (dataFormat.equalsIgnoreCase(AVRO_DATA_FORMAT) || dataFormat.equalsIgnoreCase(PROTOBUF_DATA_FORMAT)) {
//if schema type is avro/protobuf, then topic name should be glue schema name
topic = glueSchemaName;
}
else {
TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
topic = topicSchema.getTopicName();
}

LOGGER.info("Retrieved topicName: {}", topic);

Expand Down Expand Up @@ -417,31 +430,70 @@ private Schema getSchema(String glueRegistryName, String glueSchemaName) throws
{
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();

// Get topic schema json from GLue registry as translated to TopicSchema pojo
GlueRegistryReader registryReader = new GlueRegistryReader();
TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);

// Creating ArrowType for each fields in the topic schema.
// Also putting the additional column level information
// into the metadata in ArrowType field.
topicSchema.getMessage().getFields().forEach(it -> {
FieldType fieldType = new FieldType(
true,
AmazonMskUtils.toArrowType(it.getType()),
null,
com.google.common.collect.ImmutableMap.of(
"mapping", it.getMapping(),
"formatHint", it.getFormatHint(),
"type", it.getType()
)
);
Field field = new Field(it.getName(), fieldType, null);
schemaBuilder.addField(field);
});

// Putting the additional schema level information into the metadata in ArrowType schema.
schemaBuilder.addMetadata("dataFormat", topicSchema.getMessage().getDataFormat());
String dataFormat = registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName);
if (dataFormat.equalsIgnoreCase(AVRO_DATA_FORMAT)) {
// Get avro topic schema json from Glue registry as translated to AvroTopicSchema pojo
AvroTopicSchema avroTopicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, AvroTopicSchema.class);
// Creating ArrowType for each field in the avro topic schema.
avroTopicSchema.getFields().forEach(it -> {
FieldType fieldType = new FieldType(
true,
AmazonMskUtils.toArrowType(it.getType()),
null,
com.google.common.collect.ImmutableMap.of(
"name", it.getName(),
"formatHint", it.getFormatHint(),
"type", it.getType()
)
);
Field field = new Field(it.getName(), fieldType, null);
schemaBuilder.addField(field);
});
schemaBuilder.addMetadata("dataFormat", AVRO_DATA_FORMAT);
}
else if (dataFormat.equalsIgnoreCase(PROTOBUF_DATA_FORMAT)) {
// Get protobuf topic schema from Glue registry
String glueSchema = registryReader.getSchemaDef(glueRegistryName, glueSchemaName);
ProtobufSchema protobufSchema = new ProtobufSchema(glueSchema);
Descriptors.Descriptor descriptor = protobufSchema.toDescriptor();
// Creating ArrowType for each field in the protobuf topic schema.
for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) {
FieldType fieldType = new FieldType(
true,
AmazonMskUtils.toArrowType(fieldDescriptor.getType().toString()),
null
);
Field field = new Field(fieldDescriptor.getName(), fieldType, null);
schemaBuilder.addField(field);
}
schemaBuilder.addMetadata("dataFormat", PROTOBUF_DATA_FORMAT);
}
else {
// Get topic schema json from Glue registry as translated to TopicSchema pojo
TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);

// Creating ArrowType for each field in the json/csv topic schema.
// Also putting the additional column level information
// into the metadata in ArrowType field.
topicSchema.getMessage().getFields().forEach(it -> {
FieldType fieldType = new FieldType(
true,
AmazonMskUtils.toArrowType(it.getType()),
null,
com.google.common.collect.ImmutableMap.of(
"mapping", it.getMapping(),
"formatHint", it.getFormatHint(),
"type", it.getType()
)
);
Field field = new Field(it.getName(), fieldType, null);
schemaBuilder.addField(field);
});

// Putting the additional schema level information into the metadata in ArrowType schema.
schemaBuilder.addMetadata("dataFormat", topicSchema.getMessage().getDataFormat());
}
// NOTE: these values are being shoved in here for usage later in the calling context
// of doGetTable() since Java doesn't have tuples.
schemaBuilder.addMetadata("glueRegistryName", glueRegistryName);
Expand Down
Loading

0 comments on commit 5752309

Please sign in to comment.