加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python中kafka生产者和消费者实现

发布时间:2020-12-20 10:12:31 所属栏目:Python 来源:网络整理
导读:安装kafka-python: C:anaconda3Scriptspip install kafka-python import datetime import json from kafka import KafkaProducer from kafka import KafkaConsumer from kafka.errors import KafkaError ''' 使用kafka-python的生产模块 ''' class Kafka_

安装kafka-python:

C:anaconda3Scripts>pip install kafka-python


import datetime
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

'''
使用kafka-python的生产模块
'''
class Kafka_producer():
def __init__(self,bootstrapServers,kafkaTopic):
self.bootstrapServers = bootstrapServers
self.kafkaTopic = kafkaTopic
self.producer = KafkaProducer(bootstrap_servers=self.bootstrapServers)

def sendjsondata(self,params):
try:
parmas_message = json.dumps(params)
producer = self.producer
future = producer.send(self.kafkaTopic,parmas_message.encode('utf-8'))
producer.flush()
recordMetadata = future.get(timeout=10)
print(recordMetadata,datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
except KafkaError as e:
print(e)

'''
使用Kafka-python的消费模块
'''
class Kafka_consumer():
def __init__(self,kafkaTopic,groupId):
self.kafkaTopic = kafkaTopic
self.bootstrapServers = bootstrapServers
self.groupId = groupId
self.consumer = KafkaConsumer(self.kafkaTopic,group_id=self.groupId,bootstrap_servers=self.bootstrapServers)

def consume_data(self):
try:
for message in self.consumer:
yield message
except BaseException as e:
print(e)

if __name__ == '__main__':
bootstrapServers = ['ip1:port1','ip2:port2','ip3:port3']
topicStr = '主题'

print('-' * 20)
print('生产者')
print('-' * 20)

producer = Kafka_producer(bootstrapServers,topicStr)
for id in range(5):
params = '{tst}:{null}---' + str(id)
producer.sendjsondata(params)

print('-' * 20)
print('消费者')
print('-' * 20)

groupId = 'group名称'
consumer = Kafka_consumer(bootstrapServers,topicStr,groupId)
message = consumer.consume_data()
for i in message:
print(i.value)


?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读