昨天较完整地搭建了一个Hadoop环境,并仔细研究了几个配置参数。
直到晚上,整个环境跑起来,web监控等也正常。于是用Scala写了一个word count的测试程序。
Scala可以很好地调用Java代码,所以,唯一要做的就是配置好编译环境,并把scala-library.jar和
应用程序一并打包。通过简单的探索,就得到了可以运行的配置。
这里给出简单的编译和打包方法,供参考。

编译脚本:这个脚本的主要作用是设置正确的classpath。
  1. #!/bin/bash

  2. # fast scala compiler for hadoop

  3. # accept every options fsc supported, except '-classpath'

  4. class_path=.:$JAVA_HOME/lib/tools.jar:$HADOOP_HOME/hadoop-2-core.jar
  5. class_path=${class_path}:$HADOOP_HOME/lib/commons-cli-2.0-SNAPSHOT.jar
  6. class_path=${class_path}:$HADOOP_HOME/lib/commons-codec-1.3.jar
  7. class_path=${class_path}:$HADOOP_HOME/lib/commons-httpclient-3.0.1.jar
  8. class_path=${class_path}:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
  9. class_path=${class_path}:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar
  10. class_path=${class_path}:$HADOOP_HOME/lib/commons-net-1.4.1.jar
  11. class_path=${class_path}:$HADOOP_HOME/lib/derbyclient.jar
  12. class_path=${class_path}:$HADOOP_HOME/lib/derby.jar
  13. class_path=${class_path}:$HADOOP_HOME/lib/hadoop-2-baidu-sos.jar
  14. class_path=${class_path}:$HADOOP_HOME/lib/hsqldb-1.8.0.10.jar
  15. class_path=${class_path}:$HADOOP_HOME/lib/jets3t-0.6.1.jar
  16. class_path=${class_path}:$HADOOP_HOME/lib/jetty-6.1.14.jar
  17. class_path=${class_path}:$HADOOP_HOME/lib/jetty-util-6.1.14.jar
  18. class_path=${class_path}:$HADOOP_HOME/lib/json-org.jar
  19. class_path=${class_path}:$HADOOP_HOME/lib/junit-3.8.1.jar
  20. class_path=${class_path}:$HADOOP_HOME/lib/kfs-0.2.2.jar
  21. class_path=${class_path}:$HADOOP_HOME/lib/log4j-1.2.15.jar
  22. class_path=${class_path}:$HADOOP_HOME/lib/oro-2.0.8.jar
  23. class_path=${class_path}:$HADOOP_HOME/lib/servlet-api.jar
  24. class_path=${class_path}:$HADOOP_HOME/lib/slf4j-api-1.4.3.jar
  25. class_path=${class_path}:$HADOOP_HOME/lib/slf4j-log4j12-1.4.3.jar
  26. class_path=${class_path}:$HADOOP_HOME/lib/xmlenc-0.52.jar
  27. class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/commons-el.jar
  28. class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/jasper-compiler.jar
  29. class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/jasper-runtime.jar
  30. class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/jsp-api.jar

  31. fsc -classpath ${class_path} "$@"
下面这个脚本主要是打包成一个.jar文件:
  1. #!/bin/bash

  2. # package up a jar file to sumit for scala
  3. if [ $# -ne 2 ]; then
  4.     echo "Usage: `basename $0` jar_file classes_dir"
  5.     exit 1
  6. fi

  7. jarfile=$1
  8. classes_dir=$2

  9. if [ -e ${classes_dir}/lib ]; then
  10.     echo "adding libraries: "
  11.     ls ${classes_dir}/lib
  12. else
  13.     mkdir ${classes_dir}/lib
  14. fi

  15. cp $SCALA_HOME/lib/scala-library.jar ${classes_dir}/lib/ &&
  16. jar -cvf ${jarfile}.jar -C ${classes_dir} .
最后,用于测试的Scala代码
  1. package net.liangkun
  2.             
  3. import java.io.IOException
  4. import java.util._
  5.                 
  6. import org.apache.hadoop.fs.Path
  7. import org.apache.hadoop.conf._
  8. import org.apache.hadoop.io._
  9. import org.apache.hadoop.mapred._
  10. import org.apache.hadoop.util._
  11.         
  12. class Map extends MapReduceBase
  13. with Mapper[LongWritable, Text, Text, IntWritable] {
  14.     private val one = new IntWritable(1);
  15.     private val word = new Text();
  16.         
  17.     def map(key: LongWritable, value: Text,
  18.         output: OutputCollector[Text, IntWritable],
  19.         reporter: Reporter
  20.     ) {
  21.         
  22.         val line = value.toString
  23.         val tokenizer = new StringTokenizer(line)
  24.         while(tokenizer.hasMoreTokens) {
  25.             word.set(tokenizer.nextToken)
  26.             output.collect(word, one)
  27.         }
  28.     }
  29. }

  30. class Reduce extends MapReduceBase
  31. with Reducer[Text, IntWritable, Text, IntWritable] {
  32.     def reduce(key: Text, values: Iterator[IntWritable],
  33.         output: OutputCollector[Text, IntWritable],
  34.         reporter: Reporter
  35.     ) {
  36.         output.collect(key, new IntWritable(count(0, values)))

  37.         def count(sum: Int, vs: Iterator[IntWritable]): Int =
  38.             if(vs.hasNext)
  39.                 count(sum + vs.next.get, vs)
  40.             else
  41.                 sum
  42.     }
  43. }

  44. object WordCount {
  45.     def main(args: Array[String]) {
  46.         val conf = new JobConf(this.getClass)
  47.         conf.setJobName("WordCount")
  48.         conf.setOutputKeyClass(classOf[Text])
  49.         conf.setOutputValueClass(classOf[IntWritable])

  50.         conf.setMapperClass(classOf[Map])
  51.         conf.setCombinerClass(classOf[Reduce])
  52.         conf.setReducerClass(classOf[Reduce])
  53.         conf.setInputFormat(classOf[TextInputFormat])
  54.         conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])

  55.         FileInputFormat.setInputPaths(conf, new Path(args(0)))
  56.         FileOutputFormat.setOutputPath(conf, new Path(args(1)))

  57.         JobClient.runJob(conf)
  58.     }
  59. }


09-25 22:40