A MapReduce example: finding the maximum temperature per year
package test2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Each record in the sample data contains a year and a temperature; the goal
 * is to find the maximum temperature recorded for each year. The input records:
 *
 * (0,   0067011990999991950051507+0000+),
 * (33,  0043011990999991950051512+0022+),
 * (66,  0043011990999991950051518-0011+),
 * (99,  0043012650999991949032412+0111+),
 * (132, 0043012650999991949032418+0078+),
 * (165, 0067011990999991937051507+0001+),
 * (198, 0043011990999991937051512-0002+),
 * (231, 0043011990999991945051518+0001+),
 * (264, 0043012650999991945032412+0002+),
 * (297, 0043012650999991945032418+0078+),
 */
public class mytest {

    static String INPUT_PATH = "input/t1_num.txt"; // path of the input file
    static String OUTPUT_PATH = "output/t1_num";   // directory where the results are written

    // Mapper: turns each raw record into a (year, temperature) pair
    static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // records in the input file are separated by "),"
            String[] arr = value.toString().split("\\),");
            for (int i = 0; i < arr.length; i++) { // parse every record on the line
                String line = arr[i];
                // the year is the 4-digit field starting 16 characters from the end
                String year = line.substring(line.length() - 16, line.length() - 12);
                // the temperature is the signed 5-character field just before the trailing '+'
                String airTemperature = line.substring(line.length() - 6, line.length() - 1);
                context.write(new Text(year), new IntWritable(Integer.parseInt(airTemperature)));
            }
        }
        /*
         * The map step parses each record into a (year, temperature)
         * key-value pair, so the map output is:
         * (1950, 0)
         * (1950, 22)
         * (1950, -11)
         * (1949, 111)
         * (1949, 78)
         * (1937, 1)
         * (1937, -2)
         * (1945, 1)
         * (1945, 2)
         * (1945, 78)
         */
    }

    // Reducer: selects the maximum temperature for each year
    static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // start below any valid reading so years with only negative temperatures are handled correctly
            int max = Integer.MIN_VALUE;
            for (IntWritable c : values) { // scan every temperature for this year
                if (c.get() > max) {
                    max = c.get();
                }
            }
            context.write(key, new IntWritable(max)); // emit (year, max temperature)
        }
        /*
         * The shuffle groups the map output by key, so each year's temperatures
         * arrive at reduce as one list:
         * (1950, [0, 22, -11])
         * (1949, [111, 78])
         * (1937, [1, -2])
         * (1945, [1, 2, 78])
         * Reduce then picks the maximum of each list and emits (year, max) pairs:
         * (1950, 22)
         * (1949, 111)
         * (1937, 1)
         * (1945, 78)
         */
    }

    public static void main(String[] args) throws Exception {
        // Required on Windows so that Hadoop can locate winutils.exe
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6");
        Path outputpath = new Path(OUTPUT_PATH); // output directory
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);         // create the job for this task
        job.setJarByClass(mytest.class);         // needed when the job runs on a cluster
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1); // exit 0 on success
    }
}
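The substring arithmetic in map() is easy to get wrong, so it can be sanity-checked outside Hadoop with a plain Java main run against one sample record (ParseCheck is a hypothetical helper class, not part of the job):

public class ParseCheck {
    public static void main(String[] args) {
        // one record from the sample data, as it looks after split("\\),")
        String line = "(0, 0067011990999991950051507+0000+";
        String year = line.substring(line.length() - 16, line.length() - 12);
        String temp = line.substring(line.length() - 6, line.length() - 1);
        System.out.println(year + " -> " + Integer.parseInt(temp)); // prints: 1950 -> 0
    }
}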
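Because taking a maximum is associative and commutative, MyReduce can also be registered as a combiner: each mapper then pre-aggregates its local per-year maximum, so less data crosses the network during the shuffle. This is one optional line in main(), before job submission:

        job.setCombinerClass(MyReduce.class); // pre-aggregate map output per year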
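One practical caveat: Hadoop refuses to start a job whose output directory already exists, so a second run of this program fails until output/t1_num is deleted. A minimal pre-submission cleanup sketch, assuming the job uses the file system configured in conf:

        import org.apache.hadoop.fs.FileSystem;

        // in main(), before FileOutputFormat.setOutputPath(job, outputpath):
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputpath)) {
            fs.delete(outputpath, true); // true = delete recursively
        }

Once packaged into a jar, the job can also be submitted from the command line with hadoop jar <your-jar> test2.mytest (the jar name depends on your build).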