目录

1. 小项目                                                              2. Map开发

3. Reduce 开发                                                    4. 反转键值操作Map开发

5. 排序                                                                  6. SequenceFileOutputFormat 和 SequenceFileInputFormat

7. 配置Combiner(合并)优化                                  8. Partitioner(分区)优化

9. 分区的job和task拓展                                        10. 完整代码


       继上一篇:https://blog.csdn.net/gaofengyan/article/details/86300388 对mapreduce运算框架初步学习之后,这里对MapReduce框架做一个优化案例,以音乐排行榜为例。案例涉及知识:Top K排序;Combiner(合并)优化处理;Partitioner(分区)优化等。具体用一下案例做引导:

1. 小项目

   题材:音乐排行榜播放记录 数据格式:

    统计信息:

        1)       歌名      播放次数(这是第一次运算后得到的文件内容结果
           如:  music1    2
                    music2    3
                    music3    1

        2)按照播放次数降序来显示步骤1的统计信息(这是最终要达到的没有分区的目的效果
           如:  3    music2
                    2    music1
                    1    music3

   准备以上题材,在桌面创建music.txt文件,并编辑内容:

[hduser@node1 桌面]$ vi music.txt
10001 music1 20180101 sing1
10002 music2 20180301 sing2
10003 music3 20180301 sing1
10004 music2 20180709 sing2
10005 music2 20180621 sing2
10006 music1 20180511 sing1

    保存后退出,将music.txt文件copy到分布式文件夹存储(我的路径是在 【/input/下】),发布后检查我这里就不写了:

[hduser@node1 ~]$ ./hadoop/bin/hadoop  dfs -put ./桌面/music.txt /input/

    Hadoop综合实战之MapReduce运算优化——音乐排行榜-LMLPHP

     跟之前一样准备工作(使用虚拟机中安装的eclipse作业):
         1)新建一个mapreduce项目:musicdemo【new】->【project】->【Map/Reduce Project】->【project name:musicdemo】->【finish】

         2)新建资源文件夹:【右键项目名】->【new】->【source folder】->【folder name:resource】->【finish】
         3)增加配置文件  core-site.xml  , log4j.properties (两个配置文件在hadoop安装目录下的conf文件都能找到,可直接使用)
         4)新建一个class 【MusicJob】(开发map、开发reduce、创建job并执行)

2. Map开发

        这里需要截取我们需要操作的有效数据字段,生成我们需要的单独的文件输入进行计算。即下面的将每一行需要切片的数据按照空格进行拆分成数组【方法:因为Mapper切片操作本身就是对文本的每一行进行切片,所以,相当于一个循环操作;那这里先将每一行的数据按照文本格式的空格进行拆分成数组,需要先转化成字符串数组;然后进行筛选(筛选的方法有很多种,面对复杂的文本进行有效数据的拆分就会有对应的拆分格式);因为我们这里给的文本格式很整齐,并且去的是数组第2号元素;再将Java字符串转化成hadoop格式的字符串类型(Text);然后作为输出值即可】:

/**
* 开发map
*
* @author hduser 第1个参数:map 输入键的类型 第2个参数:map 输入值的类型 第3个参数:map 输出键的类型
*  第4个参数:map 输出值的类型
*/
public static class MusicMapper extends Mapper<Object, Text, Text, IntWritable> {
	/**
	* 每一个切片 会执行一次map方法, keyIn是每一行的键, valueIn 是每一行的值, context
	* 是上下文容器,用于将map的结果输出到下一步 wordcount map 把value拆成单个单词
	*
	* @throws InterruptedException
	* @throws IOException
	*/
    public void map(Object keyIn, Text valueIn, Context ctx) throws IOException, InterruptedException {
			// 固定值1 作为输出值
			IntWritable valueOut = new IntWritable(1);
			Text keyOut = null;

			// 将每一行需要切片的数据按照空格进行拆分成数组
			String[] str = valueIn.toString().split("\\s+");
			if (str.length >= 3) {//这一步是做判断筛选,即长度大于等于3的行才是下面要进行切片操作的
				ctx.write(new Text(str[1]), valueOut);
			}
		}

	}

3. Reduce 开发

	// 开发reduce
	public static class MusicReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text keyIn, Iterable<IntWritable> valuesIn, Context ctx)
				throws IOException, InterruptedException {
			Text keyOut = keyIn;
			// 输出值
			IntWritable valueOut = new IntWritable();
			int sum = 0;
			// 循环混洗后的数字数组,如[1,1,1,1,1]
			for (IntWritable val : valuesIn) {
				sum += val.get(); // 转成int型 , 做累加
			}
			valueOut.set(sum); // 转成字符串输出去,将累加的结果转化为IntWritable类型
			ctx.write(keyOut, valueOut); // 输出到下一步
		}

	}

4. 反转键值操作Map开发

       在第一次运算的时候map的键值(key/value)组合是字符串(Text)/数字(IntWritable)方式,而mapreduce的排序指的是主要用于 mapreduce 中key的排序;所以,我们是利用第一次运算得到的需求文件再进行一次排序操作运算,第一次操作后得到的文件内容格式如下:

        1)       歌名      播放次数(这是第一次运算后得到的文件内容结果
                    music1    2
                    music2    3
                    music3    1

        为了方便后面混洗和reduce的计算,我们将键值对进行反转操作:

// 反转键值
public static class SoutMusicMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
	public void map(Text keyIn, IntWritable valueIn, Context ctx) throws InterruptedException, IOException {
			ctx.write(valueIn, keyIn);
	}
}

5. 排序

5.1 概念
         mapreduce的排序指的是主要用于 mapreduce 中key的排序,默认按照key升序排序(从小到大)。排序主要用于最终输出之前。
5.2 实现排序
5.2.1 定义一个排序类
      1)定义一个静态排序类,继承需要的排序父类(我们样例使用的是数字排序父类

	  // 排序(降序)
	public static class MusicCountDesc extends IntWritable.Comparator 

      2)重写排序父类中的核心方法

		// 重写两个排序方法
		public int compara(Object a, Object b) {
			return -super.compare(a, b); // 比较结果取负数即可降序排序
		}

		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return -super.compare(b1, s1, l1, b2, s2, l2);
		}

5.2.2 配置排序类到 scjob 【注意,这里的配置排序类到job是第二次的,所以是新创建的 scjob】

// 排序不是用默认排序 而是用我们自定义的排序类
scjob.setSortComparatorClass(MusicCountDesc.class);

6. SequenceFileOutputFormat 和 SequenceFileInputFormat

6.1 SequenceFileOutputFormat
主要是用于 mapreduce 最终输出阶段设置输出的格式类型为SequenceFile类型(字节对象型),类似于ObjectOutputStream(对象输出流)
     样例:

// 8)设置输出内容是sequence 类似与对象输出流
job.setOutputFormatClass(SequenceFileOutputFormat.class);

6.2 SequenceFileInputFormat
     主要用于 mapreduce 录入的数据是 SequenceFile 类型,该类型必须设置job的inputFormat为SequenceFileInputFormat。
     样例:

// 3)设置job的map自定义静态类
scjob.setMapperClass(SoutMusicMapper.class);
scjob.setInputFormatClass(SequenceFileInputFormat.class);

7. 配置Combiner(合并)优化

       在第一次运算的map之后,混洗之前的工作,表面上是看不出来有什么效果,但是可以从运算时间上去探索他的奥秘,合并的目的本身就是为了节约网络运算成本,提高效率。
7.1 概念

       map处理后的数据(因为文件是被切割成64MB的块级进行运算,每一个块级里面有可能会存在多个相同的切割数据,而map输出的键值格式是单一的字符串和数字的方式,而下一步shuffle混洗的输入是运算上一个map输出的多个块级的相同字符串进行值为【1】的固定输入格式。)直接混洗,网络成本会太大,如map有1亿条数据,那么map就需要将处理过的这1亿条数据都传输给混洗节点。优化:如果将map处理后的数据(每一块级里面的相同字符串数据)先做以下合并,如:

     没有合并的操作计算方式:  Hadoop综合实战之MapReduce运算优化——音乐排行榜-LMLPHP      增加合并配置的计算方式:  Hadoop综合实战之MapReduce运算优化——音乐排行榜-LMLPHP

     合并后混洗需要运算的总数据量(所有块级里的数据总量)就会少了很多,再进行shuffle混洗操作,就会节省很多网络开支。

7.2 语法  注意:【合并一般配置都是reduce处理类

// 配置combiner合并
job.setCombinerClass(MusicReducer.class);//一般配置都是reduce处理类

8. Partitioner(分区)优化

8.1 概念
     mapreduce job 混洗的数据交给 reduce task(任务),而默认只有一个分区,所以只有一个reduce task(任务),一个reduce task(任务)产生一个输出文件。

8.2 分区目的

    简单地说:分区的目的是让多个分机进行运算,提高运算效率。作用域也是在最终的输出阶段才有明显效果。

8.3 语法
     1)定义一个静态分区类;混洗后的数据交给reduce运算是用的第一个参数值“key”【IntWritable key】,因此用第一个参数值进行分区操作:

	// 定义一个分区类
	public static class MyPartitioner extends Partitioner<IntWritable, Text> {

		@Override
		public int getPartition(IntWritable key, Text arg1, int arg2) {
			if (key.get() >= 2) {// 分区2个
				return 0;
			} else {
				return 1;
			}
		}
	}

     2)将自定义的静态分区类配置给job

// 配置分区类
scjob.setPartitionerClass(MyPartitioner.class);

     3)设置job的reduce task 数量(即有几个分区)。

// 设置reduce数量
scjob.setNumReduceTasks(2);

9. 分区的job和task拓展

       默认情况下:一个job(作业)-->对应多个map task(任务) --> shuffle(混洗)-->对应一个reduce-->每一个reduce对应一个输出文件;分区操作之后的结果是,分了几个区,就会有几台机子做运算(前提是有足够多的分机),就会得到几个运算输出文件。

10. 完整代码

      当然以上过程到这里就算结束了,这里还是可以继续进行优化操作的,根据操作的数据量大小选择合适的方式即可。

package org.kgc1803.musicdemo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * 音乐排行榜统计小实战
 *
 * @author hduser
 *
 */
public class MusicJob {
	/**
	 * 开发map
	 *
	 * @author hduser 第1个参数:map 输入键的类型 第2个参数:map 输入值的类型 第3个参数:map 输出键的类型
	 *  第4个参数:map 输出值的类型
	 */
	public static class MusicMapper extends Mapper<Object, Text, Text, IntWritable> {
		/**
		 * 每一个切片 会执行一次map方法, keyIn是每一行的键, valueIn 是每一行的值, context
		 * 是上下文容器,用于将map的结果输出到下一步 wordcount map 把value拆成单个单词
		 *
		 * @throws InterruptedException
		 * @throws IOException
		 */
		public void map(Object keyIn, Text valueIn, Context ctx) throws IOException, InterruptedException {
			// 固定值1 作为输出值
			IntWritable valueOut = new IntWritable(1);
			Text keyOut = null;
			// this is hadoop application.
			// StringTokenizer token = new StringTokenizer(valueIn.toString());
			// 按照迭代器用法使用
			// while (token.hasMoreTokens()) {
			// String key = token.nextToken();
			// keyOut = new Text(key);
			// ctx.write(keyOut, valueOut);
			// }

			// 将每一行需要切片的数据按照空格进行拆分成数组
			String[] str = valueIn.toString().split("\\s+");
			if (str.length >= 3) {//这一步是做判断筛选,即长度大于等于3的行才是下面要进行切片操作的
				ctx.write(new Text(str[1]), valueOut);
			}
		}

	}

	// 开发reduce
	public static class MusicReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text keyIn, Iterable<IntWritable> valuesIn, Context ctx)
				throws IOException, InterruptedException {
			Text keyOut = keyIn;
			// 输出值
			IntWritable valueOut = new IntWritable();
			int sum = 0;
			// 循环混洗后的数字数组,如[1,1,1,1,1]
			for (IntWritable val : valuesIn) {
				sum += val.get(); // 转成int型 , 做累加
			}
			valueOut.set(sum); // 转成字符串输出去,将累加的结果转化为IntWritable类型
			ctx.write(keyOut, valueOut); // 输出到下一步
		}

	}

	// 反转键值
	public static class SoutMusicMapper extends Mapper<Text, IntWritable, IntWritable, Text> {
		public void map(Text keyIn, IntWritable valueIn, Context ctx) throws InterruptedException, IOException {
			ctx.write(valueIn, keyIn);
		}
	}

	/**
	 * 排序(降序)
	 * 这里用的是数字排序IntWritable.Comparator,
	 * 在排序过程中进行两两比较时会去调用此类的compara()方法,返回值-1,0,1分别表示小于,等于,大于。
	 * 所以只需要重写compara()方法,并将返回值取反就可以实现倒序比较器。
	 *
	 * @author hduser
	 *
	 */
	public static class MusicCountDesc extends IntWritable.Comparator {
		// 重写两个排序方法
		public int compara(Object a, Object b) {
			return -super.compare(a, b); // 比较结果取负数即可降序排序
		}

		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return -super.compare(b1, s1, l1, b2, s2, l2);
		}
	}

	// 定义一个分区类
	public static class MyPartitioner extends Partitioner<IntWritable, Text> {

		@Override
		public int getPartition(IntWritable key, Text arg1, int arg2) {
			if (key.get() >= 2) {// 分区2个
				return 0;
			} else {
				return 1;
			}
		}
	}

	public static void main(String[] args) throws Exception {
		// 创建job 执行job
		// 1)加载hdfs配置文件(配置hdfs访问入口)
		Configuration conf = new Configuration();

		// 2)创建一个job并确定设置job(运算作业)的主启动类。
		Job job = Job.getInstance(conf);
		job.setJarByClass(MusicJob.class);

		// 3)设置job的map自定义静态类
		job.setMapperClass(MusicMapper.class);

		// 4)设置job的reduce自定义静态类
		job.setReducerClass(MusicReducer.class);

		// 5) 配置最终输出(reduce)的输出键和值的类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// 配置combiner合并
		job.setCombinerClass(MusicReducer.class);

		// 6)mapreduce 作业需要的资源位置(总输入位置)
		Path inputPath1 = new Path("hdfs://node1:9000/input/music.txt");
		FileInputFormat.addInputPath(job, inputPath1);

		// 7)mapreduce 作业结果的保存位置(总输出位置)
		Path outputPath = new Path("hdfs://node1:9000/output/music");
		// 删除已存在的输出文件夹(因为输出文件夹是唯一的,启动后自动创建)
		if (FileSystem.get(conf).exists(outputPath)) {
			FileSystem.get(conf).delete(outputPath, true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);

		// 8)设置输出内容是sequence 类似与对象输出流
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		// 9) 第二次排序运算启动
		if (job.waitForCompletion(true)) {
			// 2)创建一个job并设置job(运算作业)的主启动类
			Job scjob = Job.getInstance(conf);
			scjob.setJarByClass(MusicJob.class);

			// 3)设置job的map自定义静态类
			scjob.setMapperClass(SoutMusicMapper.class);
			scjob.setInputFormatClass(SequenceFileInputFormat.class);

			// 5)配置最终输出(reduce)的输出键和值的类型
			scjob.setOutputKeyClass(IntWritable.class);
			scjob.setOutputValueClass(Text.class);
			FileInputFormat.addInputPath(scjob, outputPath);
			Path outputPath1 = new Path("hdfs://node1:9000/output/music2");
			// 删除已存在的输出文件夹(因为输出文件夹是唯一的,启动后自动创建)
			if (FileSystem.get(conf).exists(outputPath1)) {
				FileSystem.get(conf).delete(outputPath1, true);
			}
			FileOutputFormat.setOutputPath(scjob, outputPath1);

			// 排序不是用默认排序 而是用我们自定义的排序类
			scjob.setSortComparatorClass(MusicCountDesc.class);

			// 配置分区类
			scjob.setPartitionerClass(MyPartitioner.class);

			// 设置reduce数量
			scjob.setNumReduceTasks(2);

			System.exit(scjob.waitForCompletion(true) ? 0 : 1);
		}

	}

}

       里面加了一个小工具,因为我们在写的时候会随时测试,为了不让输出的文件随时改动,且要保持唯一性,即加了一个删除的小工具:

// 删除已存在的输出文件夹(因为输出文件夹是唯一的,启动后自动创建)
if (FileSystem.get(conf).exists(outputPath1)) {
	FileSystem.get(conf).delete(outputPath1, true);
}

    结果:

Hadoop综合实战之MapReduce运算优化——音乐排行榜-LMLPHP

    备注:完整代码是可以直接贴进去使用的,自己准备材料即可。

01-18 03:40