一:准备
1.源数据
2.上传数据
二:TopN程序编码
1.程序
package com.ibeifeng.bigdata.spark.core import java.util.concurrent.ThreadLocalRandom import org.apache.spark.{SparkConf, SparkContext} /**
* 分组TopN:按照第一个字段分组;同一组中,按照第二个字段进行排序;每一组中,获取出现最多的前K个数据。
* Created by ibf on 01/15.
*/
object GroupedTopN {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("grouped-topn")
//.set("spark.eventLog.enabled", "true")
//.set("spark.eventLog.dir", "hdfs://hadoop-senior01:8020/spark-history") val sc = SparkContext.getOrCreate(conf) // ==========具体代码逻辑========================
// 原始数据存储的路径, 需要自己上传
val path = "/user/beifeng/spark/groupedtopk/groupsort.txt"
val K = 3 // 构建rdd
val rdd = sc.textFile(path) // rdd操作
val word2CountRDD = rdd
.filter((line: String) => {
// 过滤空字符串,所以非空的返回true
!line.isEmpty
})
.map(line => {
// 按照空格分隔字段
val arr = line.split(" ")
// 将数据转换为二元组
(arr(0), arr(1).toInt)
}) // 如果一个RDD被多次使用,该RDD需要进行缓存操作
word2CountRDD.cache() // 直接使用groupByKey函数进行统计,这种方式存在OOM的情况
/*
val resultRDD = word2CountRDD
.groupByKey() // 按照第一个字段进行分组
.map(tuple => {
// 同一组的数据中获取前K个元素
// 获取对应分组
val word = tuple._1
// 获取前K个元素(最大的k个元素), list默认排序是升序, 所以采用takeRight从后往前获取K个元素(此时的K个元素就是最大的K个元素); 最后对K个元素进行反转,最终结果元素是从大到小排序的
val topk = tuple._2.toList.sorted.takeRight(K).reverse
// 返回结果
(word, topk)
})
*/ /*
* groupByKey存在OOM异常
* 解决方案:采用两阶段聚合操作
* 两阶段聚合可以解决的一些常见:
* 1. 聚合操作中存储的OOM异常
* 2. 聚合操作中存在的数据倾斜问题
* 聚合操作:分区、排序、reduceByKey.....
* */
val random = ThreadLocalRandom.current()
val resultRDD2 = word2CountRDD
.map(tuple => {
// 第一阶段第一步:在key前加一个随机数
((random.nextInt(100), tuple._1), tuple._2)
})
.groupByKey() // 第一阶段的第二步:按照修改后的key进行聚合操作
.flatMap(tuple => {
// 第一阶段的第三步:对一组value进行聚合操作
// 获取对应分组
val word = tuple._1._2
// 获取前K个
val topk = tuple._2.toList.sorted.takeRight(K).reverse
// 返回结果
topk.map(count => (word, count))
})
.groupByKey() // 第二阶段第一步:按照原本的key进行聚合操作
.map(tuple => {
// 第二阶段第二步: 获取前k个元素
val word = tuple._1
val topk = tuple._2.toList.sorted.takeRight(K).reverse
// 返回结果
(word, topk)
}) // 结果输出
resultRDD2.foreach(println)
/*
resultRDD2.foreachPartition(iter => {
// foreachPartition该函数常用于将RDD的数据输出到第三方的数据存储系统中,比如:redis、mongoDB
/*
* 1. 创建连接
* 2. 对iter进行迭代,进行数据输出
* 3. 关闭连接
* */
iter.foreach(println)
})
*/ // 如果RDD有cache,需要去除cache
word2CountRDD.unpersist() // ==========具体代码逻辑======================== sc.stop()
}
}
2.结果
3.注意点
Spark中不支持二次排序,如果想实现二次排序,需要根据业务的执行逻辑使用两阶段聚合来进行操作
二:优化
1.两阶段聚合