加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 将Cassandra查询数据组合/更新为从Kafka收到的结构化流

发布时间:2020-12-16 08:56:52 所属栏目:安全 来源:网络整理
导读:我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据. 为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息 我有点陷入困境,围绕如何在整个集群中保持Cassandra数据可用,并且不时地以某种方式更新数据,以
我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.

为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息

我有点陷入困境,围绕如何在整个集群中保持Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改.

目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库

val cassandraSensorDf = spark
  .read
  .cassandraFormat("specifications","sensors")
  .load

从这里开始,我可以使用这个cassandraSensorDs将其与我的结构化流数据集连接起来.

.join(
   cassandraSensorDs,sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)

如何在结构化流运行时更新此Cassandra数据?
如何在群集设置中提供查询的数据?

解决方法

使用广播变量,您可以编写一个包装器来定期从Cassandra获取数据并更新广播变量.使用广播变量对流进行地图侧连接.我没有测试过这种方法,我认为根据您的使用情况(吞吐量),这可能是一种过度杀伤力.

How can I update a broadcast variable in spark streaming?

另一种方法是查询流中每个项目的Cassandra,以优化连接,确保使用连接池并为JVM /分区仅创建一个连接.这种方法更简单,您不必担心定期加热Cassandra数据.

spark-streaming and connection pool implementation

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读