Counter is not working in reducer code

Question

I am working on a large Hadoop project that has a small KPI for which I need to write only the top 10 values to the reducer's output. To meet this requirement, I use a counter and break out of the loop when the counter reaches 11, but the reducer still writes all of the values to HDFS.

This is pretty simple Java code, but I am stuck :(

For testing, I created a standalone class (a plain Java application) that does the same thing, and it works there. I am wondering why it does not work in the reducer code.

Could someone please help me and point out what I am missing?

package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);
        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);


        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] tokens = value.toString().split("\\t");

            // int empId = Integer.parseInt(tokens[0]);
            int count = Integer.parseInt(tokens[2]);

            context.write(new IntWritable(count), new Text(value));
        }
    }

    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
        int cnt = 0;

        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {

            for (Text value : list) {
                cnt++;

                if (cnt == 11) {
                    break;
                }

                context.write(new IntWritable(cnt), value);
            }
        }
    }
}

The simple Java code that works as expected:

package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;


    public static void main(String args[]) throws IOException, InterruptedException {

        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};


        reduce(list);


    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {


        int cnt=0;
        for (String value : list ) {
            cnt ++;

            if (cnt==3)
            {
                break;
            }

            System.out.println(value);


        }

    }
}

Sample data (the header is for information only; the actual data starts from the second line):

ID NAME COUNT (the top 10 counts need to be displayed)
1 Toy Story (1995) 2077
10 GoldenEye (1995) 888
100 City Hall (1996) 128
1000 Curdled (1996) 20
1001 Associate, The (L'Associe)(1982) 0
1002 Ed's Next Move (1996) 8
1003 Extreme Measures (1996) 121
1004 Glimmer Man, The (1996) 101
1005 D3: The Mighty Ducks (1996) 142
1006 Chamber, The (1996) 78
1007 Apple Dumpling Gang, The (1975) 232
1008 Davy Crockett, King of the Wild Frontier (1955) 97
1009 Escape to Witch Mountain (1975) 291
101 Bottle Rocket (1996) 253
1010 Love Bug, The (1969) 242
1011 Herbie Rides Again (1974) 135
1012 Old Yeller (1957) 301
1013 Parent Trap, The (1961) 258
1014 Pollyanna (1960) 136
1015 Homeward Bound: The Incredible Journey (1993) 234
1016 Shaggy Dog, The (1959) 156
1017 Swiss Family Robinson (1960) 276
1018 That Darn Cat! (1965) 123
1019 20,000 Leagues Under the Sea (1954) 575
102 Mr. Wrong (1996) 60
1020 Cool Runnings (1993) 392
1021 Angels in the Outfield (1994) 247
1022 Cinderella (1950) 577
1023 Winnie the Pooh and the Blustery Day (1968) 221
1024 Three Caballeros, The (1945) 126
1025 Sword in the Stone, The (1963) 293
1026 So Dear to My Heart (1949) 8
1027 Robin Hood: Prince of Thieves (1991) 344
1028 Mary Poppins (1964) 1011
1029 Dumbo (1941) 568
103 Unforgettable (1996) 33
1030 Pete's Dragon (1977) 323
1031 Bedknobs and Broomsticks (1971) 319

Recommended answer

If you move int cnt=0; inside the reduce method (as its first statement), you will get the first 10 values for each key (I guess this is what you want).

Otherwise, as the code stands now, the counter keeps increasing across calls to reduce. Only the 11th value is skipped (regardless of key, together with any values remaining for that same key, since break exits only that key's loop), and writing resumes with the 12th.
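The effect of the two counter placements can be reproduced without a cluster. The following is a minimal plain-Java sketch, not Hadoop code: each inner list stands in for the values of one key, and each pass of the outer loop stands in for one call to reduce. The class and method names (CounterScopeDemo, sharedCounter, perKeyCounter, singleValueGroups) are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (assumption: one outer-loop pass == one reduce() call).
class CounterScopeDemo {

    // Counter shared across all simulated reduce() calls,
    // like the instance field in the question.
    static List<String> sharedCounter(List<List<String>> groups) {
        List<String> written = new ArrayList<>();
        int cnt = 0;
        for (List<String> values : groups) {
            for (String value : values) {
                cnt++;
                if (cnt == 11) {
                    break; // exits only this key's loop; later keys still run
                }
                written.add(value);
            }
        }
        return written;
    }

    // Counter reset for every key, like a local variable inside reduce().
    static List<String> perKeyCounter(List<List<String>> groups) {
        List<String> written = new ArrayList<>();
        for (List<String> values : groups) {
            int cnt = 0;
            for (String value : values) {
                cnt++;
                if (cnt == 11) {
                    break; // at most 10 values are kept for each key
                }
                written.add(value);
            }
        }
        return written;
    }

    // Builds n keys with exactly one value each: v1, v2, ...
    static List<List<String>> singleValueGroups(int n) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            groups.add(Arrays.asList("v" + i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<String>> groups = singleValueGroups(13);
        System.out.println(sharedCounter(groups)); // v11 is missing, v12 and v13 are present
        System.out.println(perKeyCounter(groups)); // all 13 values are present
    }
}
```

Note that in the question's job the key is the count itself, so most keys carry a single value. Under that assumption, a per-key counter would still let nearly every record through, which is worth keeping in mind when choosing between the two variants.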

If you want to write only 10 values in total (regardless of key), leave the cnt initialization where it is and change the if condition to if (cnt > 10). However, this is not good practice, so you may want to reconsider your algorithm. Assuming you do not want 10 arbitrary values: when you have more than one reducer and a hash partitioner, how do you know which key will be processed first in a distributed environment?
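One common way to avoid depending on processing order is to keep a bounded top-N collection in each task and merge the partial results afterwards. This is not what the answer prescribes; it is a swapped-in technique shown as a plain-Java sketch with a min-heap. The class name TopNSketch, the method topN, and the two example partitions are hypothetical, with counts borrowed from the sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of an order-independent top-N selection (assumption: counts fit in int).
class TopNSketch {

    // Returns the n largest counts in descending order.
    // The min-heap never holds more than n elements; its head is the smallest kept value.
    static List<Integer> topN(Iterable<Integer> counts, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (int count : counts) {
            heap.offer(count);
            if (heap.size() > n) {
                heap.poll(); // drop the smallest so only the n largest remain
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions, as if handled by two different tasks.
        List<Integer> partitionA = Arrays.asList(2077, 888, 128, 20, 0, 8);
        List<Integer> partitionB = Arrays.asList(121, 101, 142, 78, 232, 97);

        // Each partition reports its local top 3; merging them gives the global top 3.
        List<Integer> merged = new ArrayList<>(topN(partitionA, 3));
        merged.addAll(topN(partitionB, 3));
        System.out.println(topN(merged, 3)); // prints [2077, 888, 232]
    }
}
```

Because every partition contributes its own top N, the merged result does not depend on which key or partition happens to be processed first. In a MapReduce job the final merge would still have to happen in a single place, for example one reducer.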


08-04 16:19