加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用Spark Streaming从Cassandra中读取

发布时间:2020-12-16 09:26:15 所属栏目:安全 来源:网络整理
导读:当我使用火花流从Cassandra读取时,我遇到了问题. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext 作为上面的链接,我使用 val rdd = ssc.cassandraTable("streami
当我使用火花流从Cassandra读取时,我遇到了问题.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

作为上面的链接,我使用

val rdd = ssc.cassandraTable("streaming_test","key_value").select("key","value").where("fu = ?",3)

从cassandra中选择数据,但似乎火花流只有一次查询,但我希望它继续使用10秒的间隔进行查询.

我的代码如下,希望您的回复.

谢谢!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
def main(args: Array[String]){
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host","127.0.0.1")

    val ssc = new StreamingContext(conf,Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace","users").select("fname","lname").where("lname = ?","yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()


    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
}

}

解决方法

您可以使用CassandraRDD作为输入创建ConstantInputDStream. ConstantInputDStream将在每个流间隔上提供相同的RDD,并且通过对该RDD执行操作,您将触发RDD沿袭的实现,从而导致每次都在Cassandra上执行查询.

确保要查询的数据不会无限制地增加,以避免增加查询时间并导致不稳定的流式处理.

像这样的东西应该成功(使用你的代码作为起点):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf,Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace","yu")

val dstream = new ConstantInputDStream(ssc,cassandraRDD)

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query,using collect to have a simple output
    println(rdd.collect.mkString("n")) 
}
ssc.start()
ssc.awaitTermination()

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读