1. Overall Workflow of MR Example Development
The simplest MapReduce application contains at least three parts: a Map function, a Reduce function, and a main function. When a MapReduce computation runs, the work is divided into two phases, a map phase and a reduce phase, and each phase takes key/value pairs as its input and output. The main function ties together job control and file input/output.
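Hadoop's Java API expresses this contract directly in the generic type parameters of its Mapper and Reducer base classes. As a sketch of the key/value flow, in standard MapReduce notation rather than code from this example:

map:    (K1, V1)       -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

In the wordcount example below, K1/V1 are a line's byte offset and its text, while K2/V2 and K3/V3 are both a word and a count.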
2. Environment Preparation
Refer to the earlier sections on setting up the Hadoop cluster, deploying the Hadoop package on Windows, and HDFS API development.
Prepare the Eclipse Java SE development environment and the hadoop-eclipse-plugin.
Prepare the Hadoop jar packages:
Locate the root directory of the Hadoop deployment on Windows (see the Hadoop development package download); the required jars are:
hadoop-2.6.5\share\hadoop\hdfs\hadoop-hdfs-2.6.5.jar
hadoop-2.6.5\share\hadoop\hdfs\lib\ (all jars)
hadoop-2.6.5\share\hadoop\common\hadoop-common-2.6.5.jar
hadoop-2.6.5\share\hadoop\common\lib\ (all jars)
hadoop-2.6.5\share\hadoop\mapreduce\ (all jars except hadoop-mapreduce-examples-2.6.5.jar)
hadoop-2.6.5\share\hadoop\mapreduce\lib\ (all jars)
Create a user library in Eclipse and import the jars above. (The Hadoop source package can also be downloaded to study the framework's internal implementation.)
Start the ZooKeeper cluster, the Hadoop HA cluster, and the MapReduce (YARN) cluster.
3. Example Development
3.1 Example Description
Count the total number of occurrences of each word across the entire data set (wordcount).
3.2 Data Flow
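For the sample line uploaded in the next section, the data moves through the job roughly as follows: the mapper emits a <word, 1> pair for every word on the line, the shuffle phase groups the pairs by word, and the reducer sums each group (only a few words shown):

split:   <0, "MapReduce is a programming model and an associated ...">
map:     <MapReduce,1> <is,1> <a,1> <programming,1> <model,1> <and,1> ...
shuffle: <MapReduce,(1)> <a,(1,1,1)> <and,(1,1)> <is,(1)> ...
reduce:  <MapReduce,1> <a,3> <and,2> <is,1> ...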
3.3 Upload the Data File word.txt
MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster
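Assuming word.txt contains the line above, the standard HDFS shell can place it at the input path read by the job client in 3.4.3:

hdfs dfs -mkdir -p /wc/input
hdfs dfs -put word.txt /wc/input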
3.4 Writing the Code
3.4.1 Mapper Implementation
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * KEYIN:    data type of the key in the input kv pair
 * VALUEIN:  data type of the value in the input kv pair
 * KEYOUT:   data type of the key in the output kv pair
 * VALUEOUT: data type of the value in the output kv pair
 */
public class Mapwc extends Mapper<LongWritable, Text, Text, IntWritable> {
    /*
     * The map method is called by the map task process; the map task calls
     * our map method once for each line of text it reads.
     * Parameters passed on each call:
     *   key:   the byte offset of the line's start, as a LongWritable
     *   value: the text content of the line, as a Text
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer words = new StringTokenizer(line);
        while (words.hasMoreTokens()) {
            // emit <word, 1> for every token on the line
            context.write(new Text(words.nextToken()), new IntWritable(1));
        }
    }
}
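One optional refinement not in the listing above: because map runs once per input line, allocating a new Text and IntWritable on every write creates avoidable garbage. A common Hadoop idiom reuses Writable instances across calls (the names word and ONE below are illustrative); this is safe because the framework serializes the pair during context.write:

    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer words = new StringTokenizer(value.toString());
        while (words.hasMoreTokens()) {
            word.set(words.nextToken()); // reuse the same Text instance for each token
            context.write(word, ONE);    // the pair is serialized immediately, so reuse is safe
        }
    }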
3.4.2 Reducer Implementation
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducewc extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
     * The reduce method is called by the reduce task process.
     *
     * The reduce task aggregates the kv pairs delivered by the shuffle
     * phase, grouping pairs that share the same key, and then calls our
     * reduce method once per group.
     * For example, given <hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>,
     * reduce is called once for the hello group and once for the tom group.
     * Parameters passed on each call:
     *   key:    the key shared by the group
     *   values: an iterator over all values in the group
     */
    @Override
    protected void reduce(Text words, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable s : values) {
            sum += s.get();
        }
        context.write(words, new IntWritable(sum));
    }
}
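Because this reducer does nothing but sum values, an operation that is associative and commutative, the same class could also be registered as a combiner to pre-aggregate map output locally and cut shuffle traffic. This is an optional optimization, not part of the job setup below; it would be one extra line in the driver:

    job.setCombinerClass(Reducewc.class); // safe only because summation is associative and commutative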
3.4.3 Job Submission Client Implementation
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Jobwc {
    public static void main(String[] args) throws IOException {
        // 1. Configuration
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        // yarn.resourcemanager.hostname expects a hostname only;
        // 8088 is the ResourceManager web UI port, not part of this value
        configuration.set("yarn.resourcemanager.hostname", "node02");
        // 2. Job setup
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Jobwc.class);
        job.setJobName("wc");
        job.setMapperClass(Mapwc.class);
        job.setReducerClass(Reducewc.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // declare the final (reducer) output types as well
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3. Input: read the data file from HDFS
        FileInputFormat.addInputPaths(job, "/wc/input/word.txt");
        // 4. Output: write the result to the specified path,
        //    deleting it first if it already exists (the job fails otherwise)
        Path path = new Path("/wc/output");
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        // 5. Submit and wait for completion
        try {
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job success!");
            } else {
                System.out.println("job failed!");
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
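To submit the job to the cluster rather than run it from the IDE, the usual route is to export the three classes as a jar and hand it to the hadoop launcher; the jar name wc.jar below is illustrative, and the main class is the Jobwc driver above:

hadoop jar wc.jar Jobwc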
3.4.4 Run Results
Contents of the output file under /wc/output:
MapReduce 1
a 3
algorithm 1
an 1
and 2
associated 1
big 1
cluster 1
data 1
distributed 1
for 1
generating 1
implementation 1
is 1
model 1
on 1
parallel, 1
processing 1
programming 1
sets 1
with 1