安装kafka-python:
C:\anaconda3\Scripts>pip install kafka-python
import datetime import json from kafka import KafkaProducer from kafka import KafkaConsumer from kafka.errors import KafkaError
''' 使用kafka-python的生产模块 ''' class Kafka_producer(): def __init__(self,bootstrapServers,kafkaTopic): self.bootstrapServers = bootstrapServers self.kafkaTopic = kafkaTopic self.producer = KafkaProducer(bootstrap_servers=self.bootstrapServers)
def sendjsondata(self,params): try: parmas_message = json.dumps(params) producer = self.producer future = producer.send(self.kafkaTopic,parmas_message.encode('utf-8')) producer.flush() recordMetadata = future.get(timeout=10) print(recordMetadata,datetime.datetime.now().strftime('%Y%m%d%H%M%S')) except KafkaError as e: print(e)
# Consumer module built on kafka-python.
class Kafka_consumer():
    """Thin wrapper around kafka-python's ``KafkaConsumer``.

    Args:
        bootstrapServers: Broker address list passed to ``KafkaConsumer`` as
            ``bootstrap_servers``. It is an explicit parameter (same position
            as in ``Kafka_producer``) instead of being read from a module
            global of the same name.
        kafkaTopic: Topic to subscribe to.
        groupId: Consumer group id passed as ``group_id``.
    """

    def __init__(self, bootstrapServers, kafkaTopic, groupId):
        self.kafkaTopic = kafkaTopic
        self.bootstrapServers = bootstrapServers
        self.groupId = groupId
        self.consumer = KafkaConsumer(
            self.kafkaTopic,
            group_id=self.groupId,
            bootstrap_servers=self.bootstrapServers,
        )

    def consume_data(self):
        """Yield each message obtained by iterating ``self.consumer``.

        An ``Exception`` raised while iterating is printed and ends the
        generator. ``KeyboardInterrupt``, ``SystemExit`` and
        ``GeneratorExit`` are deliberately not caught, so interrupting the
        process or closing the generator behaves normally.
        """
        try:
            for message in self.consumer:
                yield message
        except Exception as e:
            print(e)
if __name__ == '__main__':
    # Demo: send five JSON-encoded strings to a topic, then print the value
    # of every message read back from it. The broker addresses, topic and
    # group name below are placeholders to be replaced with real values.
    bootstrapServers = ['ip1:port1', 'ip2:port2', 'ip3:port3']
    topicStr = '主题'

    print('-' * 20)
    print('生产者')
    print('-' * 20)

    producer = Kafka_producer(bootstrapServers, topicStr)
    # `index` rather than `id`, so the builtin id() is not shadowed.
    for index in range(5):
        params = '{tst}:{null}---' + str(index)
        producer.sendjsondata(params)

    print('-' * 20)
    print('消费者')
    print('-' * 20)

    groupId = 'group名称'
    # NOTE(review): this call passes the broker list as the first argument;
    # confirm Kafka_consumer.__init__ accepts (bootstrapServers, topic, group).
    consumer = Kafka_consumer(bootstrapServers, topicStr, groupId)
    for record in consumer.consume_data():
        print(record.value)
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!
|