加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

序列化的Scala反射(通过Spark) – 符号不可序列化

发布时间:2020-12-16 18:31:26 所属栏目:安全 来源:网络整理
导读:首先,我使用的是 scala 2.10.4,上面的例子是在Spark 1.6中运行的(尽管我怀疑Spark与此有什么关系,但它只是一个序列化问题). 所以这是我的问题:假设我有一个特性基础,由两个B1和B2类实现.现在我有一个通用的特征,它由一组类扩展,其中一个类是Base的子类型,例
首先,我使用的是 scala 2.10.4,上面的例子是在Spark 1.6中运行的(尽管我怀疑Spark与此有什么关系,但它只是一个序列化问题).

所以这是我的问题:假设我有一个特性基础,由两个B1和B2类实现.现在我有一个通用的特征,它由一组类扩展,其中一个类是Base的子类型,例如(这里我保留了Spark的RDD概念,但实际上只要序列化它就可能是其他的东西;无论实际上是什么,都只是结果):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something  = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something  = ... }
...

现在我需要一个对象,它将采用RDD [T](假设这里没有模糊,它只是一个简化版本),它返回对应于类型T对应的函数结果的Something.但是它也适用于Array [T]采用合并策略.到目前为止它看起来像:

object Obj {
   def compute[T: TypeTag](input: RDD[T]): Something = {
      typeOf[T] match {
         case t if t <:< typeOf[A] => 
            val foo = new Foo[T]
            foo.function(rdd)
         case t if t <:< typeOf[Array[A]] => 
            val foo = new Foo[A]
            foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]])))
         case t if t <:< typeOf[Base] => 
            val foo = new Foo[T]
            foo.function(rdd)
         // here it gets ugly...
         case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
            val tt = getSubInfo[T](0)
            val tpe = tt.tpe
            val foo = new Foo[tpe.type]
            foo.function(rdd.map(x => (x._1,mergeArray(x._2.asInstanceOf[Array[tpe.type]]))
      }
   }

   // strategy to transform arrays of T into a T object when possible
   private def mergeArray[T: TypeTag](a: Array[T]): T = ... 

  // extract the subtype,e.g. if Array[Int] then at position 0 extracts a type tag for Int,I can provide the code but not fondamental for the comprehension of the problem though
   private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ... 
}

不幸的是,它似乎在本地机器上工作正常,但是当它被发送到Spark(序列化)时,我得到一个org.apache.spark.SparkException:任务不可序列化:

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol,value: package types)
    - field (class: scala.reflect.internal.Types$ThisType,name: sym,type: class scala.reflect.internal.Symbols$Symbol)

我确实有一个解决方法(很明显,列举了可能性),但是出于我的好奇心,有没有办法解决这个问题?为什么Symbol不能序列化,而它们在Manifests中的等价物呢?

谢谢您的帮助.

解决方法

TypeTags现在通常可以在scala中序列化,但奇怪的是,不是直接类型(这很奇怪,因为typetags包含的符号不是: – /).

这可能会做你想要的

// implicit constructor TypeTag parameter is serialized.
abstract class TypeAware[T:TypeTag] extends Serializable {
  def typ:Type = _typeCached

  @transient
  lazy val _typeCached:Type = typeOf[T]
}

trait Foo[T] extends Serializable { 
  def function(rdd: RDD[T]): Something  {... impl here?...}
  def typ:Type 
}

class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{
   def function(rdd: RDD[T]): Something  {... impl here?...}
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读