MapReduce的优势在于处理大规模数据集

# 2.1 气象数据集

本章给了一个从气象数据中计算每年最高气温的例子。原始数据来自ncdc,见下方ftp。数据的格式是每年一个文件,每个文件中有多条气象数据的记录。

ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2010/
ncdc的气象数据

书中的全套源码可以自行百度,下下来导进去就可以运行。
至此我们还不需要搭建hadoop集群就可以运行这个气象处理的代码。看网上大部分都是把代码丢到hadoop集群上去跑,作为初学者来说,一上来就搭建hadoop,不是很好,而且日常开发如果开发完就要放到hadoop上去跑好麻烦的。

其实Hadoop提供了三种运行模式(参见附录A):

1.独立(或本地)模式,无需任何守护进程,所有程序都在同一个JVM上执行,因此此模式适合开发阶段。
2.伪分布式,Hadoop 守护进程运行在本地机器上,模拟一个小规模集群。
3.全分布模式, 守护进程运行在一个集群上。

若是开发工具是IDEA,只需要下载Hadoop安装包,解压至本地磁盘,配置环境变量即可使用独立模式运行书中源码。

  • 注意:
  • Hadoop安装包不能放到有空格的目录。
  • 每次运行代码前,输出目录必须是不存在的才能执行。(hadoop怕你覆盖上次的执行结果)
public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  /**
   *从天气文件的每一行数据中截取年份和温度存入context
    * @param key
   * @param value
   * @param context
   * @throws IOException
   * @throws InterruptedException
     */
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    /**
     * 循环对比每一条数据,将最大的温度输出
     */
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
public class MaxTemperature {

  static {
    try {
      System.load("D:/hadoop-2.6.5/bin/hadoop.dll");
    } catch (UnsatisfiedLinkError e) {
      System.err.println("Native code library failed to load.\n" + e);
      System.exit(1);
    }
  }

  /**
   * 执行计算任务
   * @param args
   * @throws Exception
     */
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
# 2.4 横向扩展(scaling out)
# 2.4.1 数据流

术语:MapReduce作业(job)是客户端需要执行的一个工作单元,包括:输入数据、MapReduce程序、配置信息。
Hadoop将作业(job)分成若干个任务(task)来执行,包括:map任务和reduce任务。这些任务运行在集群的接点上,并通过YARN来调度。如果一个任务失败,将在另一个不同的接点上重新运行。

Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split),简称分片。为每个分片构建一个map任务,由该任务来运行用户的map函数。

分片越多,处理整个输入的时间就越短,如果分片太小,那么分片和构建map任务的时间将决定整个作业的时间。合理的分片趋向于HDFS一个块的大小,默认128M(可调整)。

Hadoop在存储输入数据的节点上运行map任务可以获得最佳的性能。

map任务将输出写入本地磁盘,而非HDFS,因为map的输出是中间结果,而非最终结果。若失败,Hadoop会在其他节点上重新运行该任务。reduce的输出结果,第一个副本会存在本地节点,其他副本出于可靠性考虑存在其他机架上的节点。

# 2.4.2 combiner函数

尽量减少Map和Reduce任务之间的数据传输可提高计算效率。combiner函数能减少map和reduce之间传输的数据量。

# 2.5 Hadoop Streaming

Hadoop Streaming 使用Unix标准流作为Hadoop和应用程序之间的接口,因此可以使用任何编程语言通过标注输入/输出流来写MapReduce程序。

  • 没看懂这个小节
03-11 19:23