加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – Kafka消费者配置/性能问题

发布时间:2020-12-15 01:06:35 所属栏目:Java 来源:网络整理
导读:我正在尝试将kafka作为AWS SQS的替代品.动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb.这是我的用例的高级场景.我有一堆爬虫正在发送索引文件.有效载荷的大小平均约为1 MB.爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给ka

我正在尝试将kafka作为AWS SQS的替代品.动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb.这是我的用例的高级场景.我有一堆爬虫正在发送索引文件.有效载荷的大小平均约为1 MB.爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给kafka队列.消费者应用程序接收消息并处理它们.对于我的测试框,我已经为主题配置了30个分区和2个复制.两个kafka实例正在运行1个zookeeper实例.卡夫卡版本是0.10.0.

对于我的测试,我在队列中发布了700万条消息.我创建了一个包含30个消费者线程的消费者组,每个分区一个.我最初的印象是,与我通过SQS获得的相比,这将大大加快处理能力.不幸的是,事实并非如此.就我而言,数据处理很复杂,平均需要1-2分钟才能完成.这导致了一系列的分区重新平衡,因为线程无法按时心跳.我可以在日志引用中看到一堆消息

Auto offset commit failed for group full_group: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
session.timeout.ms,which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in the poll() with max.poll.records.

这导致多次处理相同的消息.我尝试使用会话超时,max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理时间.这是一些配置参数.

metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092,kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest

我将消费者轮询时间减少到100毫秒.它减少了重新平衡问题,消除了重复处理,但显着减慢了整个过程.与使用基于SQS的解决方案的25小时相比,最终花了35个小时来完成所有600万条消息的处理.每个消费者线程平均每次轮询检索50-60条消息,尽管其中一些有时会轮询0条记录.当分区中有大量消息时,我不确定这种行为.同一个线程能够在后续迭代期间获取消息.这可能是由于重新平衡?

这是我的消费者代码

while (true) {
    try{
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            if(record.value()!=null){
                TextAnalysisRequest textAnalysisObj = record.value();
                if(textAnalysisObj!=null){
                    // Process record
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                }
            }
        }
    }catch(Exception ex){
        LOGGER.error("Error in Full Consumer group worker",ex);
    }

我理解记录处理部分是我的一个瓶颈.但我相信这里的一些人有类似处理大处理时间的用例.我想通过旋转其专用线程中的每个处理器或使用大容量的线程池来进行异步处理,但不确定它是否会在系统中产生很大的负载.与此同时,我看到过一些人们使用暂停和恢复API来执行处理以避免重新平衡问题的情况.

在这种情况下,我真的在寻找一些建议/最佳实践.特别是,推荐的配置设置围绕听力,请求超时,最大轮询记录,自动提交间隔,轮询间隔等,如果kafka不是我的用例的正确工具,请让我知道.

最佳答案
您可以在与从Kafka读取的线程不同的线程中异步处理消息.这样自动提交将非常快,Kafka不会削减您的会话.像这样的东西:

    private final BlockingQueue

在阅读帖子中:

while (true) {
    try{
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            if(record.value()!=null){
                TextAnalysisRequest textAnalysisObj = record.value();
                if(textAnalysisObj!=null){
                    // Process record
                    requests.offer(textAnalysisObj);
                }
            }
     }    
}
catch(Exception ex){
    LOGGER.error("Error in Full Consumer group worker",ex);
}

在处理线程中:

            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TextAnalysisRequest textAnalysisObj = requests.take();
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                } catch (InterruptedException e) {
                    LOGGER.info("Process thread interrupted",e);
                    Thread.currentThread().interrupt();
                } catch (Throwable t) {
                    LOGGER.warn("Unexpected throwable while processing.",t);
                }
            }

另请参阅此文档,了解通过Kafka发送大型邮件的策略:http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

简而言之,它表示Kafka在大约10K的小尺寸消息上表现最佳,如果您需要发送更大的消息,最好将它们放在网络存储上,然后通过Kafka发送它们的位置,或者拆分它们.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读