加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 是否可以使用Apache Spark进行递归计算?

发布时间:2020-12-16 18:39:19 所属栏目:安全 来源:网络整理
导读:我正在使用 Scala和Apache Spark开发国际象棋引擎(我需要强调的是,我的理智不是这个问题的主题).我的问题是Negamax算法本质上是递归的,当我尝试天真的方法时: class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable { val movesOrderi
我正在使用 Scala和Apache Spark开发国际象棋引擎(我需要强调的是,我的理智不是这个问题的主题).我的问题是Negamax算法本质上是递归的,当我尝试天真的方法时:

class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable  {
  val movesOrdering = new Ordering[Tuple2[Move,Double]]() {
    override def compare(x: (Move,Double),y: (Move,Double)): Int =
      Ordering[Double].compare(x._2,y._2)
  }

  def negaMaxSparkHelper(game: Game,color: PieceColor,depth: Int,previousMovesPar: RDD[Move]): (Move,Double) = {
    val board = game.board

    if (depth == 0) {
      (null,NegaMax.evaluateDefault(game,color))
    } else {
      val moves = board.possibleMovesForColor(color)
      val movesPar = previousMovesPar.context.parallelize(moves)

      val moveMappingFunc = (m: Move) => { negaMaxSparkHelper(new Game(board.boardByMakingMove(m),color.oppositeColor,null),depth - 1,movesPar) }
      val movesWithScorePar = movesPar.map(moveMappingFunc)
      val move = movesWithScorePar.min()(movesOrdering)

      (move._1,-move._2)
    }
  }

  def negaMaxSpark(game: Game,depth: Int): (Move,Double) = {
    if (depth == 0) {
      (null,color))
    } else {
      val movesPar = sc.parallelize(new Array[Move](0))

      negaMaxSparkHelper(game,color,depth,movesPar)
    }
  }
}

class NegaMaxSparkBot(val maxDepth: Int,sc: SparkContext) extends Bot {
  def nextMove(game: Game): Move = {
    val nms = new NegaMaxSparc(sc)
    nms.negaMaxSpark(game,game.colorToMove,maxDepth)._1
  }
}

我明白了:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver,not inside of other transformations; for example,rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information,see SPARK-5063.

问题是:这个算法可以使用Spark递归实现吗?如果没有,那么解决该问题的适当Spark方式是什么?

解决方法

这是一个在实现方面有意义的限制,但使用起来可能很麻烦.

您可以尝试将递归拉出到顶级,只需在创建和运行RDD的“驱动程序”代码中?就像是:

def step(rdd: Rdd[Move],limit: Int) =
  if(0 == limit) rdd
  else {
    val newRdd = rdd.flatMap(...)
    step(newRdd,limit - 1)
  }

或者,通过手动显式管理“堆栈”,总是可以将递归转换为迭代(尽管它可能导致更繁琐的代码).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读