加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在使用数据帧时下推限制Cassandra的谓词?

发布时间:2020-12-16 18:14:46 所属栏目:安全 来源:网络整理
导读:我有大型的Cassandra桌子.我想从Cassandra只加载50行. 以下代码 val ds = sparkSession.read .format("org.apache.spark.sql.cassandra") .options(Map("table" - s"$Aggregates","keyspace" - s"$KeySpace")) .load() .where(col("aggregate_type") === "DA
我有大型的Cassandra桌子.我想从Cassandra只加载50行.
以下代码

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates","keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")
      .limit(50).collect()

以下代码从方法中推送两个谓词,但不限制一个.获取整个数据(100万条记录)是真的吗?如果没有,为什么运行此代码和代码的时间没有限制(50)大致相同.

解决方法

与Spark Streaming不同,Spark本身试图尽可能快地预加载尽可能多的数据,以便能够并行地对其进行操作.因此预加载是懒惰的,但是当它被触发时会贪婪.然而,有cassandra-conector特定因素:

> Automatic predicate pushdown有效“where”条款.
>根据this answer限制(…)未转换为CQL的LIMIT,因此其行为取决于在下载足够数据后创建的提取作业数量.引用:

calling limit will allow Spark to skip reading some portions from the
underlying DataSource. These would limit the amount of data read from
Cassandra by canceling tasks from being executed.

可能的解决方案:

>可以通过限制numPartitions和数据交换率来部分管理DataFrame限制(concurrent.reads and other params).如果你在大多数情况下你可以使用n~50“,你也可以限制像where(dayIndex< 50 * factor * num_records).
>有一种方法可以通过SparkPartitionLimit设置CQL LIMIT,它直接影响每个CQL请求(see more) – 请记住请求是每个spark-partition.它在CassandraRdd扩展类中可用,因此您必须先转换为RDD.

代码如下:

filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect()

这会将LIMIT $N附加到每个CQL请求.与DataFrame的限制不同,如果多次指定CassandraRDD限制(.limit(10).limit(20)) – 只会追加最后一个.此外,我使用n而不是n / numPartitions 1(即使Spark和Cassandra分区是一对一的),每个分区可能返回较少的结果.结果,我不得不添加take(n)以便将< = numPartitions * n减少到n. 警告请仔细检查您的位置是否可以转换为CQL(使用explain()) – 否则将在过滤之前应用LIMIT. 附:您还可以尝试使用sparkSession.sql(…)(like here)直接运行CQL并比较结果.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读