数据集albums.csv包含了10万条音乐专辑的数据。主要字段说明如下:
- album_title:音乐专辑名称
- genre:专辑类型
- year_of_pub:专辑发行年份
- num_of_tracks:每张专辑包含的曲目数量
- num_of_sales:专辑销量
- rolling_stone_critic:滚石网站的评分
- mtv_critic:全球最大音乐电视网MTV的评分
- music_maniac_critic:音乐达人的评分
Map阶段代码
/**
 * Map phase for question 1: average Rolling Stone rating per album genre.
 *
 * <p>Each input value is one comma-separated line of the albums CSV. For every data row the
 * mapper emits {@code (genre, rolling_stone_critic)}. The header row (first column {@code "id"})
 * is skipped. Rows that do not have enough columns or whose numeric columns fail to parse are
 * counted under the {@code AlbumsPro1Mapper / MALFORMED_RECORDS} counter and skipped, rather
 * than failing the whole task.
 *
 * @author HuangShen
 * @date 2021-06-21 11:09
 */
public class AlbumsPro1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    /** Number of comma-separated columns consumed by the Albums constructor below. */
    private static final int EXPECTED_FIELDS = 10;

    /** Counter group/name used to report skipped rows. */
    private static final String COUNTER_GROUP = "AlbumsPro1Mapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /** Output key (genre), reused across calls instead of allocating one per record. */
    private final Text genre = new Text();

    /** Output value (Rolling Stone rating), reused across calls. */
    private final DoubleWritable rating = new DoubleWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");

        // Skip the header row. The length guard matters because split() drops trailing empty
        // strings, so a line such as ",,," yields an empty array.
        if (split.length > 0 && "id".equals(split[0])) {
            return;
        }

        // Blank or truncated lines cannot be turned into an Albums record.
        // NOTE(review): the plain split(",") does not understand quoted CSV fields, so a title
        // containing a comma shifts the columns; such rows will usually fail to parse below.
        if (split.length < EXPECTED_FIELDS) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        Albums albums;
        try {
            albums = new Albums(
                    Long.parseLong(split[0]),
                    Long.parseLong(split[1]),
                    split[2],
                    split[3],
                    Integer.parseInt(split[4]),
                    Integer.parseInt(split[5]),
                    Integer.parseInt(split[6]),
                    Double.parseDouble(split[7]),
                    Double.parseDouble(split[8]),
                    Double.parseDouble(split[9]));
        } catch (NumberFormatException e) {
            // One bad row should not kill the task; make it visible via the job counters instead.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // K2 = album genre, V2 = Rolling Stone rating (averaged per genre in the reducer).
        genre.set(albums.getGenre());
        rating.set(albums.getRolling_stone_critic());
        context.write(genre, rating);
    }
}
Reducer阶段代码
/**
 * Reduce phase for question 1: average Rolling Stone rating per album genre.
 *
 * <p>For each genre key, averages all ratings emitted by the mapper. It writes the average
 * rounded to two decimals and suffixed with "分", e.g. {@code 3.50分} or {@code 0.75分}.
 *
 * @author ShuangShen
 * @date 2021-06-21 11:10
 */
public class AlbumsPro1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

    /**
     * Two-decimal formatter. The pattern "0.00" (not "#.00") keeps the leading zero for averages
     * below 1, so 0.5 prints as "0.50" rather than ".50". Locale.ROOT symbols fix the decimal
     * separator to '.' regardless of the task JVM's default locale. DecimalFormat is not
     * thread-safe, so it is kept per instance rather than static.
     */
    private final DecimalFormat df =
            new DecimalFormat("0.00", new java.text.DecimalFormatSymbols(java.util.Locale.ROOT));

    /** Output value, reused across calls instead of allocating one per key. */
    private final Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        double sum = 0.0;
        long n = 0;
        for (DoubleWritable value : values) {
            sum += value.get();
            n++;
        }
        // Defensive: a key should always arrive with at least one value; never emit NaN.
        if (n == 0) {
            return;
        }
        result.set(df.format(sum / n) + "分");
        context.write(key, result);
    }
}
执行任务
/**
* @author HuangShen
* @date 2021-06-21 12:07
*/
public class AlbumsMain extends Configured implements Tool {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new AlbumsMain(), args);
System.out.println("统计完成 请到 http://localhost:50070/explorer.html#/albums_out 进行下载");
System.exit(run);
}
@Override
public int run(String[] strings) throws Exception {
//1 创建一个job 任务对象
Job job = Job.getInstance(super.getConf(), "Albums");
//防止 jar 包运行出错
job.setJarByClass(AlbumsMain.class);
// 2 配置Job 任务对象 (8个步骤)
job.setInputFormatClass(TextInputFormat.class);
//第一步:指定Map 文件的读取方式 和读取路径
TextInputFormat.addInputPath(job, new Path("hdfs://10.158.83.240:9000/albums_input"));
//第二步 指定Map 阶段的处理方式和数据类型
System.out.println("请选择抽到的题目:");
System.out.println("1)\t统计各类型专辑的滚石网站的平均评分:");
System.out.println("2)\t统计各类型专辑的销量总数。:");
System.out.println("3)\t统计每年销售的专辑数量。:");
System.out.println("4)\t统计各类型专辑的MTV平均评分。:");
System.out.println("5)\t统计各类型专辑的数量。:");
System.out.println("请输入题号:");
Scanner scanner = new Scanner(System.in);
String num = scanner.nextLine();
switch (num){
case "1":{
System.out.println("选择了 1)\t统计各类型专辑的滚石网站的平均评分:");
creatJob(job, AlbumsPro1Mapper.class,Text.class,DoubleWritable.class,AlbumsPro1Reducer.class,Text.class,DoubleWritable.class);
break;
}
case "2":{
System.out.println("选择了 2)\t统计各类型专辑的销量总数。:");
creatJob(job, AlbumsPro2Mapper.class,Text.class, LongWritable.class, AlbumsPro2Reducer.class,Text.class,LongWritable.class);
break;
}
case "3":{
System.out.println("选择了 3)\t统计每年销售的专辑数量。:");
creatJob(job, AlbumsPro3Mapper.class,LongWritable.class, LongWritable.class, AlbumsPro3Reducer.class,LongWritable.class,LongWritable.class);
break;
}
case "4":{
System.out.println("选择了 4)\t统计各类型专辑的MTV平均评分。:");
creatJob(job, AlbumsPro4Mapper.class,Text.class, DoubleWritable.class, AlbumsPro4Reducer.class,Text.class,DoubleWritable.class);
break;
}
case "5":{
System.out.println("选择了 5)\t统计各类型专辑的数量。:");
creatJob(job, AlbumsPro5Mapper.class,Text.class, LongWritable.class, AlbumsPro5Reducer.class,Text.class,LongWritable.class);
break;
}
default:{
System.out.println("输入有误! 请输入题号");
break;
}
}
//第八步设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
//设置输出的路径
Path outputDir = new Path("hdfs://10.158.83.240:9000/albums_out");
TextOutputFormat.setOutputPath(job, outputDir);
//获取文件系统 目标文件存在就删除
FileSystem fileSystem = FileSystem.get(new URI("hdfs://10.158.83.240:9000"), new Configuration());
if (fileSystem.exists(outputDir)) {
fileSystem.delete(outputDir,true);
}
//等待任务结束
return job.waitForCompletion(true)?0:1;
}
//创建 job
private Job creatJob(Job job,Class<? extends Mapper> mapper,Class<?> k1,Class<?> v1,Class<? extends Reducer> reducer,Class<?> k3,Class<?> v3){
//设置Map 阶段处理方式
job.setMapperClass(mapper);
//设置 Map 阶段K2 的数据类型
job.setMapOutputKeyClass(k1);
//设置 Map 阶段 V2 的数据类型
job.setMapOutputValueClass(v1);
//第三 四 五 六 采用默认方式
//第七步指定Reduce阶段的处理方式和数据类型
job.setReducerClass(reducer);
//设置K3 类型
job.setMapOutputKeyClass(k3);
//设置V3 类型
job.setMapOutputValueClass(v3);
return job;
}
}