Problem description
import org.apache.spark.rdd.RDD
// BuyEvent and BiMap (e.g. PredictionIO's BiMap) are defined elsewhere in the project.

def trainBestSeller(
    events: RDD[BuyEvent],
    n: Int,
    itemStringIntMap: BiMap[String, Int]): Map[String, Array[(Int, Int)]] = {
  val itemTemp = events
    // map item from string to integer index
    .flatMap {
      case BuyEvent(_, item, category, count) if itemStringIntMap.contains(item) =>
        Some(((itemStringIntMap(item), category), count))
      case _ => None
    }
    // cache, because itemTemp is reused by both computations below
    .cache()

  // top n items within each category
  val bestSeller_Category: Map[String, Array[(Int, Int)]] = itemTemp.reduceByKey(_ + _)
    .map(row => (row._1._2, (row._1._1, row._2)))
    .groupByKey()
    .map { case (c, itemCounts) =>
      (c, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
    }
    .collectAsMap().toMap

  // top n items across all categories => category "ALL"
  val bestSeller_All: Map[String, Array[(Int, Int)]] = itemTemp.reduceByKey(_ + _)
    .map(row => ("ALL", (row._1._1, row._2)))
    .groupByKey()
    .map { case (c, itemCounts) =>
      (c, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
    }
    .collectAsMap().toMap

  // merge the two maps bestSeller_Category and bestSeller_All
  val bestSeller = bestSeller_Category ++ bestSeller_All
  bestSeller
}
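The ranking logic in trainBestSeller can be checked without a Spark cluster. Below is a minimal sketch that mirrors the same steps with plain Scala collections: collect stands in for the flatMap, groupBy plus sum stands in for reduceByKey(_ + _), and a plain Map[String, Int] replaces BiMap. The BuyEvent case class and the BestSellerSketch object are hypothetical stand-ins, not the asker's actual types.

```scala
// Plain-collections sketch of the top-n logic in trainBestSeller.
// Assumptions: BuyEvent is a hypothetical stand-in for the real class,
// and a plain Map[String, Int] replaces the BiMap used in the Spark code.
object BestSellerSketch {
  final case class BuyEvent(user: String, item: String, category: String, count: Int)

  def topN(events: Seq[BuyEvent], n: Int, itemIndex: Map[String, Int]): Map[String, Array[(Int, Int)]] = {
    // Keep only known items, keyed by (item index, category), like the flatMap step.
    val keyed: Seq[((Int, String), Int)] = events.collect {
      case BuyEvent(_, item, category, count) if itemIndex.contains(item) =>
        ((itemIndex(item), category), count)
    }

    // Sum counts per (item, category): the equivalent of reduceByKey(_ + _).
    val summed: Map[(Int, String), Int] =
      keyed.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

    // Sort (item, count) pairs by descending count and keep the first n.
    def best(pairs: Iterable[(Int, Int)]): Array[(Int, Int)] =
      pairs.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n)

    // Ranking inside each category.
    val perCategory: Map[String, Array[(Int, Int)]] =
      summed.toSeq
        .map { case ((item, cat), cnt) => (cat, (item, cnt)) }
        .groupBy(_._1)
        .map { case (cat, rows) => cat -> best(rows.map(_._2)) }

    // Ranking over all categories under the key "ALL" (absent when there is no data,
    // just as groupByKey on an empty RDD yields no "ALL" entry).
    val all: Map[String, Array[(Int, Int)]] =
      if (summed.isEmpty) Map.empty
      else Map("ALL" -> best(summed.toSeq.map { case ((item, _), cnt) => (item, cnt) }))

    perCategory ++ all
  }

  def main(args: Array[String]): Unit = {
    val index = Map("a" -> 0, "b" -> 1)
    val events = Seq(BuyEvent("u1", "a", "books", 3), BuyEvent("u2", "b", "books", 5), BuyEvent("u3", "a", "toys", 2))
    topN(events, 1, index).foreach { case (cat, top) => println(s"$cat -> ${top.toList}") }
  }
}
```

As in the original, an item bought under two categories shows up twice in the "ALL" ranking, because both rankings start from counts keyed by (item, category). The sketch computes the summed counts once and reuses them; in the Spark code, caching the reduceByKey(_ + _) result instead of only itemTemp would likely avoid running that shuffle twice.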
List processing
Your list processing looks correct. I ran a small check:
// JString and CompactBuffer here are simple stand-ins, only to check the list logic.
object ListCheck {
  case class JString(x: Int)
  case class CompactBuffer(x: Int, y: Int)

  def main(args: Array[String]): Unit = {
    val tuple: (List[JString], CompactBuffer) =
      (List(JString(2435), JString(3464)), CompactBuffer(1, 4))

    // pair every element of the list with the same buffer
    val result: List[(JString, CompactBuffer)] = tuple._1.map((_, tuple._2))

    // the same thing, spelled out step by step
    val result2: List[(JString, CompactBuffer)] = {
      val l = tuple._1
      val cb = tuple._2
      l.map(x => (x, cb))
    }

    println(result)
    println(result2)
  }
}
The result is, as expected:
List((JString(2435),CompactBuffer(1,4)), (JString(3464),CompactBuffer(1,4)))
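The recheck above handles a single tuple. If the real input is a collection of such tuples (for instance the output of a groupByKey-style step), the same mapping can be applied inside flatMap. A minimal sketch, again with hypothetical stand-in case classes instead of json4s' JString and Spark's CompactBuffer; PairExpansion is an illustrative name only:

```scala
// Hypothetical stand-ins for the real types (not the json4s or Spark classes).
final case class JString(x: Int)
final case class CompactBuffer(x: Int, y: Int)

object PairExpansion {
  // Turn every (keys, buffer) tuple into one (key, buffer) pair per key.
  def expand(rows: Seq[(List[JString], CompactBuffer)]): Seq[(JString, CompactBuffer)] =
    rows.flatMap { case (keys, buffer) => keys.map(k => (k, buffer)) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (List(JString(2435), JString(3464)), CompactBuffer(1, 4)),
      (List(JString(7)), CompactBuffer(2, 3))
    )
    println(expand(rows))
  }
}
```

On an RDD of such tuples, the same lambda could be passed to the RDD's flatMap, since flatMap accepts a function that returns a collection.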
Further analysis
If that does not solve your problem, more analysis is needed:
- Where do the types JString (from org.json4s.JsonAST?) and CompactBuffer (from Spark, I suppose) come from?
- What exactly does the code that creates pair look like? What exactly are you doing? Please provide code excerpts!