Merge pull request #11 from 1infras/feature/kafka-consumer-v2
Add kafka consumer v2 with sarama
ducmeit1 authored Aug 31, 2020
2 parents 356112f + 2692cdf commit 6818279
Showing 3 changed files with 147 additions and 153 deletions.
55 changes: 1 addition & 54 deletions driver/kafka/kafka.go
@@ -6,7 +6,6 @@ import (
 
 	"github.com/1infras/go-kit/util/cert_utils"
 	"github.com/1infras/go-kit/util/file_utils"
-	"github.com/Shopify/sarama"
 	"github.com/spf13/viper"
 )
 
@@ -57,56 +56,4 @@ func NewDefaultKafkaConnection() (*Connection, error) {
 	}
 
 	return c, nil
 }
-
-//NewKafkaConfig
-func NewKafkaConfig(conn *Connection) *sarama.Config {
-	c := sarama.NewConfig()
-	c.Producer.RequiredAcks = sarama.WaitForAll
-	c.Producer.Return.Successes = true
-	c.Producer.Return.Errors = true
-	if conn.TLS != nil {
-		c.Net.TLS.Enable = true
-		c.Net.TLS.Config = conn.TLS
-	}
-	c.Producer.Partitioner = sarama.NewRandomPartitioner
-	return c
-}
-
-//CreateAsyncProducer
-func CreateAsyncProducer(conn *Connection) (sarama.AsyncProducer, error) {
-	if conn == nil {
-		return CreateAsyncProducerFromDefaultConnection()
-	}
-
-	c := NewKafkaConfig(conn)
-	return sarama.NewAsyncProducer(conn.Brokers, c)
-}
-
-//CreateSyncProducer
-func CreateSyncProducer(conn *Connection) (sarama.SyncProducer, error) {
-	if conn == nil {
-		return CreateSyncProducerFromDefaultConnection()
-	}
-
-	c := NewKafkaConfig(conn)
-	return sarama.NewSyncProducer(conn.Brokers, c)
-}
-
-//CreateAsyncProducerFromDefaultConnection
-func CreateAsyncProducerFromDefaultConnection() (sarama.AsyncProducer, error) {
-	c, err := NewDefaultKafkaConnection()
-	if err != nil {
-		return nil, err
-	}
-	return CreateAsyncProducer(c)
-}
-
-//CreateSyncProducerFromDefaultConnection
-func CreateSyncProducerFromDefaultConnection() (sarama.SyncProducer, error) {
-	c, err := NewDefaultKafkaConnection()
-	if err != nil {
-		return nil, err
-	}
-	return CreateSyncProducer(c)
-}
182 changes: 83 additions & 99 deletions driver/kafka/kafka_consumer.go
@@ -1,141 +1,125 @@
 package kafka
 
 import (
 	"fmt"
 	"github.com/1infras/go-kit/logger"
-	"github.com/confluentinc/confluent-kafka-go/kafka"
-	"github.com/spf13/viper"
+	"github.com/Shopify/sarama"
 	"os"
 	"os/signal"
-	"strings"
 	"syscall"
 )
 
-//KafkaConsumerConfigMap --
-type ConsumerConfigMap struct {
-	Servers            string `json:"servers"`
-	Topic              string `json:"topic"`
-	TLS                bool   `json:"tls"`
-	TrustStorePath     string `json:"trust_store_path"`
-	TrustStorePassword string `json:"trust_store_password"`
-	KeyStorePath       string `json:"key_store_path"`
-	KeyStorePassword   string `json:"key_store_password"`
-	Offset             string `json:"offset"`
-	SessionTimeoutMs   int    `json:"session_timeout_ms"`
-	ConsumerGroupID    string `json:"consumer_group_id"`
-}
+//NewConsumerConfig -
+func NewConsumerConfig(conn *Connection) *sarama.Config {
+	c := sarama.NewConfig()
 
-//LoadConsumerConfigMap --
-func LoadConsumerConfigMap() *ConsumerConfigMap {
-	return &ConsumerConfigMap{
-		Servers:            viper.GetString("kafka.servers"),
-		Topic:              viper.GetString("kafka.topic"),
-		TLS:                viper.GetBool("kafka.tls"),
-		TrustStorePath:     viper.GetString("kafka.truststore_path"),
-		TrustStorePassword: viper.GetString("kafka.truststore_password"),
-		KeyStorePath:       viper.GetString("kafka.keystore_path"),
-		KeyStorePassword:   viper.GetString("kafka.keystore_password"),
-		Offset:             viper.GetString("kafka.offset"),
-		SessionTimeoutMs:   viper.GetInt("kafka.session_timeout_ms"),
-		ConsumerGroupID:    viper.GetString("kafka.consumer_group_id"),
+	if conn.TLS != nil {
+		c.Net.TLS.Enable = true
+		c.Net.TLS.Config = conn.TLS
 	}
-}
 
-//KafkaConsumer --
-type Consumer struct {
-	Consumer *kafka.Consumer
-	Topics   []string
+	c.Consumer.Offsets.Initial = sarama.OffsetNewest
+	c.Consumer.Return.Errors = true
+
+	return c
 }
 
-//ToConfigMap --
-func (c *ConsumerConfigMap) ToConfigMap() *kafka.ConfigMap {
-	if c.Offset == "" {
-		c.Offset = "latest"
+//CreateConsumer -
+func CreateConsumer(conn *Connection) (sarama.Consumer, error) {
+	if conn == nil {
+		return CreateConsumerFromDefaultConnection()
 	}
 
-	if c.ConsumerGroupID == "" {
-		c.ConsumerGroupID = "flink-kafka-consumer-group"
-	}
+	c := NewConsumerConfig(conn)
+	return sarama.NewConsumer(conn.Brokers, c)
 }
 
-	if c.SessionTimeoutMs <= 0 {
-		c.SessionTimeoutMs = 60000
+//CreateConsumerGroup -
+func CreateConsumerGroup(conn *Connection, group string) (sarama.ConsumerGroup, error) {
+	if group == "" {
+		return nil, fmt.Errorf("group must be defined")
 	}
 
-	cm := &kafka.ConfigMap{
-		"bootstrap.servers":               c.Servers,
-		"broker.address.family":           "v4", //Avoid connecting to IPv6 brokers
-		"group.id":                        c.ConsumerGroupID,
-		"session.timeout.ms":              c.SessionTimeoutMs,
-		"go.events.channel.enable":        true,
-		"go.application.rebalance.enable": true,
-		"enable.partition.eof":            true,
-		"auto.offset.reset":               c.Offset,
+	if conn == nil {
+		return CreateConsumerGroupFromDefaultConnection(group)
 	}
 
-	if c.TLS {
-		cm.SetKey("security.protocol", "SSL")
-		cm.SetKey("ssl.truststore.location", c.TrustStorePath)
-		cm.SetKey("ssl.truststore.password", c.TrustStorePassword)
-		cm.SetKey("ssl.keystore.location", c.KeyStorePath)
-		cm.SetKey("ssl.keystore.password", c.KeyStorePassword)
+	c := NewConsumerConfig(conn)
+	return sarama.NewConsumerGroup(conn.Brokers, group, c)
 }
 
+//CreateConsumerFromDefaultConnection -
+func CreateConsumerFromDefaultConnection() (sarama.Consumer, error) {
+	c, err := NewDefaultKafkaConnection()
 	if err != nil {
 		return nil, err
 	}
 
-	return cm
+	return CreateConsumer(c)
 }
 
-//NewConsumer --
-func NewConsumer(configMap *ConsumerConfigMap) (*Consumer, error) {
-	c, err := kafka.NewConsumer(configMap.ToConfigMap())
+//CreateConsumerGroupFromDefaultConnection -
+func CreateConsumerGroupFromDefaultConnection(group string) (sarama.ConsumerGroup, error) {
+	if group == "" {
+		return nil, fmt.Errorf("group must be defined")
+	}
+
+	c, err := NewDefaultKafkaConnection()
 	if err != nil {
 		return nil, err
 	}
 
-	return &Consumer{
-		Consumer: c,
-		Topics:   []string{configMap.Topic}}, nil
+	return CreateConsumerGroup(c, group)
 }
 
-//Subscribes --
-func (c *Consumer) Subscribes(fn func(message *kafka.Message)) error {
-	sigChan := make(chan os.Signal, 1)
+//Consume -
+func Consume(consumer sarama.Consumer, topic string, fn func(msg *sarama.ConsumerMessage)) error {
+	if topic == "" {
+		return fmt.Errorf("topic must be defined")
+	}
+
+	var (
+		sigChan = make(chan os.Signal, 1)
+		mc      = make(chan *sarama.ConsumerMessage, 256)
+	)
 
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
 
-	err := c.Consumer.SubscribeTopics(c.Topics, nil)
+	partitions, err := consumer.Partitions(topic)
 	if err != nil {
 		return err
 	}
 
-	logger.Infof("Starting consume topic: %v", strings.Join(c.Topics, ", "))
+	for _, p := range partitions {
+		pc, err := consumer.ConsumePartition(topic, p, sarama.OffsetNewest)
+		if err != nil {
+			return err
+		}
 
-	for {
-		select {
-		case <- sigChan:
-			logger.Infof("Closing consumer")
-			err := c.Consumer.Close()
-			if err != nil {
-				return err
-			}
-			close(sigChan)
-			return nil
-		case event := <-c.Consumer.Events():
-			switch e := event.(type) {
-			case kafka.AssignedPartitions:
-				err := c.Consumer.Assign(e.Partitions)
-				if err != nil {
-					return err
-				}
-				logger.Infof("Assigned Partitions %v\n", e)
-			case kafka.RevokedPartitions:
-				err := c.Consumer.Unassign()
-				if err != nil {
-					return err
-				}
-				logger.Infof("Revoked Partitions %v\n", e)
-			case kafka.PartitionEOF:
-			case kafka.Error:
-				return e
-			case *kafka.Message:
-				fn(e)
+		go func(pc sarama.PartitionConsumer) {
+			for m := range pc.Messages() {
+				mc <- m
 			}
+		}(pc)
 	}
+
+	run := true
+	for run == true {
+		select {
+		case <-sigChan:
+			run = false
+		case message := <-mc:
+			fn(message)
 		}
 	}
+
+	close(sigChan)
+	logger.Infof("Closing consumer")
+	return consumer.Close()
 }
+
+//TODO: Write ConsumeGroup
+//ConsumeGroup -
+func ConsumeGroup(consumerGroup sarama.ConsumerGroup, topic string, fn func(msg *sarama.ConsumerMessage)) error {
+	return nil
+}
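
For illustration, here is a minimal usage sketch of the new sarama-based consumer helpers. This is not part of the commit: the broker address, topic name, and import path are assumptions, and only the Brokers and TLS fields of Connection are visible in this diff.

// consumer_example.go (hypothetical)
package main

import (
	"log"

	"github.com/Shopify/sarama"

	kafka "github.com/1infras/go-kit/driver/kafka"
)

func main() {
	// Assumed connection: Brokers is the only field needed here; TLS stays nil.
	conn := &kafka.Connection{
		Brokers: []string{"localhost:9092"}, // placeholder broker
	}

	consumer, err := kafka.CreateConsumer(conn)
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}

	// Consume blocks until SIGINT/SIGTERM and invokes the callback for every
	// message read from all partitions of the topic, starting at the newest offset.
	err = kafka.Consume(consumer, "example-topic", func(msg *sarama.ConsumerMessage) {
		log.Printf("partition=%d offset=%d value=%s", msg.Partition, msg.Offset, msg.Value)
	})
	if err != nil {
		log.Fatalf("consume: %v", err)
	}
}

Note that ConsumeGroup is still a stub in this diff, so group-based consumption would currently go through CreateConsumerGroup and sarama's ConsumerGroup interface directly.
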
63 changes: 63 additions & 0 deletions driver/kafka/kafka_producer.go
@@ -0,0 +1,63 @@
+package kafka
+
+import "github.com/Shopify/sarama"
+
+//NewProducerConfig
+func NewProducerConfig(conn *Connection) *sarama.Config {
+	c := sarama.NewConfig()
+
+	c.Producer.RequiredAcks = sarama.WaitForAll
+	c.Producer.Return.Successes = true
+	c.Producer.Return.Errors = true
+
+	if conn.TLS != nil {
+		c.Net.TLS.Enable = true
+		c.Net.TLS.Config = conn.TLS
+	}
+
+	c.Producer.Partitioner = sarama.NewRandomPartitioner
+
+	return c
+}
+
+//CreateAsyncProducer
+func CreateAsyncProducer(conn *Connection) (sarama.AsyncProducer, error) {
+	if conn == nil {
+		return CreateAsyncProducerFromDefaultConnection()
+	}
+
+	c := NewProducerConfig(conn)
+
+	return sarama.NewAsyncProducer(conn.Brokers, c)
+}
+
+//CreateSyncProducer
+func CreateSyncProducer(conn *Connection) (sarama.SyncProducer, error) {
+	if conn == nil {
+		return CreateSyncProducerFromDefaultConnection()
+	}
+
+	c := NewProducerConfig(conn)
+
+	return sarama.NewSyncProducer(conn.Brokers, c)
+}
+
+//CreateAsyncProducerFromDefaultConnection
+func CreateAsyncProducerFromDefaultConnection() (sarama.AsyncProducer, error) {
+	c, err := NewDefaultKafkaConnection()
+	if err != nil {
+		return nil, err
+	}
+
+	return CreateAsyncProducer(c)
+}
+
+//CreateSyncProducerFromDefaultConnection
+func CreateSyncProducerFromDefaultConnection() (sarama.SyncProducer, error) {
+	c, err := NewDefaultKafkaConnection()
+	if err != nil {
+		return nil, err
+	}
+
+	return CreateSyncProducer(c)
+}
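
For illustration, a minimal usage sketch of the SyncProducer helper. This is not part of the commit: the broker address, topic, and payload are placeholders, and the import path is assumed from the file paths above.

// producer_example.go (hypothetical)
package main

import (
	"log"

	"github.com/Shopify/sarama"

	kafka "github.com/1infras/go-kit/driver/kafka"
)

func main() {
	conn := &kafka.Connection{
		Brokers: []string{"localhost:9092"}, // placeholder broker
	}

	producer, err := kafka.CreateSyncProducer(conn)
	if err != nil {
		log.Fatalf("create producer: %v", err)
	}
	defer producer.Close()

	// NewProducerConfig sets RequiredAcks = WaitForAll and Return.Successes = true,
	// so SendMessage blocks until the broker acknowledges the message.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello from go-kit"),
	})
	if err != nil {
		log.Fatalf("send: %v", err)
	}
	log.Printf("delivered to partition=%d offset=%d", partition, offset)
}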
