我正在做一个大型hadoop项目,并且有一个小的KPI,在这里我只需要编写reduces输出中的前10个值。
为了满足此要求,我使用了一个计数器,并在计数器等于11时中断了循环,但reducer仍将所有值写入HDFS。
这是一个非常简单的Java代码,但我被卡住了:(
为了进行测试,我创建了一个独立的类(java应用程序)来执行此操作,并且该类已经在这里工作。我想知道为什么它不能在 reducer 代码中工作。
请有人帮我,并建议我是否想念一些东西。
map -减少代码
package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ValueSortExp2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "Test commond");
job.setJarByClass(ValueSortExp2.class);
// Setup MapReduce
job.setMapperClass(MapTask2.class);
job.setReducerClass(ReduceTask2.class);
job.setNumReduceTasks(1);
// Specify key / value
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(IntComparator2.class);
// Input
FileInputFormat.addInputPath(job, new Path(arguments[0]));
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
job.setOutputFormatClass(TextOutputFormat.class);
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
public static class IntComparator2 extends WritableComparator {
public IntComparator2() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
return v1.compareTo(v2) * (-1);
}
}
public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {
String tokens[]= value.toString().split("\\t");
// int empId = Integer.parseInt(tokens[0]) ;
int count = Integer.parseInt(tokens[2]) ;
context.write(new IntWritable(count), new Text(value));
}
}
public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
int cnt=0;
public void reduce(IntWritable key, Iterable<Text> list, Context context)
throws java.io.IOException, InterruptedException {
for (Text value : list ) {
cnt ++;
if (cnt==11)
{
break;
}
context.write(new IntWritable(cnt), value);
}
}
}
}
简单的Java代码工作精细
package comparableTest;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class TestData {
//static int cnt=0;
public static void main(String args[]) throws IOException, InterruptedException {
ArrayList<String> list = new ArrayList<String>() {{
add("A");
add("B");
add("C");
add("D");
}};
reduce(list);
}
public static void reduce(Iterable<String> list)
throws java.io.IOException, InterruptedException {
int cnt=0;
for (String value : list ) {
cnt ++;
if (cnt==3)
{
break;
}
System.out.println(value);
}
}
}
样本数据- header 仅是更多信息,实际数据来自第二行
` ID NAME COUNT(需要显示前10个desc)
1玩具总动员(1995)2077
10 GoldenEye(1995)888
100市政厅(1996)128
1000 curl (1996)20
1001 Associate,The(L'Associe)(1982)0
1002埃德的下一步行动(1996)8
1003极端措施(1996)121
1004微光人(1996)101
1005 D3:猛鸭(1996)142
1006室(1996)78
1007苹果饺子帮(1975)232
1008狂野边疆之王戴维·克罗基特(1955)97
1009逃到巫婆山(1975)291
101瓶火箭(1996)253
1010爱情虫(1969)242
1011赫比再骑(1974)135
1012老耶勒(1957)301
1013 parent 陷阱(1961)258
1014波莉安娜(1960)136
1015 Homeward Bound:The Incredible Journey(1993)234
1016毛茸茸的狗(1959)156
1017瑞士家庭鲁宾逊(1960)276
第1018章那该死的猫! (1965)123
1019 20,000海底联盟(1954)575
102 Mr.Wrong(1996)60
1020酷跑(1993)392
1021外野天使(1994)247
1022灰姑娘(1950)577
1023小熊维尼与模糊日(1968)221
1024三卡巴耶罗斯(1945)126
1025剑在石头上(1963)293
1026 So Dear to My Heart(1949)第8章
1027罗宾汉:盗贼王子(1991)344
1028玛丽·波平斯(Mary Poppins)(1964)1011
1029小飞象(1941)568
103难忘(1996)33
1030皮特的龙(1977)323
1031 Bedknobs and Broomsticks(1971)319`
最佳答案
如果将int cnt=0;
移动到reduce方法中(作为该方法的第一条语句),您将获得每个键的前10个值(我想这就是您想要的)。
否则,如现在一样,您的计数器将继续增加,并且您将仅跳过第11个值(与键无关),并继续第12个值。
如果只想打印10个值(与键无关),则将cnt
初始化保留在原处,并将if
条件更改为if (cnt > 10)
...
但是,这不是一个好习惯,因此您可能需要重新考虑算法。 (假设您不希望有10个随机值,当您拥有1个以上的reducer和一个hash分区器时,如何知道在分布式环境中首先要处理哪个键?)
关于java - 计数器在 reducer 代码中不起作用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46087100/