加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

spring-integration-kafka配置使用者从指定分区接收消息

发布时间:2020-12-15 01:29:37 所属栏目:大数据 来源:网络整理
导读:我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息. 示例我想生成到分区3的消息,而消费只会从分区3接收消息. 到目前为止,我的主题有8个分区,我可以向特定分

我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.

示例我想生成到分区3的消息,而消费只会从分区3接收消息.

到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.

所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.

谢谢.

这是我的代码:

kafka-producer-context.xml

KafkaProducer.java

public class KafkaProducer {

private static final Logger logger = LoggerFactory
        .getLogger(KafkaProducer.class);

@Autowired
private MessageChannel inputToKafka;

public void sendMessage(String message) {

    try {
        inputToKafka.send(MessageBuilder.withPayload(message)
                    .setHeader(KafkaHeaders.TOPIC,"testTopic")
                    .setHeader(KafkaHeaders.PARTITION_ID,3).build());
    } catch (Exception e) {
        logger.error(String.format(
                "Failed to send [ %s ] to topic %s ",message,topic),e);
    }
}

}

kafka-consumer-context.xml

KafkaConsumer.java

public class KafkaConsumer {

private static final Logger log = LoggerFactory
        .getLogger(KafkaConsumer.class);

@Autowired
KafkaService kafkaService;

public void processMessage(Map

所以我的问题就在这里.当我向分区3或任何分区生成消息时,KafkaConsumer始终收到消息.我想要的只是:KafkaConsumer只接收来自分区3的消息,而不是来自其他分区的消息.

再次感谢.

最佳答案
你需要使用message-driven-channel-adapter.

As a variant,the KafkaMessageListenerContainer can accept org.springframework.integration.kafka.core.Partition array argument to specify topics and their partitions pair.

您需要使用this constructor连接侦听器容器,并使用listener-container属性将其提供给适配器.

我们将通过示例更新自述文件.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读