scala – 使用Spark Streaming从Cassandra中读取
发布时间:2020-12-16 09:26:15 所属栏目:安全 来源:网络整理
导读:当我使用火花流从Cassandra读取时,我遇到了问题. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext 作为上面的链接,我使用 val rdd = ssc.cassandraTable("streami
当我使用火花流从Cassandra读取时,我遇到了问题.
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext 作为上面的链接,我使用 val rdd = ssc.cassandraTable("streaming_test","key_value").select("key","value").where("fu = ?",3) 从cassandra中选择数据,但似乎火花流只有一次查询,但我希望它继续使用10秒的间隔进行查询. 我的代码如下,希望您的回复. 谢谢! import org.apache.spark._ import org.apache.spark.streaming._ import com.datastax.spark.connector.streaming._ import org.apache.spark.rdd._ import scala.collection.mutable.Queue object SimpleApp { def main(args: Array[String]){ val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host","127.0.0.1") val ssc = new StreamingContext(conf,Seconds(10)) val rdd = ssc.cassandraTable("mykeyspace","users").select("fname","lname").where("lname = ?","yu") //rdd.collect().foreach(println) val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]() val dstream = ssc.queueStream(rddQueue) dstream.print() ssc.start() rdd.collect().foreach(println) rddQueue += rdd ssc.awaitTermination() } } 解决方法
您可以使用CassandraRDD作为输入创建ConstantInputDStream. ConstantInputDStream将在每个流间隔上提供相同的RDD,并且通过对该RDD执行操作,您将触发RDD沿袭的实现,从而导致每次都在Cassandra上执行查询.
确保要查询的数据不会无限制地增加,以避免增加查询时间并导致不稳定的流式处理. 像这样的东西应该成功(使用你的代码作为起点): import org.apache.spark.streaming.dstream.ConstantInputDStream val ssc = new StreamingContext(conf,Seconds(10)) val cassandraRDD = ssc.cassandraTable("mykeyspace","yu") val dstream = new ConstantInputDStream(ssc,cassandraRDD) dstream.foreachRDD{ rdd => // any action will trigger the underlying cassandra query,using collect to have a simple output println(rdd.collect.mkString("n")) } ssc.start() ssc.awaitTermination() (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |