scala – 是否可以使用Apache Spark进行递归计算?
发布时间:2020-12-16 18:39:19 所属栏目:安全 来源:网络整理
导读:我正在使用 Scala和Apache Spark开发国际象棋引擎(我需要强调的是,我的理智不是这个问题的主题).我的问题是Negamax算法本质上是递归的,当我尝试天真的方法时: class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable { val movesOrderi
我正在使用
Scala和Apache Spark开发国际象棋引擎(我需要强调的是,我的理智不是这个问题的主题).我的问题是Negamax算法本质上是递归的,当我尝试天真的方法时:
class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable { val movesOrdering = new Ordering[Tuple2[Move,Double]]() { override def compare(x: (Move,Double),y: (Move,Double)): Int = Ordering[Double].compare(x._2,y._2) } def negaMaxSparkHelper(game: Game,color: PieceColor,depth: Int,previousMovesPar: RDD[Move]): (Move,Double) = { val board = game.board if (depth == 0) { (null,NegaMax.evaluateDefault(game,color)) } else { val moves = board.possibleMovesForColor(color) val movesPar = previousMovesPar.context.parallelize(moves) val moveMappingFunc = (m: Move) => { negaMaxSparkHelper(new Game(board.boardByMakingMove(m),color.oppositeColor,null),depth - 1,movesPar) } val movesWithScorePar = movesPar.map(moveMappingFunc) val move = movesWithScorePar.min()(movesOrdering) (move._1,-move._2) } } def negaMaxSpark(game: Game,depth: Int): (Move,Double) = { if (depth == 0) { (null,color)) } else { val movesPar = sc.parallelize(new Array[Move](0)) negaMaxSparkHelper(game,color,depth,movesPar) } } } class NegaMaxSparkBot(val maxDepth: Int,sc: SparkContext) extends Bot { def nextMove(game: Game): Move = { val nms = new NegaMaxSparc(sc) nms.negaMaxSpark(game,game.colorToMove,maxDepth)._1 } } 我明白了: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver,not inside of other transformations; for example,rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information,see SPARK-5063. 问题是:这个算法可以使用Spark递归实现吗?如果没有,那么解决该问题的适当Spark方式是什么? 解决方法
这是一个在实现方面有意义的限制,但使用起来可能很麻烦.
您可以尝试将递归拉出到顶级,只需在创建和运行RDD的“驱动程序”代码中?就像是: def step(rdd: Rdd[Move],limit: Int) = if(0 == limit) rdd else { val newRdd = rdd.flatMap(...) step(newRdd,limit - 1) } 或者,通过手动显式管理“堆栈”,总是可以将递归转换为迭代(尽管它可能导致更繁琐的代码). (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |