问题
输入数据有两种类型的记录,我们称它们为RW
我需要按从上到下的顺序遍历这些数据,这样,如果当前记录的类型是W,则必须将其与映射合并(我们称之为workMap)。如果该W-type记录的键已经存在于映射中,则该记录的值将被添加到其中,否则将在workMap中创建一个新条目。
如果当前记录的类型为R,则在此记录之前计算的workMap将附加到当前记录。
例如,如果这是记录的顺序-

W1-   a -> 2
W2-   b -> 3
W3-   a -> 4
R1
W4-   c -> 1
R2
W5-   c -> 4

其中,w1、w2、w3、w4和w5为W型;r1和r2为R型。
在这个函数的末尾,我应该有-
R1 - { a -> 6,
       b -> 3 } //merged(W1, W2, W3)
R2 - { a -> 6,
       b -> 3,
       c -> 1 } //merged(W1, W2, W3, W4)
{ a -> 6,
  b -> 3,
  c -> 5 } //merged(W1, W2, W3, W4, W5)

我希望所有附加到中间workMaps的r-type记录都计算到该点;最后一个记录处理完之后的最后一个workMap
这是我写的代码-
def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
  Iterator[(ReportKey, ReportVal)] = {

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
    val reportList = mutable.ArrayBuffer.empty[(ReportKey, Reportval)]

    while (itr.hasNext) {
      val temp = itr.next()
      val (iKey, iVal) = (temp._1, temp._2)

      if (iKey.recordType == reportType) {
       //creates a new (ReportKey, Reportval)
        reportList += getNewReportRecord(workMap, iKey, iVal)
      }
      else {
        //if iKey is already present, merge the values
        //other wise adds a new entry
        updateWorkMap(workMap, iKey, iVal)
      }
    }
    val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport)

    reportList.iterator ++ workList.iterator
  }

课堂是这样的-
case class ReportKey (
                        // the type of record - report or work
                        rType: Int,
                        date: String,
                      .....
                       )

这种方法有两个问题需要我的帮助-
我必须跟踪一个ReportKey-一个带有中间reportLists的R类型记录列表。随着数据的增长,workMap也会增长,我会遇到reportLists。
我必须将OutOfMemoryExceptionreportList记录合并到同一数据结构中,然后返回它们如果有其他优雅的方式,我一定会考虑改变这个设计。
为了完整起见-我正在使用spark函数workMap作为RDD上映射分区的参数传递。我需要每个分区的calcPerPartitions在以后做一些额外的计算。
我知道,如果不必从每个分区返回workMaps,问题就会变得简单得多,如下所示-
...
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
itr.scanLeft[Option[(ReportKey, Reportval)]](
  None)((acc: Option[(ReportKey, Reportval)],
  curr: (InputKey, InputVal)) => {

  if (curr._1.recordType == reportType) {
    val rec = getNewReportRecord(workMap, curr._1, curr._2)
    Some(rec)
  }
  else {
    updateWorkMap(workMap, curr._1, curr._2)
    None
  }
})

val reportList = scan.filter(_.isDefined).map(_.get)
//workMap is still empty after the scanLeft.
...

当然,我可以对输入数据执行workMap操作以导出最终的reduce,但我需要看两次数据考虑到输入数据集很大,我也希望避免这种情况。
但不幸的是,我需要在后面的步骤workMaps。
那么,有没有更好的方法来解决上述问题呢?如果我根本无法解决问题2(according to this),是否有其他方法可以避免在列表中存储workMap记录(R)或多次扫描数据?

最佳答案

对于第二个问题,我还没有更好的设计——如果您可以避免将reportListworkMap组合到一个数据结构中,但我们当然可以避免在列表中存储R类型的记录。
下面是我们如何根据上述问题重新编写calcPerPartition-

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
  Iterator[Option[(ReportKey, ReportVal)]] = {

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
    var finalWorkMap = true

    new Iterator[Option[(ReportKey, ReportVal)]](){
        override def hasNext: Boolean = itr.hasNext

        override def next(): Option[(ReportKey, ReportVal)] = {
            val curr = itr.next()
            val iKey = curr._1
            val iVal = curr._2
            val eventKey = EventKey(openKey.date, openKey.symbol)

            if (iKey.recordType == reportType) {
              Some(getNewReportRecord(workMap, iKey, iVal))
            }
            else {
              //otherwise update the generic interest map but don't accumulate anything
              updateWorkMap(workMap, iKey, iVal)
              if (itr.hasNext) {
                next()
              }
              else {
                  if(finalWorkMap){
                    finalWorkMap = false //because we want a final only once
                    Some(workMap.map(convertToReport))
                  }
                  else {
                    None
                  }

              }
            }
        }
    }
  }

我们没有将结果存储在列表中,而是定义了一个迭代器。这解决了我们在这个问题上的大部分记忆问题。

关于algorithm - 使用Scala和Spark扫描数据的更好方法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43011052/

10-10 05:18