数据集albums.csv包含了10万条音乐专辑的数据。主要字段说明如下:

  1. album_title:音乐专辑名称
  2. genre:专辑类型
  3. year_of_pub: 专辑发行年份
  4. num_of_tracks: 每张专辑中单曲数量
  5. num_of_sales:专辑销量
  6. rolling_stone_critic:滚石网站的评分
  7. mtv_critic:全球最大音乐电视网MTV的评分
  8. music_maniac_critic:音乐达人的评分

Map阶段代码

/**
 * @author HuangShen
 * Map阶段代码
 * @date 2021-06-21 11:09
 */
public class AlbumsPro1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        //排除第一行
        if (!split[0].equals("id")) {
            Albums albums = new Albums(
                    Long.parseLong(split[0]),
                    Long.parseLong(split[1]),
                    split[2],
                    split[3],
                    Integer.parseInt(split[4]),
                    Integer.parseInt(split[5]),
                    Integer.parseInt(split[6]),
                    Double.parseDouble(split[7]),
                    Double.parseDouble(split[8]),
                    Double.parseDouble(split[9]));

            //k2 专辑类型 v2 评分集合
            context.write(new Text(albums.getGenre()),new DoubleWritable(albums.getRolling_stone_critic()));

        }
    }
}

Reducer阶段代码

/**
 * @author ShuangShen
 * Reducer阶段代码。
 * @date 2021-06-21 11:10
 */
public class AlbumsPro1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        double  count=0.0;
        int i=0;
        for (DoubleWritable value : values) {
            count+=value.get();
            i++;
        }
        DecimalFormat df = new DecimalFormat("#.00");
        context.write(key,new Text(df.format(count/i)+"分"));
    }
}

执行任务

/**
 * @author HuangShen
 * @date 2021-06-21 12:07
 */
public class AlbumsMain extends Configured implements Tool {
    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new AlbumsMain(), args);
        System.out.println("统计完成 请到 http://localhost:50070/explorer.html#/albums_out 进行下载");
        System.exit(run);
    }
    @Override
    public int run(String[] strings) throws Exception {
        //1 创建一个job 任务对象
        Job job = Job.getInstance(super.getConf(), "Albums");
        //防止 jar 包运行出错
        job.setJarByClass(AlbumsMain.class);
        // 2 配置Job 任务对象 (8个步骤)
        job.setInputFormatClass(TextInputFormat.class);
        //第一步:指定Map 文件的读取方式 和读取路径
        TextInputFormat.addInputPath(job, new Path("hdfs://10.158.83.240:9000/albums_input"));
        //第二步 指定Map 阶段的处理方式和数据类型
        System.out.println("请选择抽到的题目:");
        System.out.println("1)\t统计各类型专辑的滚石网站的平均评分:");
        System.out.println("2)\t统计各类型专辑的销量总数。:");
        System.out.println("3)\t统计每年销售的专辑数量。:");
        System.out.println("4)\t统计各类型专辑的MTV平均评分。:");
        System.out.println("5)\t统计各类型专辑的数量。:");
        System.out.println("请输入题号:");
        Scanner scanner = new Scanner(System.in);
        String num  = scanner.nextLine();
         switch (num){
             case "1":{
                 System.out.println("选择了    1)\t统计各类型专辑的滚石网站的平均评分:");
                 creatJob(job, AlbumsPro1Mapper.class,Text.class,DoubleWritable.class,AlbumsPro1Reducer.class,Text.class,DoubleWritable.class);
                 break;
             }
             case "2":{
                 System.out.println("选择了    2)\t统计各类型专辑的销量总数。:");
                 creatJob(job, AlbumsPro2Mapper.class,Text.class, LongWritable.class, AlbumsPro2Reducer.class,Text.class,LongWritable.class);
                 break;
             }
             case "3":{
                 System.out.println("选择了    3)\t统计每年销售的专辑数量。:");
                 creatJob(job, AlbumsPro3Mapper.class,LongWritable.class, LongWritable.class, AlbumsPro3Reducer.class,LongWritable.class,LongWritable.class);
                 break;
             }
             case "4":{
                 System.out.println("选择了    4)\t统计各类型专辑的MTV平均评分。:");
                 creatJob(job, AlbumsPro4Mapper.class,Text.class, DoubleWritable.class, AlbumsPro4Reducer.class,Text.class,DoubleWritable.class);
                 break;
             }
             case "5":{
                 System.out.println("选择了    5)\t统计各类型专辑的数量。:");
                 creatJob(job, AlbumsPro5Mapper.class,Text.class, LongWritable.class, AlbumsPro5Reducer.class,Text.class,LongWritable.class);
                 break;
             }
             default:{
                 System.out.println("输入有误! 请输入题号");
                 break;
             }
         }
        //第八步设置输出类型
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置输出的路径
        Path outputDir = new Path("hdfs://10.158.83.240:9000/albums_out");
        TextOutputFormat.setOutputPath(job, outputDir);
        //获取文件系统  目标文件存在就删除
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://10.158.83.240:9000"), new Configuration());
        if (fileSystem.exists(outputDir)) {
            fileSystem.delete(outputDir,true);
        }
        //等待任务结束
        return  job.waitForCompletion(true)?0:1;
    }
    //创建 job
     private  Job creatJob(Job job,Class<? extends Mapper> mapper,Class<?> k1,Class<?> v1,Class<? extends Reducer> reducer,Class<?> k3,Class<?> v3){
         //设置Map 阶段处理方式
         job.setMapperClass(mapper);
         //设置 Map 阶段K2 的数据类型
         job.setMapOutputKeyClass(k1);
         //设置 Map 阶段 V2 的数据类型
         job.setMapOutputValueClass(v1);
         //第三  四 五 六 采用默认方式
         //第七步指定Reduce阶段的处理方式和数据类型
         job.setReducerClass(reducer);
         //设置K3 类型
         job.setMapOutputKeyClass(k3);
         //设置V3 类型
         job.setMapOutputValueClass(v3);
        return  job;

     }
}

完整代码 点击下载

大数据期末考试一篇通 ---10万条音乐专辑的数据分析-LMLPHP

运行效果

大数据期末考试一篇通 ---10万条音乐专辑的数据分析-LMLPHP

大数据期末考试一篇通 ---10万条音乐专辑的数据分析-LMLPHP

06-23 04:47