scala – About a serialization exception in Spark
I ran into a very strange serialization problem in Spark.
The code is as follows:

```scala
class PLSA(val sc: SparkContext, val numOfTopics: Int) extends Serializable {

  def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
    val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
    docs
  }
}
```

where Document is defined as:

```scala
class Document(val tokens: SparseVector[Int]) extends Serializable
```

and DocumentParameter is:

```scala
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable {
  def apply(document: Document, numOfTopics: Int) =
    new DocumentParameter(document, Array.ofDim[Float](numOfTopics))
}
```

SparseVector is a serializable class, breeze.linalg.SparseVector.

This is a simple map procedure, and all the classes are serializable, but I get this exception:

```
org.apache.spark.SparkException: Task not serializable
```

However, when I remove the numOfTopics parameter, that is:

```scala
object DocumentParameter extends Serializable {
  def apply(document: Document) =
    new DocumentParameter(document, Array.ofDim[Float](10))
}
```

and call it like this:

```scala
val docs = documents.map(DocumentParameter.apply)
```

everything seems fine. Is the Int type not serializable? But I have seen code written exactly this way.

I don't know how to fix this bug.

#UPDATE#:

Thank you @samthebest. I will add more details about it.

stack trace:

```
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more
```

Since the stack trace only gives general information about the exception, I removed the rest of it.

I run the code in spark-shell:

```scala
// suppose I have got RDD[Document] as docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)
```

Could you give me some tutorials or tips on serialization?

Solution
Anonymous functions serialize their containing class. When you map {doc => DocumentParameter(doc, numOfTopics)}, the only way to give that function access to numOfTopics is to serialize the PLSA class. And that class cannot actually be serialized, because (as you can see from the stack trace) it contains the SparkContext, which is not serializable (bad things would happen if individual cluster nodes had access to the context and could, for example, create new jobs from inside a mapper).
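To make the capture problem concrete, here is a minimal sketch (not part of the original answer) of the common local-copy workaround: assigning the field to a local val first means the closure captures only an Int, not the enclosing PLSA instance with its SparkContext.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class PLSA(val sc: SparkContext, val numOfTopics: Int) {

  def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
    // Copy the field to a local val: the closure below then captures
    // only this Int, not `this` (and hence not the SparkContext).
    val k = numOfTopics
    documents.map(doc => DocumentParameter(doc, k))
  }
}
```

With this change PLSA itself no longer needs to extend Serializable, since it is never shipped to the executors.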
In general, try to avoid storing the SparkContext in your classes (edit: or at least, make it very clear which classes contain the SparkContext and which do not); it is better to pass it as a (possibly implicit) parameter to the individual methods that need it. Alternatively, move the function {doc => DocumentParameter(doc, numOfTopics)} into a class different from PLSA, one that really can be serialized.

(As several people have suggested, it is possible to keep the SparkContext in the class but mark it @transient so that it will not be serialized. I do not recommend this approach; it means the class will "magically" change state when serialized (losing its SparkContext), so you may end up with NPEs when you try to access the SparkContext from inside a serialized job. It is better to keep a clear distinction between classes that are only used in "control" code (and may use the SparkContext) and classes that are serialized to run on the cluster (which must not hold a SparkContext).)
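As a sketch of the "move the function into a different class" suggestion (the helper name PlsaFunctions is invented here for illustration), the mapping code can live in a small serializable object, while PLSA keeps the SparkContext purely on the driver side:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper object: the mapping closure lives here, away from
// any SparkContext, so only serializable values are captured.
object PlsaFunctions extends Serializable {
  def toParameters(documents: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] =
    documents.map(doc => DocumentParameter(doc, numOfTopics))
}

// PLSA holds the SparkContext but never appears inside a closure:
// numOfTopics is read on the driver and passed by value.
class PLSA(val sc: SparkContext, val numOfTopics: Int) {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] =
    PlsaFunctions.toParameters(documents, numOfTopics)
}
```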