本文介绍了Spark MLlib:为每个数据组构建分类器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我用某个组号标记了向量(LabeledPoint-s).对于每个组,我需要创建一个单独的逻辑回归分类器:

I have labeled vectors (LabeledPoint-s) taged by some group number. For every group I need to create a separate Logistic Regression classifier:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val modelByGroup = trainRDD.groupByKey().map({case (group, iter) =>
                           (group, new LogisticRegressionWithLBFGS().run(iter))})
  }

}

LogisticRegressionWithLBFGS().run(iter) 无法编译,因为 runRDD 一起工作,而不是与 groupBy 返回.请告知如何构建与输入数据中的组(标签)一样多的分类器.

LogisticRegressionWithLBFGS().run(iter) does not compile because run works with RDD and not with iterator that groupBy returns.Please advise how to build as many classifiers as there are groups (tags) in the input data.

更新 - 证明嵌套的 RDD 迭代不起作用:

Update - demonstrates that nested RDD iteration does not work:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct
    for (key <- keys) {
    // key is Int here!
      // Get train data for the current group (key):
      val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache()

      /**
       * Which results in org.apache.spark.SparkException:
       * RDD transformations and actions can only be invoked by the driver,
       * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
       * because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
       * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
       */
    }
  }
}

看起来没有办法在其他转换中使用转换,对吗?

Looks like there is no way to use transformations inside other transformations, correct?

推荐答案

如果您在每个组上使用分类器,您就不需要 mllib.Mllib 旨在与分布式集合一起使用(您的集合不是每个工作人员都拥有的本地集合).您可以在 map 函数中的每个组上使用一些本地机器学习库,例如 weka.

If your using classifier on each group you don't need mllib. Mllib is designed to use with distributed sets (your sets are not you have butch of local sets on each worker). You can just use some local machine learning library like weka on each group in map function.

val keys = wholeRDD.map(_._1).distinct.collect

var models = List()
for (key <- keys) {
  val valuesForKey = wholeRDD.filter(_._1 == key)
  // train model
  ...
  models = model::models
}

这篇关于Spark MLlib:为每个数据组构建分类器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-13 19:03