
Scala Spark – Task not serializable

I have the following code, where the error occurs at sc.parallelize():

val pairs = ret.cartesian(ret)
    .map {
        case ((k1,v1),(k2,v2)) => ((k1,k2),(v1.toList,v2.toList))
    }
for (pair <- pairs) {
    val test = sc.parallelize(pair._2._1.map(_._1 ))
}

where

> k1, k2 are Strings
> v1, v2 are Lists of Doubles

Whenever I try to access sc, I get the following error. What am I doing wrong here?

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at CorrelationCalc$.main(CorrelationCalc.scala:33)
at CorrelationCalc.main(CorrelationCalc.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@40bee8c5)
- field (class: CorrelationCalc$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class CorrelationCalc$$anonfun$main$1,)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
… 20 more

Solution

The for-comprehension is just doing a pairs.foreach() under the hood (a for without yield desugars to foreach, which is why RDD.foreach shows up in the stack trace).
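
A short sketch of that desugaring, reusing the identifiers from the question (the loop body is copied verbatim; nothing else is assumed):

// The loop from the question:
for (pair <- pairs) {
  val test = sc.parallelize(pair._2._1.map(_._1))
}

// desugars to a closure that Spark ships to the executors:
pairs.foreach { pair =>
  val test = sc.parallelize(pair._2._1.map(_._1))  // the closure captures `sc`
}
// Because the closure references `sc`, Spark has to serialize the SparkContext,
// which is exactly the NotSerializableException shown in the stack trace above.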

RDD operations are performed by the workers, and to have them do that work, anything you send to them must be serializable. The SparkContext, on the other hand, is attached to the master: it is responsible for managing the entire cluster.

If you want to create an RDD, you have to be aware of the whole cluster (that is the second "D", for "Distributed"), so you cannot create a new RDD on a worker. And you probably don't want to turn each row in pairs into its own RDD anyway (each with the same name, no less!).

It's hard to tell from your code what you are trying to do, but it will probably look something like

val test = pairs.map( r => r._2._1)

This would be an RDD in which each row contains whatever was in v1.toList.
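
A minimal, self-contained sketch of the corrected flow. The element type of ret (String keys mapped to iterables of (Double, Double) tuples) and the sample data are assumptions made for illustration; only the cartesian/map structure comes from the question:

import org.apache.spark.{SparkConf, SparkContext}

object CorrelationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))

    // Assumed shape of `ret`: key -> iterable of (Double, Double) tuples.
    val ret = sc.parallelize(Seq(
      ("a", Iterable((1.0, 0.1), (2.0, 0.2))),
      ("b", Iterable((3.0, 0.3)))
    ))

    val pairs = ret.cartesian(ret).map {
      case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
    }

    // One more transformation on the existing RDD instead of sc.parallelize per row:
    val test = pairs.map { case (_, (v1, _)) => v1.map(_._1) }
    test.collect().foreach(println)  // driver-side inspection only

    sc.stop()
  }
}

Each element of test is a plain List[Double] produced entirely on the workers; no SparkContext is ever referenced inside a closure, so nothing non-serializable gets captured.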
