加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 通过对String的反射定义spark udf

发布时间:2020-12-16 09:51:30 所属栏目:安全 来源:网络整理
导读:我试图从包含 scala函数定义的字符串中定义spark(2.0)中的udf.这是片段: val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universeimport universe._import scala.reflect.runtime.currentMirrorimport scala.tools.reflect.To
我试图从包含 scala函数定义的字符串中定义spark(2.0)中的udf.这是片段:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

这给了我一个错误:

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

但是当我将udf定义为:

val f = udf((s:String) => 5)

它工作得很好.这里有什么问题?最终目标是获取一个具有scala函数defn的字符串并将其用作udf.

解决方法

正如Giovanny所观察到的,问题在于类加载器是不同的(你可以通过在任何对象上调用.getClass.getClassLoader来更多地研究它).然后,当工作人员尝试反序列化你反射的函数时,所有的地狱都会崩溃.

这是一个不涉及任何类加载器hackery的解决方案.我们的想法是将反思步骤转移给工人.我们最终不得不重做反射步骤,但每个工人只需要重做一次.我认为这是非常优化的 – 即使你只在主节点上进行一次反射,你也必须为每个工作人员做一些工作才能让他们识别这个功能.

val f = udf {
  new Function1[String,Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

然后,调用sc.parallelize(Seq(“1”,“5”)).toDF.select(f(col(“value”))).show工作得很好.

随意注释println – 这只是计算反射发生次数的简单方法.在spark-shell –master’local’中只有一次,但是在spark-shell –master’local [2]中它只有两次.

这个怎么运作

UDF会立即得到评估,但在它到达工作节点之前永远不会被使用,所以只能在工作者上评估惰性值工具箱和func.此外,由于它们很懒惰,因此每个工人只能评估一次.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读