加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何使用orElse组成的部分函数作为spark中的udf

发布时间:2020-12-16 18:14:25 所属栏目:安全 来源:网络整理
导读:正如问题所述,我想使用由orElse组成的部分函数作为spark中的udf.这是一个可以在spark shell中运行的示例: val df = sc.parallelize(1 to 15).toDF("num")df.show//Testing out a normal udf - this worksval gt5: (Int = String) = num = (num 5).toStringv
正如问题所述,我想使用由orElse组成的部分函数作为spark中的udf.这是一个可以在spark shell中运行的示例:

val df = sc.parallelize(1 to 15).toDF("num")
df.show

//Testing out a normal udf - this works
val gt5: (Int => String) = num => (num > 5).toString
val gt5Udf = udf(gt5)
df.withColumn("gt5",gt5Udf(col("num"))).show

//Now create a udf of a partial function composed with orElse
val baseline: PartialFunction[Int,String] = { case _ => "baseline" }
val ge3: PartialFunction[Int,String] = { case x if x >= 3 => ">=3" }
val ge7: PartialFunction[Int,String] = { case x if x >= 7 => ">=7" }
val ge12: PartialFunction[Int,String] = { case x if x >= 12 => ">=12" }

val composed: PartialFunction[Int,String] = ge12 orElse ge7 orElse ge3 orElse baseline
val composedUdf = udf(composed)

//This fails (but this is what I'd like to do)
df.withColumn("pf",composedUdf(col("num"))).show

//Use a partial function not composed with orElse - this works
val baselineUdf = udf(baseline)
df.withColumn("pf",baselineUdf(col("num"))).show

我目前在具有以下配置的三节点独立群集上运行此操作:

>火花:1.6.0
> hdfs:2.4.1
>斯卡拉:2.10.5

我在这个答案中找到了我认为的线索:Why Scala can serialize Function but not PartialFunction?

所以我试过了:

scala> composed.isInstanceOf[Serializable]
res: Boolean = false

scala> composedUdf.isInstanceOf[Serializable]
res: Boolean = true

scala> baseline.isInstanceOf[Serializable]
res: Boolean = true

scala> baselineUdf.isInstanceOf[Serializable]
res: Boolean = true

我在这里变得模糊,但似乎用orElse组成一个部分函数会删除序列化?

我认为最具信息性的错误是:

org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
...

我该如何解决这个问题?还是我离开基地?

在此先感谢您的帮助!

解决方法

如果你抬起它并将其包裹在另一个功能中,它应该可以工作.

val composed: Int => Option[String] = 
  x => (ge12 orElse ge7 orElse ge3 orElse baseline).lift.apply(x)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读