Skip to content

Commit e2127d2

Browse files
committed
adopted version; not ready for normal use yet
1 parent 20de27b commit e2127d2

File tree

5 files changed

+18
-32
lines changed

5 files changed

+18
-32
lines changed

README.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Add the following to your http://github.com/technomancy/leiningen[Leiningen's]
1717

1818
[source,clojure]
1919
----
20-
[net.tbt-post/clj-kafka-x "0.3.0"]
20+
[net.tbt-post/clj-kafka-x "0.3.1"]
2121
----
2222

2323
== Usage

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
(defproject net.tbt-post/clj-kafka-x "0.3.0"
1+
(defproject net.tbt-post/clj-kafka-x "0.3.1"
22
:description "A Clojure wrapper for Apache Kafka v2.0.0 client"
33
:url "https://github.com/source-c/clj-kafka-x"
44
:license {:name "Apache License 2.0"

src/clj_kafka_x/admin.clj

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
(ns clj-kafka-x.admin
22
(:require [clj-kafka-x.data :refer [map->properties]])
33
(:import kafka.admin.AdminUtils
4-
kafka.admin.RackAwareMode.Enforced
4+
(kafka.admin RackAwareMode RackAwareMode$Enforced$)
55
kafka.utils.ZkUtils))
66

77
(defn zk-utils
88
""
99
[zk-url & {:keys [session-timeout connection-timeout security-enabled]
10-
:or {session-timeout 1000
11-
connection-timeout 1000
12-
security-enabled false}}]
10+
:or {session-timeout 1000
11+
connection-timeout 1000
12+
security-enabled false}}]
1313
(ZkUtils/apply zk-url session-timeout connection-timeout (Boolean/valueOf security-enabled)))
1414

1515

@@ -30,15 +30,15 @@
3030
(create-topic z-utils \"topic-b\" :topic-config {\"cleanup.policy\" \"compact\"})
3131
"
3232
[z-utils topic & {:keys [partitions replication-factor topic-config]
33-
:or {partitions 1
34-
replication-factor 1
35-
topic-config nil}}]
33+
:or {partitions 1
34+
replication-factor 1
35+
topic-config nil}}]
3636
(AdminUtils/createTopic z-utils
3737
topic
3838
(int partitions)
3939
(int replication-factor)
4040
(map->properties topic-config)
41-
(Enforced)))
41+
RackAwareMode$Enforced$))
4242

4343
(defn topic-exists?
4444
"Returns true or false dependant on the existance of the given topic"

src/clj_kafka_x/consumers/ballanced.clj

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
(ns clj-kafka-x.consumers.ballanced
22
(:require [clojure.string :refer [join]])
33
(:import (java.util.concurrent Executors)
4-
(kafka.consumer Consumer ConsumerConfig)
5-
(org.apache.kafka.clients.producer KafkaProducer)
4+
(org.apache.kafka.clients.consumer KafkaConsumer Consumer ConsumerConfig ConsumerRecords)
65
(java.util Properties)
7-
(kafka.consumer KafkaStream)
8-
(clojure.lang PersistentArrayMap)
9-
(org.apache.kafka.clients.producer ProducerRecord)))
6+
(clojure.lang PersistentArrayMap)))
107

118
(defrecord KafkaMessage [topic partition offset key message])
129

@@ -18,15 +15,15 @@
1815
([stream thread-num spec]
1916
(consume-messages stream thread-num spec (:id spec)))
2017
([stream thread-num spec id]
21-
(let [it (.iterator ^KafkaStream stream)]
18+
(let [it (.iterator ^ConsumerRecords stream)]
2219
(while (.hasNext it)
2320
(let [msg (.next it)
2421
kmsg (KafkaMessage.
2522
(.topic msg)
2623
(.partition msg)
2724
(.offset msg)
2825
(.key msg)
29-
(.message msg))
26+
(.value msg))
3027
prc @(resolve (:processor spec))]
3128

3229
(process-msg prc kmsg id)))
@@ -84,7 +81,7 @@
8481
([config blist prefix]
8582
(create-consumer blist prefix (:group (:consumer config))))
8683
([config blist prefix grname]
87-
(Consumer/createJavaConsumerConnector
84+
(KafkaConsumer.
8885
(create-consumer-config config (->zklist blist prefix) grname))))
8986

9087
(defn- shutdown-topic [topic obj]
@@ -110,10 +107,10 @@
110107
tpool (Executors/newFixedThreadPool psize)
111108
consumer (create-consumer zkpool zkpref group)]
112109
(try (swap! storage merge
113-
{T {:instance (hk/consume-topic
110+
{T {:instance (consume-topic
114111
consumer T
115112
tpool psize
116113
(get topics-list T))
117114
:pool tpool}})
118115
(catch Exception e (do (shutdown-topic T {:pool tpool}))))))
119-
storage)
116+
storage)

src/clj_kafka_x/data.clj

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22
(:import [java.util HashMap Map Properties]
33
[org.apache.kafka.clients.consumer ConsumerRecord ConsumerRecords OffsetAndMetadata]
44
org.apache.kafka.clients.producer.RecordMetadata
5-
[org.apache.kafka.common Metric MetricName Node PartitionInfo TopicPartition]
6-
7-
(kafka.consumer Consumer ConsumerConfig KafkaStream)
8-
(kafka.message MessageAndMetadata)))
5+
[org.apache.kafka.common Metric MetricName Node PartitionInfo TopicPartition]))
96

107
(defprotocol ToClojure
118
""
@@ -30,14 +27,6 @@
3027
:key (.key x)
3128
:value (.value x)})
3229

33-
MessageAndMetadata
34-
(to-clojure [x]
35-
{:topic (.topic x)
36-
:partition (.partition x)
37-
:offset (.offset x)
38-
:key (.key x)
39-
:value (.message x)}) ;; FIME: :value/:message
40-
4130
ConsumerRecords
4231
(to-clojure [x]
4332
(mapv to-clojure x))

0 commit comments

Comments
 (0)