我有DocsRDD:RDD [String,String]

val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)

DocsRDD:
Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n  .....\n bla bla bla bla \n ... bla

是否有一种有效,优雅的方法通过mapPartitions从中提取n-gram?
到目前为止,我已经尝试了所有方法,已经阅读了至少5次以上关于mapPartitions的内容,但是我仍然不知道如何使用它!似乎有点难以操作。
总之我想:
val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )

但使用mapPartitions却很有效。
我对mapPartitions的基本误解是:

OneDocRDD:RDD [字符串]
 val OneDocRDD = sc.textFile("myDoc1.txt" , 2)
                   .mapPartitions(s1 : Iterator[String] => s2 : Iterator[String])

我无法理解这!从s1开始是Iterator [String]? s1是sc.textfile之后的String。

好吧,我的第二个问题是:在这种情况下,mapPartitions是否可以改善我克服map的能力?

最后但并非最不重要:
f()可以是:
     f(Iterator[String]) : Iterator[Something else?]

最佳答案

我不确定.mapPartitions是否会有所帮助(至少没有给出示例),但是使用.mapPartitions看起来像:

val OneDocRDD = sc.textFile("myDoc1.txt", 2)
  .mapPartitions(iter => {
    // here you can initialize objects that you would need
    // that you want to create once by worker and not for each x in the map.
    iter.map(x => (x._1 , x._2.sliding(n)))
  })

通常,您想使用.mapPartitions创建/初始化您不想要的对象(例如:太大)或无法序列化到工作节点。如果没有.mapPartitions,则需要在.map中创建它们,但是这样做效率不高,因为将为每个x创建对象。

07-24 13:51