Can anyone help me get mapPartitions to accept the Iterator returned by the listWords() method?

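import java.util

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MapPartitionExample {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("MapPartitionExample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val input: RDD[String] = sc.parallelize(List("ABC", "DEF", "GHU", "YHG"))

    // Fails to compile: mapPartitions expects a function returning
    // scala.collection.Iterator, but listWords returns java.util.Iterator.
    val x = input.mapPartitions(word => listWords(word))

  }

  // Copies the partition's elements into a java.util.ArrayList
  // and returns a Java iterator over them.
  def listWords(words: Iterator[String]): util.Iterator[String] = {
    val arrList = new util.ArrayList[String]()
    while (words.hasNext) {
      arrList.add(words.next())
    }
    arrList.iterator()
  }

}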
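
One alternative, assuming you want to keep building a java.util.ArrayList, is to convert the Java iterator into a Scala one before returning it. This is a sketch that is not taken from the original thread, and `JavaIteratorBridge` is a hypothetical name. The `scala.collection.JavaConverters._` import works on Scala 2.12 and 2.13 (on 2.13 it is deprecated, and `scala.jdk.CollectionConverters._` is the preferred import).

```scala
import scala.collection.JavaConverters._

object JavaIteratorBridge {
  // Same logic as the question's listWords, but the java.util.Iterator is
  // wrapped with asScala, so the result is a scala.collection.Iterator.
  def listWords(words: Iterator[String]): Iterator[String] = {
    val arrList = new java.util.ArrayList[String]()
    while (words.hasNext) {
      arrList.add(words.next())
    }
    arrList.iterator().asScala
  }
}
```

With this version, `input.mapPartitions(word => JavaIteratorBridge.listWords(word))` should type-check, because the function now returns `Iterator[String]`.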

Best answer

The function you pass to mapPartitions must return a scala.collection.Iterator, not a java.util.Iterator. I don't see the point of your current code, but you can use a Scala mutable collection:

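import scala.collection.mutable.ArrayBuffer

def listWords(words: Iterator[String]): Iterator[String] = {
  // Buffer the whole partition in a Scala collection...
  val arr = ArrayBuffer[String]()
  while (words.hasNext) {
    arr += words.next()
  }
  // ...and return a scala.collection.Iterator over it
  // (arr.iterator works on Scala 2.12 and 2.13; toIterator is deprecated in 2.13).
  arr.iterator
}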
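
To see why the return type matters without starting Spark, here is a sketch that mimics mapPartitions on plain Scala collections. `simulateMapPartitions` and `PartitionSimulation` are hypothetical names, not part of Spark; the helper only mirrors the shape of `RDD.mapPartitions`, which takes a function `Iterator[T] => Iterator[U]` and calls it once per partition.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionSimulation {
  // The answer's listWords: buffer the partition, then return a Scala Iterator.
  def listWords(words: Iterator[String]): Iterator[String] = {
    val arr = ArrayBuffer[String]()
    while (words.hasNext) {
      arr += words.next()
    }
    arr.iterator
  }

  // Hypothetical helper, not a Spark API: applies f to each "partition"
  // separately and concatenates the results, the way mapPartitions
  // applies its function once per partition.
  def simulateMapPartitions[T, U](partitions: Seq[Seq[T]])(f: Iterator[T] => Iterator[U]): Seq[U] =
    partitions.flatMap(p => f(p.iterator))
}
```

Note that the buffering version holds a whole partition in memory before returning; that is harmless for four words but worth keeping in mind for large partitions.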

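Personally, I would just use map:
def listWords(words: Iterator[String]): Iterator[String] = {
  // Some init code (runs once per partition)
  // someFunction: placeholder for your per-element String => String transformation
  words.map(someFunction)
}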
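
A filled-in sketch of the map-based version, under stated assumptions: `someFunction` is a hypothetical lowercase transformation, and `initRuns` is a hypothetical counter that exists only to show that the init code runs once per call (once per partition under mapPartitions), while the mapping runs once per element.

```scala
object InitThenMap {
  // Hypothetical counter, used only to show how often the init code runs.
  var initRuns = 0

  def listWords(words: Iterator[String]): Iterator[String] = {
    // "Some init code": executed once per call, i.e. once per partition
    // when listWords is passed to mapPartitions.
    initRuns += 1
    // Hypothetical stand-in for someFunction.
    val someFunction: String => String = _.toLowerCase
    // Iterator.map is lazy: elements are transformed only as they are consumed.
    words.map(someFunction)
  }
}
```

Because `Iterator.map` is lazy, this version does not materialize the partition in memory; the flip side is that any resource opened in the init code must not be closed before the returned iterator has been consumed.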

For more on "scala - mapPartitions iterator return", see this similar question on Stack Overflow: https://stackoverflow.com/questions/45648913/

10-15 08:01