我正在做一个大型hadoop项目,并且有一个小的KPI,在这里我只需要编写reduces输出中的前10个值。
为了满足此要求,我使用了一个计数器,并在计数器等于11时中断了循环,但reducer仍将所有值写入HDFS。

这是一个非常简单的Java代码,但我被卡住了:(

为了进行测试,我创建了一个独立的类(java应用程序)来执行此操作,并且该类已经在这里工作。我想知道为什么它不能在 reducer 代码中工作。

请有人帮我,并建议我是否想念一些东西。

map -减少代码

package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);
        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);


        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

            public void  map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {

                String tokens[]= value.toString().split("\\t");

            //    int empId = Integer.parseInt(tokens[0])    ;
                int count = Integer.parseInt(tokens[2])    ;

                context.write(new IntWritable(count), new Text(value));

            }

        }


    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
        int cnt=0;
        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {


            for (Text value : list ) {
                cnt ++;

                if (cnt==11)
                {
                    break;
                }

                context.write(new IntWritable(cnt), value);




            }

        }
}
}

简单的Java代码工作精细
package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;


    public static void main(String args[]) throws IOException, InterruptedException {

        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};


        reduce(list);


    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {


        int cnt=0;
        for (String value : list ) {
            cnt ++;

            if (cnt==3)
            {
                break;
            }

            System.out.println(value);


        }

    }
}

样本数据- header 仅是更多信息,实际数据来自第二行

` ID NAME COUNT(需要显示前10个desc)

1玩具总动员(1995)2077

10 GoldenEye(1995)888

100市政厅(1996)128

1000 curl (1996)20

1001 Associate,The(L'Associe)(1982)0

1002埃德的下一步行动(1996)8

1003极端措施(1996)121

1004微光人(1996)101

1005 D3:猛鸭(1996)142

1006室(1996)78

1007苹果饺子帮(1975)232

1008狂野边疆之王戴维·克罗基特(1955)97

1009逃到巫婆山(1975)291

101瓶火箭(1996)253

1010爱情虫(1969)242

1011赫比再骑(1974)135

1012老耶勒(1957)301

1013 parent 陷阱(1961)258

1014波莉安娜(1960)136

1015 Homeward Bound:The Incredible Journey(1993)234

1016毛茸茸的狗(1959)156

1017瑞士家庭鲁宾逊(1960)276

第1018章那该死的猫! (1965)123

1019 20,000海底联盟(1954)575

102 Mr.Wrong(1996)60

1020酷跑(1993)392

1021外野天使(1994)247

1022灰姑娘(1950)577

1023小熊维尼与模糊日(1968)221

1024三卡巴耶罗斯(1945)126

1025剑在石头上(1963)293

1026 So Dear to My Heart(1949)第8章

1027罗宾汉:盗贼王子(1991)344

1028玛丽·波平斯(Mary Poppins)(1964)1011

1029小飞象(1941)568

103难忘(1996)33

1030皮特的龙(1977)323

1031 Bedknobs and Broomsticks(1971)319`

最佳答案

如果将int cnt=0;移动到reduce方法中(作为该方法的第一条语句),您将获得每个键的前10个值(我想这就是您想要的)。

否则,如现在一样,您的计数器将继续增加,并且您将仅跳过第11个值(与键无关),并继续第12个值。

如果只想打印10个值(与键无关),则将cnt初始化保留在原处,并将if条件更改为if (cnt > 10) ...
但是,这不是一个好习惯,因此您可能需要重新考虑算法。 (假设您不希望有10个随机值,当您拥有1个以上的reducer和一个hash分区器时,如何知道在分布式环境中首先要处理哪个键?)

关于java - 计数器在 reducer 代码中不起作用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46087100/

10-16 03:01