我正在使用Scala和Apache Spark开发国际象棋引擎(我需要强调的是,我的理智不是这个问题的主题)。我的问题是Negamax算法本质上是递归的,当我尝试朴素的方法时:

class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable  {
  val movesOrdering = new Ordering[Tuple2[Move, Double]]() {
    override def compare(x: (Move, Double), y: (Move, Double)): Int =
      Ordering[Double].compare(x._2, y._2)
  }

  def negaMaxSparkHelper(game: Game, color: PieceColor, depth: Int, previousMovesPar: RDD[Move]): (Move, Double) = {
    val board = game.board

    if (depth == 0) {
      (null, NegaMax.evaluateDefault(game, color))
    } else {
      val moves = board.possibleMovesForColor(color)
      val movesPar = previousMovesPar.context.parallelize(moves)

      val moveMappingFunc = (m: Move) => { negaMaxSparkHelper(new Game(board.boardByMakingMove(m), color.oppositeColor, null), color.oppositeColor, depth - 1, movesPar) }
      val movesWithScorePar = movesPar.map(moveMappingFunc)
      val move = movesWithScorePar.min()(movesOrdering)

      (move._1, -move._2)
    }
  }

  def negaMaxSpark(game: Game, color: PieceColor, depth: Int): (Move, Double) = {
    if (depth == 0) {
      (null, NegaMax.evaluateDefault(game, color))
    } else {
      val movesPar = sc.parallelize(new Array[Move](0))

      negaMaxSparkHelper(game, color, depth, movesPar)
    }
  }
}

class NegaMaxSparkBot(val maxDepth: Int, sc: SparkContext) extends Bot {
  def nextMove(game: Game): Move = {
    val nms = new NegaMaxSparc(sc)
    nms.negaMaxSpark(game, game.colorToMove, maxDepth)._1
  }
}

我得到:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

问题是:可以使用Spark递归实现此算法吗?如果没有,那么解决该问题的正确方法是什么?

最佳答案

只有驱动程序才能在RDD上启动计算。原因是,尽管RDD像常规数据收集一样“感觉”,但在幕后它们仍然是分布式收集,因此在它们上启动操作需要协调所有远程从站上的任务执行,这在大多数情况下会向我们隐瞒。

因此无法从从站进​​行递归操作,即直接从从站动态启动新的分布式任务是不可能的:只有驱动器才能处理这种协调工作。

这是简化您的问题的一种可能选择(如果我没事的话)。这个想法是先建立Moves实例,每个实例代表从初始状态开始的Move的完整序列。

每个Moves实例都能够将自身转换为一组Moves,每个实例都对应于相同的Move序列加上一个可能的下一个Move

从那里开始,驱动程序只需要连续地对Moves进行flatMap,就可以达到我们想要的深度,然后生成的RDD [Moves]将为我们并行执行所有操作。

该方法的缺点是所有深度级别都保持同步,即,在进入下一个级别之前,我们必须以n级别(即RDD[Moves]级别的n)计算所有移动。

下面的代码未经测试,可能存在缺陷,甚至无法编译,但希望它提供了有关如何解决此问题的想法。

/* one modification to the board */
case class Move(from: String, to: String)

case class PieceColor(color: String)

/* state of the game */
case class Board {

    // TODO
    def possibleMovesForColor(color: PieceColor): Seq[Move] =
        Move("here", "there") :: Move("there", "over there") :: Move("there", "here") :: Nil

    // TODO: compute a new instance of board here, based on current + this move
    def update(move: Move): Board = new Board
}


/** Solution, i.e. a sequence of moves*/
case class Moves(moves: Seq[Move], game: Board, color: PieceColor) {
    lazy val score = NegaMax.evaluateDefault(game, color)

    /** @return all valid next Moves  */
    def nextPossibleMoves: Seq[Moves] =
        board.possibleMovesForColor(color).map {
            nextMove =>
              play.copy(moves = nextMove :: play.moves,
                        game = play.game.update(nextMove)
        }

}

/** Driver code: negaMax: looks for the best next move from a give game state */
def negaMax(sc: SparkContext, game: Board, color: PieceColor, maxDepth: Int):Moves = {

    val initialSolution = Moves(Seq[moves].empty, game, color)

    val allPlays: rdd[Moves] =
        (1 to maxDepth).foldLeft (sc.parallelize(Seq(initialSolution))) {
        rdd => rdd.flatMap(_.nextPossibleMoves)
    }

    allPlays.reduce { case (m1, m2) => if (m1.score < m2.score) m1 else m2}

}

10-05 23:18