python – 作为实时kafka消费者的Flask API
发布时间:2020-12-20 13:15:21 所属栏目:Python 来源:网络整理
导读:我想构建一个使用Flask框架开发的 python API,它使用Kafka主题并将流推送到客户端(html页面或其他应用程序). 我尝试使用虚拟数据生成实时流程(请参阅下面的实时路线).发生的问题是结果变量仅在循环结束后被推送,而结果变量应该在每次迭代时被推送. 我还尝试
我想构建一个使用Flask框架开发的
python API,它使用Kafka主题并将流推送到客户端(html页面或其他应用程序).
我尝试使用虚拟数据生成实时流程(请参阅下面的实时路线).发生的问题是结果变量仅在循环结束后被推送,而结果变量应该在每次迭代时被推送. 我还尝试使用Kafka连接生成实时流(请参阅下面的kafka路线).问题是没有返回数据,而是请求没有完成. from flask import Response,Flask import time from kafka import KafkaConsumer application = Flask(__name__) @application.route('/') def index(): return "Hello,World!" @application.route('/realtime/') def realtime(): def createGenerator(): for i in range(1,10): yield str(i) + 'n' time.sleep(0.2) return Response(createGenerator()) @application.route('/kafka/') def kafkaStream(): consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',client_id = 'name of client',auto_offset_reset = 'earliest',value_deserializer = lambda m: json.loads(m.decode('ascii'))) consumer.subscribe(topics=['my-topic']) def events(): result = [] for message in consumer: if message is not None: result.append(message.value) yield result return Response(events()) if __name__ == '__main__': application.run(debug = True) 到目前为止,我有效地从Kafka接收数据的唯一方法是在控制台中打印结果. from kafka import KafkaConsumer consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',value_deserializer = lambda m: json.loads(m.decode('ascii'))) consumer.subscribe(topics=['my-topic']) for message in consumer: print message 我认为问题是API在进程完成之前无法推送数据,并且因为KafkaConsumer连接是无限的,所以没有任何东西被推送到客户端. 我怎样才能克服这个问题? 解决方法
既然我想了解更多相关信息,请花一些时间.经过4个小时的尝试,注意到:
def events(): result = [] for message in consumer: if message is not None: result.append(str(message.value)) # <--- here (str) yield result (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |