I am trying to analyze retail store data to work out sales broken down by city. Here is my data:

Date       Time   City        Product-Cat      Sale-Value Payment-Mode
2012-01-01 09:20  Fort Worth  Women's Clothing 153.57     Visa
2012-01-01 09:00  San Jose    Mens Clothing    214.05     Rupee
2012-01-01 09:00  San Diego   Music            76.43      Amex
2012-01-01 09:00  New York    Cameras          45.76      Visa

Now I want to calculate the sales for each product category across all stores.

Here are the mapper, the reducer, and the main class:
public class RetailDataAnalysis {

public static class RetailDataAnalysisMapper extends Mapper<Text,Text,Text,Text>{

   // when trying with LongWritable Key
    public void map(LongWritable key,Text Value,Context context) throws IOException, InterruptedException{

        String analyser [] = Value.toString().split(",");
        Text productCategory = new Text(analyser[3]);
        Text salesPrice = new Text(analyser[4]);
        context.write(productCategory, salesPrice);
    }

 // When trying with Text key

    public void map(Text key,Text Value,Context context) throws IOException, InterruptedException{

        String analyser [] = Value.toString().split(",");
        Text productCategory = new Text(analyser[3]);
        Text salesPrice = new Text(analyser[4]);
        context.write(productCategory, salesPrice);
    }


}


public static class RetailDataAnalysisReducer extends Reducer<Text,Text,Text,Text>{

    protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException{
        String csv ="";
        for(Text value:values){

            if(csv.length()>0){
                csv+= ",";
            }
            csv+=value.toString();
        }
        context.write(key, new Text(csv));
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf =  new  Configuration();
    String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
    if(otherArgs.length<2){
        System.out.println("Usage Retail Data ");
        System.exit(2);
    }
    Job job= new Job(conf,"Retail Data Analysis");
    job.setJarByClass(RetailDataAnalysis.class);
    job.setMapperClass(RetailDataAnalysisMapper.class);
    job.setCombinerClass(RetailDataAnalysisReducer.class);
    job.setReducerClass(RetailDataAnalysisReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    for(int i=0;i<otherArgs.length-1;++i){
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
    System.exit(job.waitForCompletion(true)?0:1);
  }
}

The exception I get when using a LongWritable key:
   18/04/11 09:15:40 INFO mapreduce.Job: Task Id : attempt_1523355254827_0008_m_000000_2, Status : FAILED
  Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)

The exception I get when trying with a Text key:
   Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)

Please help me resolve this; I am very new to Hadoop.

Best answer

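You probably need a different input format class. By default, TextInputFormat is used; it splits the file line by line and passes each line's byte offset within the file as a LongWritable key and the line's contents as a Text value.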
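
To make those key and value types concrete, here is a minimal plain-Java sketch (no Hadoop dependency) that mimics what a line-oriented reader hands to the mapper: the key is the byte offset at which a line starts, and the value is the line's text. `OffsetDemo` and `records` are hypothetical names, and the sketch assumes `\n` line endings and UTF-8 data; it illustrates the idea and is not Hadoop's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetDemo {
    // Returns (byte offset of line start -> line text), in file order.
    static Map<Long, String> records(String fileContent) {
        Map<Long, String> out = new LinkedHashMap<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            out.put(offset, line);
            // Advance past the line's bytes plus the one-byte newline.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        String data = "a,b,c\nd,e\nf";
        // Prints one "offset<TAB>line" pair per record.
        records(data).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

Because the key is a number and not text, a mapper reading such records would declare a numeric key type (LongWritable in Hadoop) as its input key.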

You can specify the input format class like this:

job.setInputFormatClass(TextInputFormat.class);

In your case, if you do not need the key and only need the value, you can use LongWritable as the key type:
public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text Value, Context context) throws IOException, InterruptedException {
        //...
    }
}
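
A likely reason the LongWritable attempt in the question still fails: the class was declared as `Mapper<Text,Text,Text,Text>`, so a `map(LongWritable, ...)` method does not override the framework's `map`; it is only an overload. The inherited default `map`, which passes the input key and value through unchanged, runs instead, which is consistent with the stack trace ending in `Mapper.map`. The plain-Java sketch below reproduces that effect without Hadoop. `BaseMapper`, `WrongMapper`, and `RightMapper` are hypothetical stand-ins, not Hadoop classes.

```java
public class OverrideDemo {
    // Stand-in for a framework base class with a pass-through default.
    static class BaseMapper<K, V> {
        public String map(K key, V value) {
            return "default:" + key + "=" + value;
        }

        // The framework always calls map through the generic signature.
        public String run(K key, V value) {
            return map(key, value);
        }
    }

    // Declared key type is String but map takes Long:
    // this is an overload, so run() never reaches it.
    static class WrongMapper extends BaseMapper<String, String> {
        public String map(Long key, String value) {
            return "custom:" + value;
        }
    }

    // Type parameters and method signature agree, so @Override compiles
    // and run() dispatches to this method.
    static class RightMapper extends BaseMapper<Long, String> {
        @Override
        public String map(Long key, String value) {
            return "custom:" + value;
        }
    }

    public static void main(String[] args) {
        System.out.println(new WrongMapper().run("k", "line")); // default:k=line
        System.out.println(new RightMapper().run(0L, "line"));  // custom:line
    }
}
```

Annotating `map` and `reduce` with `@Override` turns this kind of mismatch into a compile-time error instead of a runtime surprise.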

Edit:

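Here is the complete modified code using LongWritable as the key:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class RetailDataAnalysis {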

    public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {

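        // @Override makes the compiler reject a signature that does not match Mapper's type parameters.
        @Override
        public void map(LongWritable key, Text Value, Context context) throws IOException, InterruptedException {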
            String analyser[] = Value.toString().split(",");
            Text productCategory = new Text(analyser[3]);
            Text salesPrice = new Text(analyser[4]);
            context.write(productCategory, salesPrice);
        }
    }

    public static class RetailDataAnalysisReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv += ",";
                }
                csv += value.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RetailDataAnalysis <in> [<in>...] <out>");
            System.exit(2);
        }
        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf, "Retail Data Analysis");
        job.setJarByClass(RetailDataAnalysis.class);
        job.setMapperClass(RetailDataAnalysisMapper.class);
        job.setCombinerClass(RetailDataAnalysisReducer.class);
        job.setReducerClass(RetailDataAnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
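
The reducer above joins the sale values for a category into a comma-separated string. If the goal stated in the question is a total per product category, the values could be summed instead. The helper below is a hedged plain-Java sketch of that aggregation step only; `SalesTotals.sum` is a hypothetical name, and inside a real reducer it would be fed the `toString()` of each Text value. `BigDecimal` is used here to avoid floating-point rounding on currency amounts.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

public class SalesTotals {
    // Adds up decimal amounts given as strings, e.g. "153.57".
    static BigDecimal sum(Iterable<String> values) {
        BigDecimal total = BigDecimal.ZERO;
        for (String v : values) {
            total = total.add(new BigDecimal(v.trim()));
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up amounts for illustration only.
        List<String> amounts = Arrays.asList("45.76", "10.24");
        System.out.println(sum(amounts));
    }
}
```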

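Also, if you split the data on commas, the input has to be CSV with one field per column, so that indices 3 and 4 are the product category and the sale value, like this:
2012-01-01,09:20,Fort Worth,Women's Clothing,153.57,Visa
2012-01-01,09:00,San Jose,Mens Clothing,214.05,Rupee
2012-01-01,09:00,San Diego,Music,76.43,Amex
2012-01-01,09:00,New York,Cameras,45.76,Visa

The data must not be whitespace-separated as it is in your question.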
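
As a quick check of the field positions, the following plain-Java sketch splits one line on commas and picks out the product category and sale value. It assumes six comma-separated fields in the question's column order (Date, Time, City, Product-Cat, Sale-Value, Payment-Mode); a line that keeps date and time in a single field would shift these indices by one. `CsvLineDemo` and `categoryAndValue` are hypothetical names.

```java
public class CsvLineDemo {
    // Returns {category, saleValue}, or null when the line does not have six fields.
    static String[] categoryAndValue(String line) {
        String[] fields = line.split(",");
        if (fields.length < 6) {
            return null; // malformed, or not comma-separated at all
        }
        return new String[] { fields[3].trim(), fields[4].trim() };
    }

    public static void main(String[] args) {
        String csv = "2012-01-01,09:20,Fort Worth,Women's Clothing,153.57,Visa";
        String[] kv = categoryAndValue(csv);
        System.out.println(kv[0] + " -> " + kv[1]);

        // A whitespace-separated line has no commas, so it is rejected.
        String spaced = "2012-01-01 09:20  Fort Worth  Women's Clothing 153.57     Visa";
        System.out.println(categoryAndValue(spaced) == null);
    }
}
```

Guarding on the field count in the mapper in the same way avoids an ArrayIndexOutOfBoundsException on header lines or malformed records.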

Regarding "java - How to resolve org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/49753512/
