I am taking my first steps with Hadoop 3.1.2, and my dataset looks like this:
id station; city; temperature
1; New York; 14
3; New York; 20
2; Bristol; 29
8; Rome; -10
30; Bristol; 2
10; Rome; 0
1; New York; 10
8; Rome; 10
With Hadoop, using MapReduce, I should get the grouping by station ID along with the average temperature.
However, I am not interested in all the cities, only in the ones with station ID 1 and 8, for example...
Plan: compute the total/average temperature per city.
map(key, value) -> key: station ID plus city name; value: temperature.
reduce: group by station ID + city name and take the average temperature for each station.
The result should look something like:
City - Station; Average Temperature
New York - 1; 12
Rome - 8; 0
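Before the Hadoop version, the grouping-and-averaging scheme can be sanity-checked in plain Java (a minimal sketch; the class and method names are my own, and on the sample rows above it yields 12 for New York - 1 and 0 for Rome - 8):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AverageSketch {
    // Mirrors the map step (parse each line, keep only station IDs 1 and 8,
    // key = "City - id") and the reduce step (integer average per key).
    static Map<String, Integer> averageByStation(String[] lines) {
        Map<String, int[]> acc = new LinkedHashMap<>(); // key -> {sum, count}
        for (String line : lines) {
            String[] tokens = line.split(";");
            String id = tokens[0].trim();
            if (!id.equals("1") && !id.equals("8")) continue; // also skips the header line
            int temp = Integer.parseInt(tokens[2].trim());
            int[] sc = acc.computeIfAbsent(tokens[1].trim() + " - " + id, k -> new int[2]);
            sc[0] += temp;
            sc[1]++;
        }
        Map<String, Integer> avg = new LinkedHashMap<>();
        for (Map.Entry<String, int[]> e : acc.entrySet())
            avg.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        return avg;
    }

    public static void main(String[] args) {
        String[] lines = {
            "id station; city; temperature",
            "1; New York; 14", "3; New York; 20", "2; Bristol; 29",
            "8; Rome; -10", "30; Bristol; 2", "10; Rome; 0",
            "1; New York; 10", "8; Rome; 10"
        };
        averageByStation(lines).forEach((k, v) -> System.out.println(k + "; " + v));
    }
}
```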
Here is the code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class SingleMapperReducer {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "City Temperature Job");
        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /*
    Id, City, Temperature
    1; New York; 14
    3; New York; 20
    2; Bristol; 29
    1; Rome; 20
    2; Rome; -10
    2; Bristol; 2
    3; New York; 10
    1; Rome; 10
    */
    private static class TemperatureMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String txt = value.toString();
            String[] tokens = txt.split(";");
            String id = tokens[0].trim();
            String temperature = tokens[2].trim();
            if (id.equals("1")) {
                id = "New York - 1";
            } else if (id.equals("8")) {
                id = "Rome - 8";
            }
            // skip the header line
            if (temperature.compareTo("Temperature") != 0)
                context.write(new Text(id), new IntWritable(Integer.parseInt(temperature)));
        }
    }

    private static class TemperatureReducer // on station ID
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            int n = 0;
            for (IntWritable val : values) {
                sum += val.get();
                n += 1;
            }
            result.set(sum / n);
            context.write(key, result);
        }
    }
}
Do you think this will work?
Is the part of the code where I implement the filter on station IDs correct?
Is there another way to apply this filter?
Thanks to anyone willing to help me!
Update 26/11
@cricket_007 @amey-shirke
Thank you! I tried running the code with the suggested changes:
if (id.equals("1") || id.equals("8")) {
    id = id + " - " + tokens[1];
    context.write(new Text(id), new IntWritable(Integer.parseInt(temperature)));
}
and also these:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(SingleMapperReducer.class);
--
The job runs, but it gives me an empty output file.
P.S. I tried Hadoop's MapReduce framework on a simple "wordcount" case and it works.
What could be happening?
Thanks
Best answer
A few points to note:
if(id.equals("1") || id.equals("8")){
id=id + " - "+ tokens[1];
context.write(new Text(id), new IntWritable(Integer.parseInt(temperature)));
}
Also note that this header check is case-sensitive:
(temperature.compareTo("Temperature") != 0)
If the input file's header ends in lowercase "temperature", as in the dataset shown at the top, the condition stays true, the header line is not skipped, and Integer.parseInt("temperature") then throws a NumberFormatException.
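As for applying the filter another way: one option is to avoid hard-coding the station IDs in the mapper and pass them through the job's Configuration instead, e.g. job.getConfiguration().set("filter.ids", "1,8") in the driver, then read the value back in the mapper's setup() via context.getConfiguration().get("filter.ids") and load it into a Set. The property name "filter.ids" and the class below are my own choices; this sketch shows the Hadoop-independent part of that idea:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class StationFilter {
    private final Set<String> wanted;

    // ids arrives as a comma-separated string, e.g. the value returned by
    // context.getConfiguration().get("filter.ids") inside Mapper.setup().
    StationFilter(String ids) {
        wanted = new HashSet<>(Arrays.asList(ids.split(",")));
    }

    // True when the line's first field is one of the wanted station IDs;
    // the header line is rejected as a side effect.
    boolean accepts(String line) {
        return wanted.contains(line.split(";")[0].trim());
    }

    public static void main(String[] args) {
        StationFilter f = new StationFilter("1,8");
        System.out.println(f.accepts("1; New York; 14")); // true
        System.out.println(f.accepts("2; Bristol; 29"));  // false
    }
}
```

This keeps the mapper generic: changing which stations are reported becomes a driver-side (or command-line) change, with no recompilation of the mapper logic.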