首先打开dfs与yarn。

主要就是三步走:

     1.map
2.reduce
3.driver写驱动将以上两个类关联,运行

一、编写WordcountMapper

package cn.mark.mrWordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
//map阶段
//Mapper<keyIn,value,keyOut,value>
//keyOut输出key的类型 <mark,1>,<fun,1>等  按照方法说明来
public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text k = new Text();
        IntWritable v = new IntWritable(1);//设置默认为1
//        1.获取文件中的一行内容(默认都是这样)
//          比如第一行为mark fun
        String line = value.toString();
//        2.切割单词
        String[] words = line.split(" ");
//        3.循环写出
        for (String word: words) {
//            写出的内容要按照上面KEYOU的类型,有Text和IntWritable

            k.set(word);    //k为mark
//            v.set(1);       //v为1
            context.write(k,v);

        }
    }
}

二、编写WordcountReducer

package cn.mark.mrWordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//KEYIN,VALUEIN : 为map阶段输出的(k,v)
//KEYOUT,VALUEOUT : 最终输出的(k,v)

public class WordcountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
//   由于int与IntWritable(为一个类)类型不同,需要特别设置
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
//        1.累加求和
        int sum = 0;
        for (IntWritable value:values) {
            sum=sum+value.get();
        }
//        特别设置v
        v.set(sum);//类型转换
//        2.写出
        context.write(key,v);
    }
}

三、编写WordcountDriver

package cn.mark.mrWordCount;

//三步走:1.map
//       2.reduce
//       3.driver写驱动将以上两个类关联,运行

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

public class WordcountDriver {
    public static void main(@NotNull String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//              此处标红的地方为在运行配置中所加入的参数,main里的参数很少用到,要用就要在运行时配置,如下图
//        1.获取Job对象
        Configuration conf =new Configuration();
        Job job = Job.getInstance(conf);

//        2.设置jar存储位置:有两种:
//         setJar(String jar) : 参数即为要存储的 固定的  路径,不灵活,不能修改
//         setJarByClass(Class<?> cls):Set the Jar by finding where a given class came from
//                                    按类反射,灵活,可动态变化
        job.setJarByClass(WordcountDriver.class);//存储位置根据WordcountDriver类的位置

//        3.关联map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

//        4.(固定格式)设置mapper阶段输出数据的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5.(固定格式)设置最终数据输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
//        6.设置输入路径和输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));//args为main中的形参,输入路径设置为第一个参数,下图
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出路径设置为第二个参数,这种设置方式,可以便于打包后在集群上运行!

        System.out.println(args[0]+" 利好六! "+args[1]);
//        7.提交Job
//        job.submit();
        boolean result = job.waitForCompletion(true);//设置为true,提交成功会打印相应信息
   }
}

 多数都是固定写法,会用即可!

顺便一提:如果在集群上运行:

  1.maven install 打包之前的java程序为jar文件并改名为: wc.jar

  2.上传该文件到集群的一个节点上

  2.运行指令:hadoop jar    wc.jar   cn.mark.mrWordCount.WordcountDriver    /user/mark/input/hello.txt     /user/atguigu/output

欢迎留言交流!我的大数据菜鸟交流群,有你更精彩:875910806

01-15 11:19