我正在阅读有关mapreduce的信息,并且想知道一个特定的场景。假设我们有几个文件(例如,fileA,fileB,fileC),每个文件都由多个整数组成。如果我们想对所有文件中的数字进行排序以创建如下所示的内容:

23 fileA
34 fileB
35 fileA
60 fileA
60 fileC

map 和简化流程将如何工作?

目前,这是我所拥有的,但还不完全正确;
  • (fileName, fileContent) -> (map to) (Number, fileName)
  • 对临时键,值对进行排序并获取(Number, (list of){fileName1, fileName2...})
  • 减少临时对并获得
    (Number, fileName1)
    (Number, fileName2)
    

    依此类推

  • 问题在于,在排序阶段,文件名可能不是按字母顺序排列的,因此reduce部分将不会生成正确的输出。有人可以针对这种情况的正确方法提供一些见解吗?

    最佳答案

    实现此目的的最佳方法是通过辅助排序。您需要同时对键(在您的案例编号中)和值(在您的案例文件名中)进行排序。在Hadoop中,映射器输出仅按键排序。

    这可以通过使用复合键来实现:复合键是数字和文件名的组合。例如对于第一个记录,键将是(23,fileA),而不是(23)。

    您可以在此处阅读有关二级排序的信息:https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch01.html

    您还可以阅读“ Hadoop权威指南”一书中的“ Secondary Sort ”部分。

    为了简单起见,我编写了一个程序来实现相同的目的。

    在此程序中,默认情况下,映射器对键进行排序。我已经编写了一种逻辑来对化简器端的值进行排序。因此,它需要同时对键和值进行排序,并产生所需的输出。

    以下是程序:

    package com.myorg.hadooptests;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.*;
    
    public class SortedValue {
    
    
        public static class SortedValueMapper
                extends Mapper<LongWritable, Text , Text, IntWritable>{
    
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String[] tokens = value.toString().split(" ");
    
                if(tokens.length == 2) {
                    context.write(new Text(tokens[1]), new IntWritable(Integer.parseInt(tokens[0])));
                }
            }
        }
    
        public static class SortedValueReducer
                extends Reducer<Text, IntWritable, IntWritable, Text> {
    
            Map<String, ArrayList<Integer>> valueMap = new HashMap<String, ArrayList<Integer>>();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context) throws IOException, InterruptedException {
    
                String keyStr = key.toString();
                ArrayList<Integer> storedValues = valueMap.get(keyStr);
    
                for (IntWritable value : values) {
                    if (storedValues == null) {
                        storedValues = new ArrayList<Integer>();
                        valueMap.put(keyStr, storedValues);
                    }
                    storedValues.add(value.get());
                }
    
                Collections.sort(storedValues);
                for (Integer val : storedValues) {
                    context.write(new IntWritable(val), key);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf, "CompositeKeyExample");
            job.setJarByClass(SortedValue.class);
            job.setMapperClass(SortedValueMapper.class);
            job.setReducerClass(SortedValueReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
    
            FileInputFormat.addInputPath(job, new Path("/in/in1.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/out/"));
    
            System.exit(job.waitForCompletion(true) ? 0:1);
    
        }
    }
    

    映射器逻辑:
  • 解析每一行。假定键和值由空白字符(“”)分隔。
  • 如果该行包含2个标记,则发出(文件名,整数值)。例如对于第一个记录,它发出(fileA,23)。

  • reducer 逻辑:
  • 它将(key,value)对放入HashMap中,其中key是文件名,value是该文件的整数列表。例如对于fileA,存储的值为23、34和35
  • 最后,它对特定键的值进行排序,并为每个值从reducer发出(值,键)。例如对于fileA,记录输出为:(23,fileA),(34,fileA)和(35,fileA)

  • 我为以下输入运行了该程序:
    34 fileB
    35 fileA
    60 fileC
    60 fileA
    23 fileA
    

    我得到以下输出:
    23      fileA
    35      fileA
    60      fileA
    34      fileB
    60      fileC
    

    关于hadoop - MapReduce示例,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34259487/

    10-16 01:37