package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class OneSort {
    public static class Map extends Mapper<Object , Text , IntWritable,Text >{
    private static Text goods=new Text();
    private static IntWritable num=new IntWritable();
    public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
    String line=value.toString();
    String arr[]=line.split("\t");
    num.set(Integer.parseInt(arr[1]));
    goods.set(arr[0]);
    context.write(num,goods);
    }
    }
    public static class Reduce extends Reducer< IntWritable, Text, IntWritable, Text>{
    private static IntWritable result= new IntWritable();
    public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
        for(Text val:values){
        context.write(key,val);
        }
        }
    }
    public static class IntWritableDecreasingComparator extends    IntWritable.Comparator
    {
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
        {
            return -super.compare(b1, s1, l1, b2, s2, l2);
            }
        }


        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
        Configuration conf=new Configuration();
        Job job =new Job(conf,"OneSort");
        job.setJarByClass(OneSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in=new Path("hdfs://192.168.43.114:9000/mymapreduce3/in/one");
        Path out=new Path("hdfs://192.168.43.114:9000/mymapreduce3/out");
        FileInputFormat.addInputPath(job,in);
        FileOutputFormat.setOutputPath(job,out);
        job.setSortComparatorClass(IntWritableDecreasingComparator.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);

        }
        }

利用mapreduce的wordcount程序进行id的计算,相同id合并并计数,之后 将输出的文件根据次数降序,
因为mapreduce的排序是是默认升序排序,所以需要写排序类重写降序类,最后将输出结果存到hive与mysql中。

今天进行的课堂测试我完成了数据的清洗,并把数据也成功导入到了hive数据库中,这次有一个很大的不足,就是没有在课前将hive配置好,所以用了很长的时间去配置,以后一定会吸取教训,提前做好准备工作。

01-08 10:47
查看更多