I have the following code, where the error occurs at sc.parallelize():
val pairs = ret.cartesian(ret)
  .map {
    case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
  }

for (pair <- pairs) {
  val test = sc.parallelize(pair._2._1.map(_._1))
}
where
> k1, k2 are Strings
> v1, v2 are lists of Doubles
Whenever I try to access sc, I get the error below. What am I doing wrong here?
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
    at CorrelationCalc$.main(CorrelationCalc.scala:33)
    at CorrelationCalc.main(CorrelationCalc.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@40bee8c5)
    - field (class: CorrelationCalc$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class CorrelationCalc$$anonfun$main$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 20 more
Solution
The for-comprehension is just doing a pairs.map().
RDD operations are performed by the workers, and to have them do that work, everything you send to them must be serializable. The SparkContext is attached to the master: it is responsible for managing the entire cluster.
If you want to create an RDD, you have to be aware of the whole cluster (that's the second "D", for "distributed"), so you can't create a new RDD on a worker. And you probably don't want to turn each row in pairs into an RDD (each with the same name!) anyway.
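As a minimal sketch of the distinction (assuming `sc` is the driver-side SparkContext), a closure shipped to the workers may capture plain serializable values, but never the context itself:

```scala
// Runs on the driver, where sc exists.
val rdd = sc.parallelize(Seq(1.0, 2.0, 3.0))
val factor = 2.0 // a plain Double: serializable, safe to capture

// OK: the closure only captures `factor`.
val scaled = rdd.map(_ * factor)

// NOT OK: this closure would capture `sc`, which is not serializable,
// and an RDD cannot be created on a worker in any case.
// rdd.foreach { x => sc.parallelize(Seq(x)) } // throws Task not serializable
```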
It's hard to tell from your code what you'd like to do, but it probably looks like
val test = pairs.map(r => r._2._1)
This would be an RDD in which each row contains whatever was in v1.toList.
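Putting it together, a hedged sketch of the corrected pipeline: every step stays an RDD transformation executed by the workers, and `sc` is only touched on the driver. The element type of `ret` is an assumption here, since the question describes v1 and v2 as lists of doubles while the original code projects `_._1` from their elements:

```scala
// Assumed: `ret` is an RDD of (String, Iterable[Double]) pairs, as described.
val pairs = ret.cartesian(ret).map {
  case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
}

// Instead of sc.parallelize inside a loop, project the field you need
// with an ordinary transformation; no new RDD is created per row.
val test = pairs.map { case (_, (v1List, _)) => v1List }
```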