scala – 如何在使用数据帧时下推限制Cassandra的谓词?
我有大型的Cassandra桌子.我想从Cassandra只加载50行.
以下代码 val ds = sparkSession.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> s"$Aggregates","keyspace" -> s"$KeySpace")) .load() .where(col("aggregate_type") === "DAY") .where(col("start_time") <= "2018-03-28") .limit(50).collect() 以下代码从方法中推送两个谓词,但不限制一个.获取整个数据(100万条记录)是真的吗?如果没有,为什么运行此代码和代码的时间没有限制(50)大致相同. 解决方法
与Spark Streaming不同,Spark本身试图尽可能快地预加载尽可能多的数据,以便能够并行地对其进行操作.因此预加载是懒惰的,但是当它被触发时会贪婪.然而,有cassandra-conector特定因素:
> Automatic predicate pushdown有效“where”条款.
可能的解决方案: >可以通过限制numPartitions和数据交换率来部分管理DataFrame限制( 代码如下: filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect() 这会将LIMIT $N附加到每个CQL请求.与DataFrame的限制不同,如果多次指定CassandraRDD限制(.limit(10).limit(20)) – 只会追加最后一个.此外,我使用n而不是n / numPartitions 1(即使Spark和Cassandra分区是一对一的),每个分区可能返回较少的结果.结果,我不得不添加take(n)以便将< = numPartitions * n减少到n. 警告请仔细检查您的位置是否可以转换为CQL(使用explain()) – 否则将在过滤之前应用LIMIT. 附:您还可以尝试使用sparkSession.sql(…)(like here)直接运行CQL并比较结果. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |