scala – 将Cassandra查询数据组合/更新为从Kafka收到的结构化流
发布时间:2020-12-16 08:56:52 所属栏目:安全 来源:网络整理
导读:我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据. 为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息 我有点陷入困境,围绕如何在整个集群中保持Cassandra数据可用,并且不时地以某种方式更新数据,以
我正在创建一个Spark Structured流应用程序,它将计算每10秒从Kafka收到的数据.
为了能够进行一些计算,我需要在Cassandra数据库中查找有关传感器和放置的一些信息 我有点陷入困境,围绕如何在整个集群中保持Cassandra数据可用,并且不时地以某种方式更新数据,以防我们对数据库表进行了一些更改. 目前,我在使用Datastax Spark-Cassandra-connector本地启动Spark后立即查询数据库 val cassandraSensorDf = spark .read .cassandraFormat("specifications","sensors") .load 从这里开始,我可以使用这个cassandraSensorDs将其与我的结构化流数据集连接起来. .join( cassandraSensorDs,sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey") ) 如何在结构化流运行时更新此Cassandra数据? 解决方法
使用广播变量,您可以编写一个包装器来定期从Cassandra获取数据并更新广播变量.使用广播变量对流进行地图侧连接.我没有测试过这种方法,我认为根据您的使用情况(吞吐量),这可能是一种过度杀伤力.
How can I update a broadcast variable in spark streaming? 另一种方法是查询流中每个项目的Cassandra,以优化连接,确保使用连接池并为JVM /分区仅创建一个连接.这种方法更简单,您不必担心定期加热Cassandra数据. spark-streaming and connection pool implementation (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |