MapReduce的优势在于处理大规模数据集
# 2.1 气象数据集
本章给了一个从气象数据中计算每年最高气温的例子。原始数据来自ncdc,见下方ftp。数据的格式是每年一个文件,每个文件中有多条气象数据的记录。
ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2010/
ncdc的气象数据
书中的全套源码可以自行百度,下下来导进去就可以运行。
至此我们还不需要搭建hadoop集群就可以运行这个气象处理的代码。看网上大部分都是把代码丢到hadoop集群上去跑,作为初学者来说,一上来就搭建hadoop,不是很好,而且日常开发如果开发完就要放到hadoop上去跑好麻烦的。
其实Hadoop提供了三种运行模式(参见附录A):
1.独立(或本地)模式,无需任何守护进程,所有程序都在同一个JVM上执行,因此此模式适合开发阶段。
2.伪分布式,Hadoop 守护进程运行在本地机器上,模拟一个小规模集群。
3.全分布模式, 守护进程运行在一个集群上。
若是开发工具是IDEA,只需要下载Hadoop安装包,解压至本地磁盘,配置环境变量即可使用独立模式运行书中源码。
- 注意:
- Hadoop安装包不能放到有空格的目录。
- 每次运行代码前,输出目录必须是不存在的才能执行。(hadoop怕你覆盖上次的执行结果)
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
/**
*从天气文件的每一行数据中截取年份和温度存入context
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
/**
* 循环对比每一条数据,将最大的温度输出
*/
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public class MaxTemperature {
static {
try {
System.load("D:/hadoop-2.6.5/bin/hadoop.dll");
} catch (UnsatisfiedLinkError e) {
System.err.println("Native code library failed to load.\n" + e);
System.exit(1);
}
}
/**
* 执行计算任务
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
# 2.4 横向扩展(scaling out)
# 2.4.1 数据流
术语:MapReduce作业(job)是客户端需要执行的一个工作单元,包括:输入数据、MapReduce程序、配置信息。
Hadoop将作业(job)分成若干个任务(task)来执行,包括:map任务和reduce任务。这些任务运行在集群的接点上,并通过YARN来调度。如果一个任务失败,将在另一个不同的接点上重新运行。
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split),简称分片。为每个分片构建一个map任务,由该任务来运行用户的map函数。
分片越多,处理整个输入的时间就越短,如果分片太小,那么分片和构建map任务的时间将决定整个作业的时间。合理的分片趋向于HDFS一个块的大小,默认128M(可调整)。
Hadoop在存储输入数据的节点上运行map任务可以获得最佳的性能。
map任务将输出写入本地磁盘,而非HDFS,因为map的输出是中间结果,而非最终结果。若失败,Hadoop会在其他节点上重新运行该任务。reduce的输出结果,第一个副本会存在本地节点,其他副本出于可靠性考虑存在其他机架上的节点。
# 2.4.2 combiner函数
尽量减少Map和Reduce任务之间的数据传输可提高计算效率。combiner函数能减少map和reduce之间传输的数据量。
# 2.5 Hadoop Streaming
Hadoop Streaming 使用Unix标准流作为Hadoop和应用程序之间的接口,因此可以使用任何编程语言通过标注输入/输出流来写MapReduce程序。
- 没看懂这个小节