基于mllib的spark中文文本分类(朴素贝叶斯)

本文参考博客 https://blog.csdn.net/github_36326955/article/details/54891204

首先介绍一下文本分类的大致流程

  • 预处理
  • 中文分词
  • 构建词向量空间
  • 训练模型
  • 用训练好的模型进行预测
  • 通过预测结果对模型进行评估

预处理

  • 语料库
  • 文本格式转换

    语料库

    要进行文本分类,首先要有文本,复旦中文文本语料库

    百度云盘链接:https://pan.baidu.com/s/1nKAmM8EuF54sgtMGhZN9tw

    密码 ns8e

    文本格式转换

    由于下载的语料库是GBK格式的,为了处理方便,需要转成UTF-8的格式,转换代码如下
    ***
package com.classification.text

import java.io.File

import org.apache.commons.io.FileUtils //Java的文件处理工具包

object GBK2UTF {

  def GBK2UTF8(GBKCorpusPath: String, UTF8CorpusPath: String): Unit = {
    //打开根目录
    val GBKCorpusDir: Array[File] = new File(GBKCorpusPath).listFiles()
    //对应的UTF-8格式的目录是否存在,不存在新建
    val UTFCorpusDir: File = new File(UTF8CorpusPath);
    if (!UTFCorpusDir.exists()) {
      UTFCorpusDir.mkdir()
    }

    //打开类别目录
    for (gbkClassDir: File <- GBKCorpusDir) {
      //记录目录路径,为创建UTF-8格式的文件夹和文件提供路径
      val UTFClassDirPath: String = UTF8CorpusPath + gbkClassDir.getName
      //UTF-8格式的类别目录是否存在,不存在新建
      val UTFClassDir: File = new File(UTFClassDirPath)
      if (!UTFClassDir.exists()) {
        UTFClassDir.mkdir()
      }

      for (gbkText: File <- gbkClassDir.listFiles()) {
        //将文件以GBK格式读取为字符串,转为UTF-8格式后写入新文件
        FileUtils.write(new File(UTFClassDirPath + "/" + gbkText),
          FileUtils.readFileToString(gbkText, "GBK"), "UTF-8")
      }
    }

  }


  def main(args: Array[String]): Unit = {
    GBK2UTF8("./train_corpus/", "./utf_train_corpus/")
    GBK2UTF8("./test_corpus/", "./utf_test_corpus/")
  }

}

中文分词

  • 分词工具介绍
  • 选择Ansj作为分词工具,以及注意事项
  • Ansj中文分词实现

分词工具介绍

中文分词的理论部分很多博客都有介绍,这里主要介绍代码实现(理论咱现在也不会,就会调用API)。如果用Python,分词一般选择jieba分词,jieba分词也有Java版的,但是用起来不是很方便。如果用Java或Scala,就要选择Java版的中文分词工具,我用的主要是Ansj和HanLP,两个分词工具可以百度,感觉HanLP比较强大。

选择Ansj作为分词工具,以及注意事项

本次实验选择Ansj作为中文文本分类的工具。注意:如果需要添加自定义词典,词典内的空白都必须是tab,但是如果使用Idea编辑词典文件,tab键默认为4个空格(这种细节注意不到会让人崩溃)

Ansj中文分词的实现


package com.classification.text

import java.io.File
import java.util

import org.ansj.domain.Result
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.commons.io.FileUtils

//scala集合与java集合转换的包,按住Ctrl点进源码,可以查看转换规则
import scala.collection.JavaConversions._


object WordSplit {

  //分词函数
  def corpusSegment(utfCorpusPath: String, utfSegmentPath: String, trainLabelListPath: String, trainSegmentPath: String): Unit = {
    //计数用,统计样本个数
    var count = 0
    //存放标签的Java数组,这里使用java数组是为了方便写入文件
    val labelList = new util.ArrayList[String]()
    //存放分词后字符串的数组,同样为了方便写入文件
    val contextList = new util.ArrayList[String]()
    //打开根目录
    val corpusDir: Array[File] = new File(utfCorpusPath).listFiles()
    //类别目录
    for (corpusClassDir: File <- corpusDir) {
      //每一个文件
      for (utfText <- corpusClassDir.listFiles()) {

        count = count + 1
        //调用分词方法
        val textSeg: Result = ToAnalysis.parse(FileUtils.readFileToString(utfText)
          .replace("\r\n", "") //去除换行和回车
          .replace("\r", "") //去除单独的回车
          .replace("\n", "") //去除单独的换行
          .replace(" ", "") //去除空格
          .replace("\u3000", "") //去除全角空格(中文空格)
          .replace("\t", "") //去除制表符
          .replaceAll(s"\\pP|\\pS|\\pC|\\pN|\\pZ", "") //通过设置Unicode类别的相关正则去除符号
          .trim
        )
        //读取停用词,就是一些对分类没有作用的词,去除可以对特征向量降维
        val stopWordList: Seq[String] = FileUtils.readFileToString(new File("./stopWords/stop_word_chinese.txt"))
          .split("\r\n").toSeq
        //新建停用词对象
        val filter = new StopRecognition()
        //加载停用词列表
        filter.insertStopWords(stopWordList)
        //去除停用词
        val result: Result = textSeg.recognition(filter)

        /**
         *这里如果将每篇文章的分词单独写入一个文件,则在构建词向量时,spark
         * 就要分别读取每篇文章的分词,而spark每读一个文件,就会就会产生一个RDD,
         * 这样读取所有文本的分词就会产生巨量的RDD,这时把这些分词合并到一个集合中(巨量的RDD
         * 合并成一个RDD)时,spark在构建DAG时就会爆掉(亲身经历,当时用的时RDD的union方法)
         */

        //将分词内容加入列表
        contextList.add(result.toStringWithOutNature)
        //将标签加入列表,标签的顺序和文本分词后的顺序是对应的
        labelList.add(corpusClassDir.getName)

      }
    }
    println(count)
    //将分词写入文件
    FileUtils.writeLines(new File(trainSegmentPath), "UTF-8", contextList)
    //将文本标签写入文件
    FileUtils.writeLines(new File(trainLabelListPath), "UTF-8", labelList)

  }

  def main(args: Array[String]): Unit = {
    //这里该了一些目录结构,对代码的功能没有影响
    corpusSegment("./train/utf_train_corpus/", "./train/utf_train_segment/",
      "./train/train_label_list.txt", "./train/train_seg.txt")

    corpusSegment("./test/utf_test_corpus/", "./test/utf_test_segment/",
      "./test/test_label_list.txt", "./test/test_seg.txt")
  }
}

构建词向量空间、训练、预测、评估

分词完成后就可以构建词向量、训练、预测、评估了
***

package com.classification.text

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Classification {
  //读取分词文件和标签文件,两个文件读取后都是RDD形式,元组的形式返回
  def getDocumentsAndLabels(sc: SparkContext, segPath: String, labelListPath: String): (RDD[Seq[String]], Iterator[String]) = {
    (sc.textFile(segPath).map(_.split(",").toSeq), sc.textFile(labelListPath).collect().toSeq.toIterator)
  }

  //训练函数
  def train(sc: SparkContext, trainSegPath: String, trainLabelListPath: String): NaiveBayesModel = {
    //读取训练集的分词和标签
    val (documents, labelList) = getDocumentsAndLabels(sc, trainSegPath, trainLabelListPath)
    //新建HashingTF类
    val hashingTF: HashingTF = new HashingTF()
    //计算TF值
    val tf: RDD[Vector] = hashingTF.transform(documents)

    //缓存,为了计算快,对功能没有影响
    tf.cache()
    //计算IDF值
    val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
    //计算TF-IDF值
    val tfIdf: RDD[Vector] = idf.transform(tf)
    //将TFIDF数据,结合标签,转为LabelPoint数据,LabelPoint是训练函数NaiveBayes.train()的输入数据格式
    val training: RDD[LabeledPoint] = tfIdf.map {
      vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
    }
    //训练函数训练,
    NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
  }

  //测试函数,参数model为训练集训练后的模型
  def test(sc: SparkContext, testSegPath: String, testLabelListPath: String, model: NaiveBayesModel): Double = {

    //读取测试数据集分词和标签数据
    val (documents, labelList) = getDocumentsAndLabels(sc, testSegPath, testLabelListPath)

    //和训练的步骤差不多
    val hashingTF: HashingTF = new HashingTF()
    val tf: RDD[Vector] = hashingTF.transform(documents)
    tf.cache()
    val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
    val tfIdf: RDD[Vector] = idf.transform(tf)
    val test: RDD[LabeledPoint] = tfIdf.map {
      vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
    }
    //预测
    val predictionAndLabel: RDD[(Double, Double)] = test.map((p: LabeledPoint) => (model.predict(p.features), p.label))
    //计算准确率
    1.0 * predictionAndLabel.filter((x: (Double, Double)) => x._1 == x._2).count() / test.count()
  }

  //获取标签对应的Double数值,将标签中的数组作为标签对应的数值
  //C11Space -> 11.0
  def getDoubleOfLabel(label: String): Double = {
    label.split("-")(0).tail.toDouble
  }

  def main(args: Array[String]): Unit = {
    //新建spark上下文
    val conf: SparkConf = new SparkConf().setAppName("Classification").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    //调用处理函数
    println(test(sc, "./test/test_seg.txt",
      "./test/test_label_list.txt",
      train(sc,
        "./train/train_seg.txt",
        "./train/train_label_list.txt"
      )
    )
    )
  }
}

到此分词的步骤就结束了,要想提高分词的准确率可以尝试不同的分词工具和文本分类算法

01-08 05:30
查看更多