目录
5. 排序 6. SequenceFileOutputFormat 和 SequenceFileInputFormat
7. 配置Combiner(合并)优化 8. Partitioner(分区)优化
继上一篇:https://blog.csdn.net/gaofengyan/article/details/86300388 对mapreduce运算框架初步学习之后,这里对MapReduce框架做一个优化案例,以音乐排行榜为例。案例涉及知识:Top K排序;Combiner(合并)优化处理;Partitioner(分区)优化等。具体用一下案例做引导:
1. 小项目
题材:音乐排行榜播放记录 数据格式:
统计信息:
1) 歌名 播放次数(这是第一次运算后得到的文件内容结果)
如: music1 2
music2 3
music3 1
2)按照播放次数降序来显示步骤1的统计信息(这是最终要达到的没有分区的目的效果)
如: 3 music2
2 music1
1 music3
准备以上题材,在桌面创建music.txt文件,并编辑内容:
[hduser@node1 桌面]$ vi music.txt
10001 music1 20180101 sing1
10002 music2 20180301 sing2
10003 music3 20180301 sing1
10004 music2 20180709 sing2
10005 music2 20180621 sing2
10006 music1 20180511 sing1
保存后退出,将music.txt文件copy到分布式文件夹存储(我的路径是在 【/input/下】),发布后检查我这里就不写了:
[hduser@node1 ~]$ ./hadoop/bin/hadoop dfs -put ./桌面/music.txt /input/
跟之前一样准备工作(使用虚拟机中安装的eclipse作业):
1)新建一个mapreduce项目:musicdemo【new】->【project】->【Map/Reduce Project】->【project name:musicdemo】->【finish】
2)新建资源文件夹:【右键项目名】->【new】->【source folder】->【folder name:resource】->【finish】
3)增加配置文件 core-site.xml , log4j.properties (两个配置文件在hadoop安装目录下的conf文件都能找到,可直接使用)
4)新建一个class 【MusicJob】(开发map、开发reduce、创建job并执行)
2. Map开发
这里需要截取我们需要操作的有效数据字段,生成我们需要的单独的文件输入进行计算。即下面的将每一行需要切片的数据按照空格进行拆分成数组【方法:因为Mapper切片操作本身就是对文本的每一行进行切片,所以,相当于一个循环操作;那这里先将每一行的数据按照文本格式的空格进行拆分成数组,需要先转化成字符串数组;然后进行筛选(筛选的方法有很多种,面对复杂的文本进行有效数据的拆分就会有对应的拆分格式);因为我们这里给的文本格式很整齐,并且去的是数组第2号元素;再将Java字符串转化成hadoop格式的字符串类型(Text);然后作为输出值即可】:
/**
* 开发map
*
* @author hduser 第1个参数:map 输入键的类型 第2个参数:map 输入值的类型 第3个参数:map 输出键的类型
* 第4个参数:map 输出值的类型
*/
public static class MusicMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* 每一个切片 会执行一次map方法, keyIn是每一行的键, valueIn 是每一行的值, context
* 是上下文容器,用于将map的结果输出到下一步 wordcount map 把value拆成单个单词
*
* @throws InterruptedException
* @throws IOException
*/
public void map(Object keyIn, Text valueIn, Context ctx) throws IOException, InterruptedException {
// 固定值1 作为输出值
IntWritable valueOut = new IntWritable(1);
Text keyOut = null;
// 将每一行需要切片的数据按照空格进行拆分成数组
String[] str = valueIn.toString().split("\\s+");
if (str.length >= 3) {//这一步是做判断筛选,即长度大于等于3的行才是下面要进行切片操作的
ctx.write(new Text(str[1]), valueOut);
}
}
}
3. Reduce 开发
// 开发reduce
public static class MusicReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text keyIn, Iterable<IntWritable> valuesIn, Context ctx)
throws IOException, InterruptedException {
Text keyOut = keyIn;
// 输出值
IntWritable valueOut = new IntWritable();
int sum = 0;
// 循环混洗后的数字数组,如[1,1,1,1,1]
for (IntWritable val : valuesIn) {
sum += val.get(); // 转成int型 , 做累加
}
valueOut.set(sum); // 转成字符串输出去,将累加的结果转化为IntWritable类型
ctx.write(keyOut, valueOut); // 输出到下一步
}
}
4. 反转键值操作Map开发
在第一次运算的时候map的键值(key/value)组合是字符串(Text)/数字(IntWritable)方式,而mapreduce的排序指的是主要用于 mapreduce 中key的排序;所以,我们是利用第一次运算得到的需求文件再进行一次排序操作运算,第一次操作后得到的文件内容格式如下:
1) 歌名 播放次数(这是第一次运算后得到的文件内容结果)
music1 2
music2 3
music3 1
为了方便后面混洗和reduce的计算,我们将键值对进行反转操作:
// 反转键值
public static class SoutMusicMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
public void map(Text keyIn, IntWritable valueIn, Context ctx) throws InterruptedException, IOException {
ctx.write(valueIn, keyIn);
}
}
5. 排序
5.1 概念
mapreduce的排序指的是主要用于 mapreduce 中key的排序,默认按照key升序排序(从小到大)。排序主要用于最终输出之前。
5.2 实现排序
5.2.1 定义一个排序类
1)定义一个静态排序类,继承需要的排序父类(我们样例使用的是数字排序父类)
// 排序(降序)
public static class MusicCountDesc extends IntWritable.Comparator
2)重写排序父类中的核心方法
// 重写两个排序方法
public int compara(Object a, Object b) {
return -super.compare(a, b); // 比较结果取负数即可降序排序
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
5.2.2 配置排序类到 scjob 【注意,这里的配置排序类到job是第二次的,所以是新创建的 scjob】
// 排序不是用默认排序 而是用我们自定义的排序类
scjob.setSortComparatorClass(MusicCountDesc.class);
6. SequenceFileOutputFormat 和 SequenceFileInputFormat
6.1 SequenceFileOutputFormat
主要是用于 mapreduce 最终输出阶段设置输出的格式类型为SequenceFile类型(字节对象型),类似于ObjectOutputStream(对象输出流)
样例:
// 8)设置输出内容是sequence 类似与对象输出流
job.setOutputFormatClass(SequenceFileOutputFormat.class);
6.2 SequenceFileInputFormat
主要用于 mapreduce 录入的数据是 SequenceFile 类型,该类型必须设置job的inputFormat为SequenceFileInputFormat。
样例:
// 3)设置job的map自定义静态类
scjob.setMapperClass(SoutMusicMapper.class);
scjob.setInputFormatClass(SequenceFileInputFormat.class);
7. 配置Combiner(合并)优化
在第一次运算的map之后,混洗之前的工作,表面上是看不出来有什么效果,但是可以从运算时间上去探索他的奥秘,合并的目的本身就是为了节约网络运算成本,提高效率。
7.1 概念
map处理后的数据(因为文件是被切割成64MB的块级进行运算,每一个块级里面有可能会存在多个相同的切割数据,而map输出的键值格式是单一的字符串和数字的方式,而下一步shuffle混洗的输入是运算上一个map输出的多个块级的相同字符串进行值为【1】的固定输入格式。)直接混洗,网络成本会太大,如map有1亿条数据,那么map就需要将处理过的这1亿条数据都传输给混洗节点。优化:如果将map处理后的数据(每一块级里面的相同字符串数据)先做以下合并,如:
没有合并的操作计算方式: 增加合并配置的计算方式:
合并后混洗需要运算的总数据量(所有块级里的数据总量)就会少了很多,再进行shuffle混洗操作,就会节省很多网络开支。
7.2 语法 注意:【合并一般配置都是reduce处理类】
// 配置combiner合并
job.setCombinerClass(MusicReducer.class);//一般配置都是reduce处理类
8. Partitioner(分区)优化
8.1 概念
mapreduce job 混洗的数据交给 reduce task(任务),而默认只有一个分区,所以只有一个reduce task(任务),一个reduce task(任务)产生一个输出文件。
8.2 分区目的
简单地说:分区的目的是让多个分机进行运算,提高运算效率。作用域也是在最终的输出阶段才有明显效果。
8.3 语法
1)定义一个静态分区类;混洗后的数据交给reduce运算是用的第一个参数值“key”【IntWritable key】,因此用第一个参数值进行分区操作:
// 定义一个分区类
public static class MyPartitioner extends Partitioner<IntWritable, Text> {
@Override
public int getPartition(IntWritable key, Text arg1, int arg2) {
if (key.get() >= 2) {// 分区2个
return 0;
} else {
return 1;
}
}
}
2)将自定义的静态分区类配置给job
// 配置分区类
scjob.setPartitionerClass(MyPartitioner.class);
3)设置job的reduce task 数量(即有几个分区)。
// 设置reduce数量
scjob.setNumReduceTasks(2);
9. 分区的job和task拓展
默认情况下:一个job(作业)-->对应多个map task(任务) --> shuffle(混洗)-->对应一个reduce-->每一个reduce对应一个输出文件;分区操作之后的结果是,分了几个区,就会有几台机子做运算(前提是有足够多的分机),就会得到几个运算输出文件。
10. 完整代码
当然以上过程到这里就算结束了,这里还是可以继续进行优化操作的,根据操作的数据量大小选择合适的方式即可。
package org.kgc1803.musicdemo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
/**
* 音乐排行榜统计小实战
*
* @author hduser
*
*/
public class MusicJob {
/**
* 开发map
*
* @author hduser 第1个参数:map 输入键的类型 第2个参数:map 输入值的类型 第3个参数:map 输出键的类型
* 第4个参数:map 输出值的类型
*/
public static class MusicMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
* 每一个切片 会执行一次map方法, keyIn是每一行的键, valueIn 是每一行的值, context
* 是上下文容器,用于将map的结果输出到下一步 wordcount map 把value拆成单个单词
*
* @throws InterruptedException
* @throws IOException
*/
public void map(Object keyIn, Text valueIn, Context ctx) throws IOException, InterruptedException {
// 固定值1 作为输出值
IntWritable valueOut = new IntWritable(1);
Text keyOut = null;
// this is hadoop application.
// StringTokenizer token = new StringTokenizer(valueIn.toString());
// 按照迭代器用法使用
// while (token.hasMoreTokens()) {
// String key = token.nextToken();
// keyOut = new Text(key);
// ctx.write(keyOut, valueOut);
// }
// 将每一行需要切片的数据按照空格进行拆分成数组
String[] str = valueIn.toString().split("\\s+");
if (str.length >= 3) {//这一步是做判断筛选,即长度大于等于3的行才是下面要进行切片操作的
ctx.write(new Text(str[1]), valueOut);
}
}
}
// 开发reduce
public static class MusicReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text keyIn, Iterable<IntWritable> valuesIn, Context ctx)
throws IOException, InterruptedException {
Text keyOut = keyIn;
// 输出值
IntWritable valueOut = new IntWritable();
int sum = 0;
// 循环混洗后的数字数组,如[1,1,1,1,1]
for (IntWritable val : valuesIn) {
sum += val.get(); // 转成int型 , 做累加
}
valueOut.set(sum); // 转成字符串输出去,将累加的结果转化为IntWritable类型
ctx.write(keyOut, valueOut); // 输出到下一步
}
}
// 反转键值
public static class SoutMusicMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
public void map(Text keyIn, IntWritable valueIn, Context ctx) throws InterruptedException, IOException {
ctx.write(valueIn, keyIn);
}
}
/**
* 排序(降序)
* 这里用的是数字排序IntWritable.Comparator,
* 在排序过程中进行两两比较时会去调用此类的compara()方法,返回值-1,0,1分别表示小于,等于,大于。
* 所以只需要重写compara()方法,并将返回值取反就可以实现倒序比较器。
*
* @author hduser
*
*/
public static class MusicCountDesc extends IntWritable.Comparator {
// 重写两个排序方法
public int compara(Object a, Object b) {
return -super.compare(a, b); // 比较结果取负数即可降序排序
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
// 定义一个分区类
public static class MyPartitioner extends Partitioner<IntWritable, Text> {
@Override
public int getPartition(IntWritable key, Text arg1, int arg2) {
if (key.get() >= 2) {// 分区2个
return 0;
} else {
return 1;
}
}
}
public static void main(String[] args) throws Exception {
// 创建job 执行job
// 1)加载hdfs配置文件(配置hdfs访问入口)
Configuration conf = new Configuration();
// 2)创建一个job并确定设置job(运算作业)的主启动类。
Job job = Job.getInstance(conf);
job.setJarByClass(MusicJob.class);
// 3)设置job的map自定义静态类
job.setMapperClass(MusicMapper.class);
// 4)设置job的reduce自定义静态类
job.setReducerClass(MusicReducer.class);
// 5) 配置最终输出(reduce)的输出键和值的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 配置combiner合并
job.setCombinerClass(MusicReducer.class);
// 6)mapreduce 作业需要的资源位置(总输入位置)
Path inputPath1 = new Path("hdfs://node1:9000/input/music.txt");
FileInputFormat.addInputPath(job, inputPath1);
// 7)mapreduce 作业结果的保存位置(总输出位置)
Path outputPath = new Path("hdfs://node1:9000/output/music");
// 删除已存在的输出文件夹(因为输出文件夹是唯一的,启动后自动创建)
if (FileSystem.get(conf).exists(outputPath)) {
FileSystem.get(conf).delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
// 8)设置输出内容是sequence 类似与对象输出流
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 9) 第二次排序运算启动
if (job.waitForCompletion(true)) {
// 2)创建一个job并设置job(运算作业)的主启动类
Job scjob = Job.getInstance(conf);
scjob.setJarByClass(MusicJob.class);
// 3)设置job的map自定义静态类
scjob.setMapperClass(SoutMusicMapper.class);
scjob.setInputFormatClass(SequenceFileInputFormat.class);
// 5)配置最终输出(reduce)的输出键和值的类型
scjob.setOutputKeyClass(IntWritable.class);
scjob.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(scjob, outputPath);
Path outputPath1 = new Path("hdfs://node1:9000/output/music2");
// 删除已存在的输出文件夹(因为输出文件夹是唯一的,启动后自动创建)
if (FileSystem.get(conf).exists(outputPath1)) {
FileSystem.get(conf).delete(outputPath1, true);
}
FileOutputFormat.setOutputPath(scjob, outputPath1);
// 排序不是用默认排序 而是用我们自定义的排序类
scjob.setSortComparatorClass(MusicCountDesc.class);
// 配置分区类
scjob.setPartitionerClass(MyPartitioner.class);
// 设置reduce数量
scjob.setNumReduceTasks(2);
System.exit(scjob.waitForCompletion(true) ? 0 : 1);
}
}
}
里面加了一个小工具,因为我们在写的时候会随时测试,为了不让输出的文件随时改动,且要保持唯一性,即加了一个删除的小工具:
// 删除已存在的输出文件夹(因为输出文件夹是唯一的,启动后自动创建)
if (FileSystem.get(conf).exists(outputPath1)) {
FileSystem.get(conf).delete(outputPath1, true);
}
结果:
备注:完整代码是可以直接贴进去使用的,自己准备材料即可。