Problem description
I am working on a big Hadoop project and have a small KPI where I need to write only the top 10 values to the reducer output. To meet this requirement, I used a counter and break out of the loop when the counter equals 11, but the reducer still writes all of the values to HDFS.
This is pretty simple Java code, but I am stuck :(
For testing, I created a standalone class (a plain Java application) that does the same thing, and it works there; I'm wondering why it does not work in the reducer code.
Could someone please help me out and suggest if I am missing something?
package comparableTest;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);

        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);

        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);
    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String tokens[] = value.toString().split("\\t");
            // int empId = Integer.parseInt(tokens[0]);
            int count = Integer.parseInt(tokens[2]);
            context.write(new IntWritable(count), new Text(value));
        }
    }

    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {

        int cnt = 0;

        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {
            for (Text value : list) {
                cnt++;
                if (cnt == 11) {
                    break;
                }
                context.write(new IntWritable(cnt), value);
            }
        }
    }
}
The simple Java code that works fine:
package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;

    public static void main(String args[]) throws IOException, InterruptedException {
        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};
        reduce(list);
    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {
        int cnt = 0;
        for (String value : list) {
            cnt++;
            if (cnt == 3) {
                break;
            }
            System.out.println(value);
        }
    }
}
Sample data - the header is just for information; the actual data starts from the second line:
ID NAME COUNT (need to display the top 10 counts)
1 Toy Story (1995) 2077
10 GoldenEye (1995) 888
100 City Hall (1996) 128
1000 Curdled (1996) 20
1001 Associate, The (L'Associe) (1982) 0
1002 Ed's Next Move (1996) 8
1003 Extreme Measures (1996) 121
1004 Glimmer Man, The (1996) 101
1005 D3: The Mighty Ducks (1996) 142
1006 Chamber, The (1996) 78
1007 Apple Dumpling Gang, The (1975) 232
1008 Davy Crockett, King of the Wild Frontier (1955) 97
1009 Escape to Witch Mountain (1975) 291
101 Bottle Rocket (1996) 253
1010 Love Bug, The (1969) 242
1011 Herbie Rides Again (1974) 135
1012 Old Yeller (1957) 301
1013 Parent Trap, The (1961) 258
1014 Pollyanna (1960) 136
1015 Homeward Bound: The Incredible Journey (1993) 234
1016 Shaggy Dog, The (1959) 156
1017 Swiss Family Robinson (1960) 276
1018 That Darn Cat! (1965) 123
1019 20,000 Leagues Under the Sea (1954) 575
102 Mr. Wrong (1996) 60
1020 Cool Runnings (1993) 392
1021 Angels in the Outfield (1994) 247
1022 Cinderella (1950) 577
1023 Winnie the Pooh and the Blustery Day (1968) 221
1024 Three Caballeros, The (1945) 126
1025 Sword in the Stone, The (1963) 293
1026 So Dear to My Heart (1949) 8
1027 Robin Hood: Prince of Thieves (1991) 344
1028 Mary Poppins (1964) 1011
1029 Dumbo (1941) 568
103 Unforgettable (1996) 33
1030 Pete's Dragon (1977) 323
1031 Bedknobs and Broomsticks (1971) 319
Recommended answer
If you move int cnt=0; inside the reduce method (as the first statement of that method), you will get the first 10 values for each key (I guess this is what you want).
Otherwise, as it is now, your counter keeps increasing across reduce calls, so you skip only the 11th value overall (regardless of key) and continue with the 12th.
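The difference is easy to see in plain Java, outside Hadoop. A minimal sketch of the suggested fix (class and method names are illustrative, not from the original code): cnt is declared as a local variable at the top of reduce(), so it resets on every call, the way it would reset per key in the reducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TopPerKey {

    // Mirrors the fixed reducer: cnt is local to each reduce() call,
    // so every key independently keeps its first `limit` values.
    static List<String> reduce(Iterable<String> values, int limit) {
        List<String> kept = new ArrayList<>();
        int cnt = 0;                 // reset on every call, unlike an instance field
        for (String value : values) {
            cnt++;
            if (cnt == limit + 1) {  // same break condition as the question's code
                break;
            }
            kept.add(value);
        }
        return kept;
    }

    public static void main(String[] args) {
        // Two "keys": each call starts counting from zero again.
        System.out.println(reduce(Arrays.asList("a", "b", "c"), 2)); // [a, b]
        System.out.println(reduce(Arrays.asList("x", "y", "z"), 2)); // [x, y]
    }
}
```

With cnt as an instance field instead, the second call would continue from where the first stopped, which is exactly the bug in the question.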
If you want to print only 10 values in total (regardless of key), leave the cnt initialization where it is and change your if condition to if (cnt > 10). However, this is not good practice, so you may need to reconsider your algorithm. (Assuming you don't want 10 random values: how do you know which key will be processed first in a distributed environment, when you have more than one reducer and a hash partitioner?)