加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用匿名函数时Spark TaskNotSerializable

发布时间:2020-12-16 09:51:11 所属栏目:安全 来源:网络整理
导读:背景 这是我的情况:我正在尝试创建一个基于内容的某些功能过滤RDD的类,但该功能在不同的场景中可能有所不同,所以我想用函数对其进行参数化.不幸的是,我似乎遇到了Scala捕获其闭包的问题.即使我的函数是可序列化的,但类不是. 从spark source on closure clea
背景

这是我的情况:我正在尝试创建一个基于内容的某些功能过滤RDD的类,但该功能在不同的场景中可能有所不同,所以我想用函数对其进行参数化.不幸的是,我似乎遇到了Scala捕获其闭包的问题.即使我的函数是可序列化的,但类不是.

从spark source on closure cleaning中的例子来看,它似乎表明我的情况无法解决,但我确信有一种方法可以通过创建正确(较小)的闭包来实现我想要做的事情.

我的守则

class MyFilter(getFeature: Element => String,other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }     
}

简化示例

class Foo(f: Int => Double,rdd: RDD[Int]) { 
  def go(data: RDD[Int]) = data.map(f) 
}

val works = new Foo(_.toDouble,otherRdd)
works.go(myRdd).collect() // works

val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x,0)
val doesntWork = new Foo(complicatedButSerializableFunc,otherRdd)
doesntWork.go(myRdd).collect() // craps out

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
    - object not serializable (class: $iwC$$iwC$Foo,value: $iwC$$iwC$Foo@61e33118)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,name: foo,type: class $iwC$$iwC$Foo)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@47d6a31a)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1,name: $outer,type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1,<function1>)

// Even though
val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works

问题

>为什么第一个简单示例不会尝试序列化所有Foo,但第二个示例呢?
>如何在我的闭包中不包含对Foo的引用的情况下获取对可序列化函数的引用?

解决方法

在 this article的帮助下找到了答案.

本质上,当为给定函数创建闭包时,Scala将包含引用的任何复杂字段的整个对象(如果有人对第一个简单示例中为什么不会发生这种情况有一个很好的解释,我会接受这个答案).解决方案是将可序列化值传递给不同的函数,以便仅保留最小引用,非常类似于事件侦听器的ol’javascript for-loop范例.

def enclose[E,R](enclosed: E)(func: E => R): R = func(enclosed)

class Foo(f: Int => Double,somethingNonserializable: RDD[String]) { 
 def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) } 
}

或者使用JS风格的自执行匿名函数

def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读