
scala – filtering a Spark RDD by element class

I have an RDD with elements of different types, and I want to count the elements by type. For example, the code below works as expected.

scala> val rdd = sc.parallelize(List(1,2.0,"abc"))
rdd: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.filter{case z:Int => true; case _ => false}.count
res0: Long = 1

scala> rdd.filter{case z:String => true; case _ => false}.count
res1: Long = 1
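As an aside, if all you need is a per-type tally, a single pass over the runtime class names avoids running one filter-and-count job per type. A minimal sketch, assuming the rdd defined above:

// One-pass alternative: tally elements by runtime class name.
// The JVM boxes 1 to java.lang.Integer and 2.0 to java.lang.Double,
// so the keys should come out along the lines of Integer, Double, String.
rdd.map(_.getClass.getSimpleName).countByValue()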

Now, if the elements are of a user-defined type, the code below does not work as expected.

scala> class TypeA extends Serializable              // this is the base class
defined class TypeA

scala> case class TypeB(id:Long) extends TypeA       // derived class 1
defined class TypeB

scala> case class TypeC(name:String) extends TypeA   // derived class 2
defined class TypeC

scala> val rdd1 = sc.parallelize(List(TypeB(123),TypeC("jack"),TypeB(456)))   // create an rdd with different types of elements
rdd1: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[3] at parallelize at <console>:29

scala> rdd1.count              // total size is correct
res2: Long = 3

scala> rdd1.filter{case z:TypeB => true; case _ => false}.count   // what the hell?
res3: Long = 0

scala> rdd1.filter{case z:TypeC => true; case _ => false}.count   // again ?
res4: Long = 0

scala> rdd1.filter{case z:TypeA => true; case _ => false}.count   // only works for the base class?
res5: Long = 3

Am I missing something here? Please help!

Solution

This looks like a variant of SPARK-1199 and is probably a REPL bug: spark-shell compiles each line into its own synthetic wrapper class, so the instance check inside the shipped closure may end up referring to a different class than the one the elements were built with.

Running the same code from inside IDEA produces the expected behavior:

import org.apache.spark.SparkContext

// Same class hierarchy as in the shell session above.
class TypeA extends Serializable
case class TypeB(id:Long) extends TypeA
case class TypeC(name:String) extends TypeA

// A plain local SparkContext, no REPL involved.
val sc = new SparkContext("local[*]","swe")
val rdd = sc.parallelize(List(TypeB(12),TypeC("Hsa")))

// The same type pattern that returned 0 in the shell.
rdd.filter { case x: TypeB => true; case _ => false }.count()

Output:

import org.apache.spark.SparkContext

defined class TypeA
defined class TypeB
defined class TypeC   

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@10a1410d
rdd: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[0] at parallelize at <console>:18

[Stage 0:>....... (0 + 0) / 4]
res0: Long = 1
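If you have to stay inside spark-shell, one possible workaround (an untested sketch, not part of the original answer) is to compare runtime class names as strings, sidestepping the classloader-sensitive instanceof check that pattern matching compiles down to:

// Hypothetical REPL workaround: a crude name check instead of a type pattern.
// endsWith is deliberately loose because REPL-defined classes get mangled
// names such as $line5.$read$$iw$$iw$TypeB.
rdd1.filter(_.getClass.getName.endsWith("TypeB")).count()   // should be 2
rdd1.filter(_.getClass.getName.endsWith("TypeC")).count()   // should be 1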
