scala – SparkContext在配对对象中不可序列化
我目前正在尝试扩展使用
Scala和Spark的机器学习应用程序.我正在使用我在
Github上发现的Dieterich Lawson先前项目的结构
https://github.com/dieterichlawson/admm 该项目基本上使用SparkContext构建训练样本块的RDD,然后对每个集合执行局部计算(例如求解线性系统). 我遵循相同的方案,但对于我的本地计算,我需要在每个训练样本块上执行L-BFGS算法.为了做到这一点,我想使用mlLib中的L-BFGS算法,该算法具有以下特征. runLBFGS(RDD<scala.Tuple2<Object,Vector>> data,Gradient gradient,Updater updater,int numCorrections,double convergenceTol,int maxNumIterations,double regParam,Vector initialWeights) 如上所述,该方法将训练样本的RDD [Object,Vector]作为输入.问题是在每个工作者本地我不再保留数据的RDD结构.因此,我正在尝试在矩阵的每个块上使用SparkContext的并行化功能.但是当我这样做时,我得到了一个序列化器异常. (确切的异常消息在问题的最后). 这是我如何处理SparkContext的详细解释. 首先,在主应用程序中,它用于打开文本文件,它在LogRegressionXUpdate类的工厂中使用: val A = sc.textFile("ds1.csv") A.checkpoint val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc) 在应用程序中,LogRegressionXUpdate类实现如下 class LogRegressionXUpdate(val training: RDD[(Double,NV)],val rho: Double) extends Function1[BDV[Double],Double] with Prox with Serializable{ def prox(x: BDV[Double],rho: Double): BDV[Double] = { val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 20 val regParam = 0.1 val (weights,loss) = LBFGS.runLBFGS( training,new GradientForLogRegADMM(rho,fromBreeze(x)),new SimpleUpdater(),numCorrections,convergenceTol,maxNumIterations,regParam,fromBreeze(x)) toBreeze(weights.toArray).toDenseVector } def apply(x: BDV[Double]): Double = { Math.pow(1,2.0) } } 使用以下配套对象: object LogRegressionXUpdate { def fromTextFile(file: RDD[String],rho: Double,blockHeight: Int = 1024,@transient sc: SparkContext): RDF[LogRegressionXUpdate] = { val fns = new BlockMatrix(file,blockHeight).blocks. map(X => new LogRegressionXUpdate(sc.parallelize((X(*,::).map(fila => (fila(-1),fromBreeze(fila(0 to -2))))).toArray),rho)) new RDF[LogRegressionXUpdate](fns,0L) } } 这个构造函数导致序列化错误,虽然我并不是真的需要SparkContext来在本地构建每个RDD.我已经搜索了这个问题的解决方案,添加@transient并没有解决它. 错误日志: Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.map(RDD.scala:293) at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70) at admm.examples.Lasso$.run(Lasso.scala:96) at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:70) at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:69) at scala.Option.map(Option.scala:145) at admm.examples.Lasso$.main(Lasso.scala:69) at admm.examples.Lasso.main(Lasso.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext,value: org.apache.spark.SparkContext@20576557) - field (class: admm.functions.LogRegressionXUpdate$$anonfun$1,name: sc$1,type: class org.apache.spark.SparkContext) - object (class admm.functions.LogRegressionXUpdate$$anonfun$1,<function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) ... 21 more 解决方法
只能从驱动程序访问RDD.每当你打电话的时候
myRDD.map(someObject.someMethod) spark序列化计算someMethod所需的任何内容,并将其发送给worker.在那里,该方法被反序列化,然后它独立地在每个分区上运行. 但是,您尝试使用本身使用spark的方法:您尝试创建新的RDD.但是,这是不可能的,因为它们只能在驱动程序中创建.您看到的错误是spark尝试序列化spark上下文本身,因为每个块的计算都需要它.有关序列化的更多信息可以在this问题的第一个答案中找到. “……虽然我并不是真的需要SparkContext来在本地构建每个RDD” – 实际上这正是你在调用sc.parallelize时所做的.底线 – 您需要找到(或写入)L-BFGS的本地实现. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |