I am trying to analyze retail store data to break down sales by city. Here is my data:
Date Time City Product-Cat Sale-Value Payment-Mode
2012-01-01 09:20 Fort Worth Women's Clothing 153.57 Visa
2012-01-01 09:00 San Jose Mens Clothing 214.05 Rupee
2012-01-01 09:00 San Diego Music 76.43 Amex
2012-01-01 09:00 New York Cameras 45.76 Visa
Now I want to calculate the sales by product category across all the stores.
Here are the Mapper, the Reducer, and the main class:
public class RetailDataAnalysis {

    public static class RetailDataAnalysisMapper extends Mapper<Text, Text, Text, Text> {
        // when trying with LongWritable key
        public void map(LongWritable key, Text Value, Context context) throws IOException, InterruptedException {
            String analyser[] = Value.toString().split(",");
            Text productCategory = new Text(analyser[3]);
            Text salesPrice = new Text(analyser[4]);
            context.write(productCategory, salesPrice);
        }

        // when trying with Text key
        public void map(Text key, Text Value, Context context) throws IOException, InterruptedException {
            String analyser[] = Value.toString().split(",");
            Text productCategory = new Text(analyser[3]);
            Text salesPrice = new Text(analyser[4]);
            context.write(productCategory, salesPrice);
        }
    }

    public static class RetailDataAnalysisReducer extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String csv = "";
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv += ",";
                }
                csv += value.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.out.println("Usage Retail Data ");
            System.exit(2);
        }
        Job job = new Job(conf, "Retail Data Analysis");
        job.setJarByClass(RetailDataAnalysis.class);
        job.setMapperClass(RetailDataAnalysisMapper.class);
        job.setCombinerClass(RetailDataAnalysisReducer.class);
        job.setReducerClass(RetailDataAnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The exception I get when using the LongWritable key:
18/04/11 09:15:40 INFO mapreduce.Job: Task Id : attempt_1523355254827_0008_m_000000_2, Status : FAILED
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
The exception when trying with the Text key:
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
Please help me resolve this issue; I am very new to Hadoop.
Best Answer
You probably need a different input format class. By default TextInputFormat is used, which splits the file line by line and hands each record to the mapper with the byte offset of the line as a LongWritable key and the line contents as a Text value. That is why both of your attempts fail: declaring the mapper's input key as Text makes the framework cast the incoming LongWritable key to Text (the ClassCastException in your title), and a map(LongWritable, ...) method inside a Mapper<Text, Text, Text, Text> never actually overrides Mapper.map, so the inherited identity map runs instead and writes the LongWritable key straight to the output, producing the "Type mismatch in key from map" error.
You can specify the input format class like this:
job.setInputFormatClass(TextInputFormat.class);
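If you really did want Text keys, the stock alternative is KeyValueTextInputFormat, which splits each line into a Text key and a Text value at a separator character (tab by default). A minimal sketch, assuming a comma separator and the Hadoop 2.x property name:

// Only if Text keys are wanted: org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
// splits each input line at the first ',' into (Text key, Text value).
// Set the separator before creating the Job, since Job copies the Configuration.
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);
// The mapper input types would then be Mapper<Text, Text, ...>, with the
// date/time field as the key and the rest of the line as the value.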
In your case, since you do not need the key and only use the value, you can simply keep LongWritable as the key:

public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text Value, Context context) throws IOException, InterruptedException {
        //...
    }
}
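A side note not in the original answer: annotating the method with @Override makes the compiler verify that the signature really overrides Mapper.map, so a mismatch like the one in the question fails at compile time instead of at run time:

public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override  // the compiler rejects this if the signature does not override Mapper.map
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //...
    }
}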
Edit:

Here is the modified complete code using LongWritable as the key, with the imports added so it compiles as-is:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class RetailDataAnalysis {

    public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Field 3 is the product category, field 4 the sale value.
            String[] analyser = value.toString().split(",");
            Text productCategory = new Text(analyser[3]);
            Text salesPrice = new Text(analyser[4]);
            context.write(productCategory, salesPrice);
        }
    }

    public static class RetailDataAnalysisReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all sale values for a category into one CSV string.
            String csv = "";
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv += ",";
                }
                csv += value.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RetailDataAnalysis <in> [<in>...] <out>");
            System.exit(2);
        }
        // Job.getInstance replaces the deprecated new Job(conf, ...) constructor.
        Job job = Job.getInstance(conf, "Retail Data Analysis");
        job.setJarByClass(RetailDataAnalysis.class);
        job.setMapperClass(RetailDataAnalysisMapper.class);
        // Using the reducer as a combiner is safe here: concatenating partial
        // CSV lists and then concatenating those partial results still yields
        // one comma-joined list per key.
        job.setCombinerClass(RetailDataAnalysisReducer.class);
        job.setReducerClass(RetailDataAnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
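Once packaged into a jar, the job can be run in the usual way (the jar name and HDFS paths below are placeholders, not from the original post):

hadoop jar retail-analysis.jar RetailDataAnalysis /user/hadoop/retail/input /user/hadoop/retail/output

The last argument is taken as the output directory and everything before it as input paths, matching the loop over otherArgs in main.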
Also, if you split the records on "," the data needs to be actual CSV, like this:

2012-01-01 09:20,Fort Worth,Women's Clothing,153.57,Visa
2012-01-01 09:00,San Jose,Mens Clothing,214.05,Rupee
2012-01-01 09:00,San Diego,Music,76.43,Amex
2012-01-01 09:00,New York,Cameras,45.76,Visa

not space-separated as shown in your question.
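One point the answer leaves implicit: the reducer above only lists the sale values per category. If the actual goal is the total sales per product category, a summing reducer could be swapped in. A minimal sketch (the class name SalesSumReducer is hypothetical; it keeps the Text value type used throughout this code):

public static class SalesSumReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum the Sale-Value fields emitted by the mapper for this category.
        double total = 0.0;
        for (Text value : values) {
            total += Double.parseDouble(value.toString());
        }
        context.write(key, new Text(String.format("%.2f", total)));
    }
}

Because a partial total formatted this way still parses as a number, the same class could also serve as the combiner, at the cost of a little rounding from the two-decimal formatting.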
Regarding java - how to resolve org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text, see the similar question on Stack Overflow: https://stackoverflow.com/questions/49753512/