Skip to content

Commit

Permalink
Deserialize Protobuf messages using descriptor files
Browse files Browse the repository at this point in the history
  • Loading branch information
Taisiia Goltseva committed Oct 22, 2020
1 parent 3ebfd4b commit ac7009d
Show file tree
Hide file tree
Showing 21 changed files with 2,752 additions and 8 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,37 @@ These parameters are the default values used in the topic creation page.
* `akhq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the
buffer (default: 1000).

#### Protobuf deserialization

To deserialize topics containing data in Protobuf format, you can put descriptor files in `descriptors-folder`
and set topics mapping: for each `topic-regex` you can specify `descriptor-file` name (from `descriptors-folder`)
and corresponding message types for keys and values. If, for example, keys are not in Protobuf format,
`key-message-type` can be omitted; the same applies to `value-message-type`.

An example configuration looks as follows:

```
akhq:
topic:
deserialization:
protobuf:
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file: "album.desc"
value-message-type: "Album"
- topic-regex: "film.*"
descriptor-file: "film.desc"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
```

More examples of Protobuf deserialization can be found in [tests](./src/test/java/org/akhq/utils).
Information about generating descriptor files can be found in [test resources](./src/test/resources/protobuf_proto).


### Security
* `akhq.security.default-group`: Default group for all the user even unlogged user.
Expand Down
18 changes: 16 additions & 2 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ micronaut:

server:
context-path: "" # if behind a reverse proxy, path to akhq without trailing slash (optional). Example: akhq is
# behind a reverse proxy with url http://my-server/akhq, set base-path: "/akhq".
# Not needed if you're behind a reverse proxy with subdomain http://akhq.my-server/
# behind a reverse proxy with url http://my-server/akhq, set base-path: "/akhq".
# Not needed if you're behind a reverse proxy with subdomain http://akhq.my-server/
akhq:
server:
access-log: # Access log configuration (optional)
Expand Down Expand Up @@ -123,6 +123,20 @@ akhq:
- "^.*-changelog$"
- "^.*-repartition$"
- "^.*-rekey$"
deserialization:
protobuf:
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file: "album.desc"
value-message-type: "Album"
- topic-regex: "film.*"
descriptor-file: "film.desc"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"

# Topic display data options (optional)
topic-data:
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ dependencies {
// utils
implementation group: 'org.codehaus.httpcache4j.uribuilder', name: 'uribuilder', version: '2.0.0'

// protobuf
implementation group: "com.google.protobuf", name: "protobuf-java", version: "3.13.0"
implementation group: "com.google.protobuf", name: "protobuf-java-util", version: "3.13.0"

// Password hashing
implementation group: "org.mindrot", name: "jbcrypt", version: "0.4"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.akhq.configs;

import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;

/**
 * Binds the {@code akhq.topic.deserialization.protobuf} configuration section:
 * the folder containing compiled Protobuf descriptor files and the list of
 * per-topic mapping entries selecting descriptor/message types.
 * Getters/setters are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties("akhq.topic.deserialization.protobuf")
@Data
public class ProtobufDeserializationTopicsMapping {
    // Path to the folder holding compiled *.desc descriptor files
    private String descriptorsFolder;
    // One entry per topic-regex; defaults to empty when no mapping is configured
    private List<TopicsMapping> topicsMapping = new ArrayList<>();
}
11 changes: 11 additions & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.akhq.configs;

import lombok.Data;

/**
 * A single topic-to-descriptor mapping entry used for Protobuf deserialization:
 * topics matching {@code topicRegex} are decoded with the given descriptor file
 * and message types. Getters/setters are generated by Lombok's {@code @Data}.
 */
@Data
public class TopicsMapping {
    // Regex matched against topic names to select this mapping
    private String topicRegex;
    // Descriptor file name, resolved relative to the configured descriptors folder
    private String descriptorFile;
    // Message type for record keys; may be left unset when keys are not Protobuf
    private String keyMessageType;
    // Message type for record values; may be left unset when values are not Protobuf
    private String valueMessageType;
}
22 changes: 16 additions & 6 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.*;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -34,6 +35,7 @@ public class Record {
private Map<String, String> headers = new HashMap<>();
@JsonIgnore
private KafkaAvroDeserializer kafkaAvroDeserializer;
private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@Getter(AccessLevel.NONE)
private byte[] bytesKey;
Expand All @@ -59,7 +61,8 @@ public Record(RecordMetadata record, byte[] bytesKey, byte[] bytesValue, Map<Str
this.headers = headers;
}

public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer, byte[] bytesValue) {
public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
Expand All @@ -69,16 +72,17 @@ public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafka
this.keySchemaId = getAvroSchemaId(this.bytesKey);
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
for (Header header: record.headers()) {
for (Header header : record.headers()) {
this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
}

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
}

public String getKey() {
if (this.key == null) {
this.key = convertToString(bytesKey, keySchemaId);
this.key = convertToString(bytesKey, keySchemaId, true);
}

return this.key;
Expand All @@ -95,23 +99,29 @@ public String getKeyAsBase64() {

public String getValue() {
    // Lazily decode the raw value bytes on first access and cache the result.
    if (value == null) {
        value = convertToString(bytesValue, valueSchemaId, false);
    }
    return value;
}

private String convertToString(byte[] payload, Integer keySchemaId) {
private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
if (payload == null) {
return null;
} else if (keySchemaId != null) {
} else if (schemaId != null) {
try {
GenericRecord record = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload);
return AvroToJsonSerializer.toJson(record);
} catch (Exception exception) {
return new String(payload);
}
} else {
if (protobufToJsonDeserializer != null) {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
}
}
return new String(payload);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.akhq.repositories;

import org.akhq.configs.ProtobufDeserializationTopicsMapping;
import org.akhq.utils.ProtobufToJsonDeserializer;

import javax.inject.Inject;
import javax.inject.Singleton;

/**
 * Singleton holder for custom (non schema-registry) deserializers, currently
 * the descriptor-file based Protobuf-to-JSON deserializer.
 */
@Singleton
public class CustomDeserializerRepository {
    @Inject
    private ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping;

    private ProtobufToJsonDeserializer protobufToJsonDeserializer;

    /**
     * Lazily builds and caches the Protobuf deserializer from the configured
     * topic mapping. Synchronized because this bean is a singleton shared by
     * concurrent request threads; without it, racing callers could each build
     * their own deserializer instance.
     *
     * @return the shared deserializer instance
     */
    public synchronized ProtobufToJsonDeserializer getProtobufToJsonDeserializer() {
        if (this.protobufToJsonDeserializer == null) {
            this.protobufToJsonDeserializer = new ProtobufToJsonDeserializer(protobufDeserializationTopicsMapping);
        }
        return this.protobufToJsonDeserializer;
    }
}
4 changes: 4 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private SchemaRegistryRepository schemaRegistryRepository;

@Inject
private CustomDeserializerRepository customDeserializerRepository;

@Inject
private AvroWireFormatConverter avroWireFormatConverter;

Expand Down Expand Up @@ -381,6 +384,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
return new Record(
record,
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
this.customDeserializerRepository.getProtobufToJsonDeserializer(),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId))
);
}
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/org/akhq/utils/ProtobufConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.akhq.utils;

public class ProtobufConfig {
private final String descriptorFile;
private final String keyMessageType;
private final String valueMessageType;

public ProtobufConfig(String descriptorFile, String keyMessageType, String valueMessageType) {
this.descriptorFile = descriptorFile;
this.keyMessageType = keyMessageType;
this.valueMessageType = valueMessageType;
}

public String getDescriptorFile() {
return descriptorFile;
}

public String getKeyMessageType() {
return keyMessageType;
}

public String getValueMessageType() {
return valueMessageType;
}
}
Loading

0 comments on commit ac7009d

Please sign in to comment.