加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark – 在没有开放流的情况下获得Kafka的最早和最新

发布时间:2020-12-16 19:24:02 所属栏目:安全 来源:网络整理
导读:我目前正在使用spark-streaming-kafka-0-10_2.11将我的spark应用程序与kafka队列连接起来. Streams一切正常.但是对于特定场景,我只需要kafka队列的整个内容一次 – 为此我得到了更好地使用KafkaUtils.createRDD( SparkStreaming: Read Kafka Stream and prov
我目前正在使用spark-streaming-kafka-0-10_2.11将我的spark应用程序与kafka队列连接起来. Streams一切正常.但是对于特定场景,我只需要kafka队列的整个内容一次 – 为此我得到了更好地使用KafkaUtils.createRDD( SparkStreaming: Read Kafka Stream and provide it as RDD for further processing)的建议

然而对于spark-streaming-kafka-0-10_2.11,我无法弄清楚如何获得创建我必须手工创建的RDRD方法所需的Kafka主题的最早和最新偏移量.

在不打开流的情况下获得这些偏移量的推荐方法是什么?任何帮助将不胜感激.

解决方法

在阅读了几个讨论之后,我能够从特定分区获得最早或最新的偏移:

val consumer = new SimpleConsumer(host,port,timeout,bufferSize,"offsetfetcher");
val topicAndPartition = new TopicAndPartition(topic,initialPartition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime,1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

return offsets.head

但是,如何在kafka_consumer.sh CLI命令中复制“from_beginning”的行为是我不知道的KafkaUtils.createRDD aproach.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读