-
Notifications
You must be signed in to change notification settings - Fork 2
/
kafka_test.py
82 lines (60 loc) · 2.15 KB
/
kafka_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# from sys import api_version
# from kafka import KafkaProducer
# import time
# import json
# from datetime import datetime
# def kf():
# #producer = KafkaProducer(boostrap_servers='localhost:9092')
# producer = KafkaProducer(
# value_serializer=lambda m: json.dumps(m).encode('UTF-8')
# ,bootstrap_servers=['10.10.1.3:39092'])
# for i in range(1,100):
# producer.send('t1',value={"hello":i})
# time.sleep(0.02)
# #future = producer.send('youtube',b'hello')
# #result = future.get(timeout=10)
# #producer.flush()
# kf()
import json
import logging
from sys import api_version  # NOTE(review): unused and misleading — `sys.api_version` is a
                             # CPython C-API int, unrelated to Kafka's `api_version=` kwarg.
from kafka import KafkaProducer
from kafka.errors import KafkaError
import msgpack

# Module logger so the error paths below don't NameError (original used an
# undefined `log`).
log = logging.getLogger(__name__)

# Producer pinned to an explicit broker API version so the client skips
# version probing on connect.
# NOTE(review): (0, 11, 5) is an unusual tuple — kafka-python expects broker
# versions like (0, 11) or (2, 5, 0); confirm against the target cluster.
producer = KafkaProducer(bootstrap_servers=['10.10.1.3:9092'],
                         api_version=(0, 11, 5))

# Asynchronous by default: send() returns a future immediately.
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends.
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Produce request failed — log the traceback and keep going
    # (original crashed with NameError on an undefined `log`, and again on
    # `record_metadata` below because the prints were outside the try).
    log.exception("produce to 'my-topic' failed")
else:
    # Successful result returns assigned partition and offset.
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(
    value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})

# produce asynchronously; futures are dropped, delivery is fire-and-forget
for _ in range(100):
    producer.send('my-topic', b'msg')


def on_send_success(record_metadata):
    """Callback invoked when a send is acknowledged by the broker."""
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    """Errback invoked when a send fails; logs instead of raising."""
    log.error('I am an errback', exc_info=excp)
    # handle exception


# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes') \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)