
package custom serde classes #149

Closed
jorgheymans opened this issue Nov 21, 2019 · 13 comments · Fixed by #472
Labels
enhancement New feature or request

Comments

@jorgheymans
Contributor

We use protobuf as our message format and have custom serializers for it. Is the only way to include these serializers to build KafkaHQ from source and somehow include our jar in the Gradle build?

@tchiotludo tchiotludo added the enhancement New feature or request label Nov 21, 2019
@tchiotludo
Owner

Hey @jorgheymans ,

This will need some code in KafkaHQ to handle, and some precision about the need.

How do you see this?

  • Is this a custom configuration that maps a class to a topic?
  • Is it a full package with an interface, for example:

    interface CustomDeser {
        boolean isCustomSerde(ConsumerRecord record);
        Object decode(ConsumerRecord record);
    }
  • Any other way?

Feel free to give me some advice on this ! 

@jorgheymans
Contributor Author

Thanks! I was thinking this could be done at the level of the consumer, because it already has standard configuration properties for this: value.deserializer (http://kafka.apache.org/documentation/#consumerconfigs). Of course it could be made more flexible per topic etc., but then serde ends up in your problem space with interfaces to maintain, and I'm not sure you would want that. So maybe all that's needed is documentation on how to easily package a custom jar? Looking at the exec jar, all its jars are expanded into package directories, which makes it slightly more difficult to add custom ones.
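For readers following along: relying on the stock value.deserializer mechanism just means setting the class names in the consumer properties. A minimal sketch (the class com.example.ProtobufValueDeserializer is a placeholder for whatever Deserializer implementation you put on the classpath, not a real AKHQ class):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds consumer properties that rely on Kafka's own key.deserializer /
    // value.deserializer config instead of deserializers passed in code.
    public static Properties customSerdeProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Placeholder class name: any Deserializer implementation on the classpath.
        props.setProperty("value.deserializer",
                "com.example.ProtobufValueDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = customSerdeProps("localhost:9092");
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

Kafka only honors these properties when no deserializer instance is passed to the KafkaConsumer constructor, which is exactly the obstacle discussed below.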

@jorgheymans
Contributor Author

jorgheymans commented Nov 28, 2019

Looked into this a bit deeper

  • The ByteArray serde is currently hardcoded in KafkaModule:

    public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) {
        Properties props = this.getConsumerProperties(clusterId);
        props.putAll(properties);

        return new KafkaConsumer<>(
            props,
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer()
        );
    }
  • So to control payload serde I had to remove the passed-in deserializers, so that the Kafka consumer looks up key.deserializer and value.deserializer from the properties instead.
  • org.kafkahq.models.Record contains similar assumptions; key and value should rather be Object, and the Avro serde made to work with that.

This, together with manually copying my required classes for proto serde into kafkahq.jar, means I have something working. But it's cumbersome.

So if you agree that, in a first phase, custom serde should be done via the key and value deserializer properties of the Kafka consumer, I could clean up this part of my patch and propose a PR.
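A side note on the props.putAll call in the snippet above: it means the per-call properties override the cluster-level defaults, so a value.deserializer supplied per request would win over one configured per cluster. A quick pure-Java illustration of that precedence (class names are placeholders):

```java
import java.util.Properties;

public class PropsPrecedence {
    public static void main(String[] args) {
        // Cluster-level defaults, as returned by getConsumerProperties(clusterId).
        Properties clusterProps = new Properties();
        clusterProps.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Per-call overrides, as passed into getConsumer(clusterId, properties).
        Properties overrides = new Properties();
        overrides.setProperty("value.deserializer",
                "com.example.ProtobufValueDeserializer"); // placeholder class

        // putAll gives the per-call properties the last word.
        clusterProps.putAll(overrides);
        System.out.println(clusterProps.getProperty("value.deserializer"));
        // prints com.example.ProtobufValueDeserializer
    }
}
```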

@jorgheymans
Contributor Author

@tchiotludo any thoughts on this ?

@tchiotludo
Owner

@jorgheymans
Will need to have a deeper look on that part.

If you want to use a custom deserializer via the value.deserializer config, KafkaHQ needs to remove the explicit deserializers from the whole codebase, since Kafka ignores the config if one is passed in:

            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }

I don't know what the impact would be for now, but I have doubts that this can work.
Using Object will also lead to ugly casts across the whole codebase.
I'm not really sure how to handle it right now.
My first feeling is to allow a custom config on KafkaHQ (not on the Kafka config) that lets you implement this kind of class:

package org.kafkahq;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ValueDeserializer implements Deserializer<byte[]> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        return "{\"a\": 1, \"b\": 1}".getBytes();
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, byte[] data) {
        return "{\"a\": 1, \"b\": 1}".getBytes();
    }

    @Override
    public void close() {

    }
}

Mostly the same as Kafka's Deserializer interface, except that it forces implementations to return byte[].

Nothing decided for now, though.

@xakassi
Contributor

xakassi commented Jun 4, 2020

Hi @tchiotludo! Our team also uses Protobuf as the message format, and we really like your AKHQ. It would be perfect to have the possibility to pass a custom deserializer class (or a .proto file, as it's done in Kafdrop). Do you plan to add this feature?

@tchiotludo
Owner

I think this feature is more than welcome, even more so since Confluent now includes support for Protobuf in their schema registry.

Looking quickly at Kafdrop, .proto files seem to be a better option than custom jar packaging.

Maybe another try/catch case here, looping over the proto files and trying to deserialize with each, could do the trick:

private String convertToString(byte[] payload, Integer keySchemaId) {
    if (payload == null) {
        return null;
    } else if (keySchemaId != null) {
        try {
            GenericRecord deserialize = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload);
            return deserialize.toString();
        } catch (Exception exception) {
            return new String(payload);
        }
    } else {
        return new String(payload);
    }
}

But to be honest, I'm not a pro with protobuf at all; I've never used it personally.
A PR from someone with better protobuf knowledge than me is clearly welcome on that.

@jorgheymans what do you think about that (simple proto / try-catch deserializer)?
I have the feeling it's not perfect, but it won't break the application by changing from byte[] to Object.
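The try/catch loop proposed above can be sketched generically in pure Java: each candidate decoder (e.g. one per .proto descriptor) either returns a string or throws, the first success wins, and the raw bytes are the fallback. This is an illustrative sketch, not AKHQ code; the names are made up:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

public class FallbackDecoder {
    // Tries each candidate decoder in order; a decoder signals "not mine"
    // by throwing. Falls back to interpreting the payload as plain text.
    public static String decode(byte[] payload, List<Function<byte[], String>> candidates) {
        if (payload == null) {
            return null;
        }
        for (Function<byte[], String> candidate : candidates) {
            try {
                return candidate.apply(payload);
            } catch (Exception ignored) {
                // Try the next candidate (e.g. the next .proto descriptor).
            }
        }
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Function<byte[], String> alwaysFails = bytes -> {
            throw new IllegalArgumentException("not this format");
        };
        Function<byte[], String> utf8 = bytes ->
                "decoded:" + new String(bytes, StandardCharsets.UTF_8);

        String result = decode("hello".getBytes(StandardCharsets.UTF_8),
                List.of(alwaysFails, utf8));
        System.out.println(result); // decoded:hello
    }
}
```

With real protobuf descriptors, each candidate would wrap something like DynamicMessage.parseFrom(descriptor, payload), which throws InvalidProtocolBufferException when the bytes don't match the schema; that exception drives the loop to the next candidate.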

@jorgheymans
Contributor Author

We have a fork running in-house that adds proto serde based on protobuf binary descriptors. I will check whether we're allowed to clean it up and submit a PR.

@xakassi
Contributor

xakassi commented Jun 9, 2020

Hi, @jorgheymans !
Any updates? Are you going to submit a PR? Or should I start an investigation and propose a PR?
Sorry for the rush, but we really need this functionality ASAP.

@xakassi
Contributor

xakassi commented Jun 10, 2020

Some additional thoughts on this topic.
I think it would be nice to have the possibility to choose a deserializer for each topic when you want to read it. But I'm not sure how this could be done beautifully on the UI side, because a topic is read immediately when you click the magnifier icon. So maybe a pop-up should appear when you click the magnifier?

@tchiotludo
Owner

Maybe a simple try/catch can do the trick: if there are custom serdes, we try to deserialize with all of them.
It shouldn't be a really CPU-intensive task at 50 messages per page.

ghost pushed a commit that referenced this issue Jun 22, 2020
* fix column topics

* fix topic link
@xakassi
Contributor

xakassi commented Oct 15, 2020

Hi, @tchiotludo !
It seems no one is working on it. I'm going to start an investigation, and I hope to propose a PR soon.

@tchiotludo
Owner

thanks @xakassi 👍 🙏
