I have labeled vectors (LabeledPoints) that are tagged with group numbers. For each group I need to create a separate logistic regression classifier:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val trainRDD = sc.parallelize(train)
    val modelByGroup = trainRDD.groupByKey().map({ case (group, iter) =>
      (group, new LogisticRegressionWithLBFGS().run(iter)) })
  }
}
LogisticRegressionWithLBFGS().run(iter) does not compile, because run expects an RDD, not the iterator that groupByKey returns. Please suggest how to build as many classifiers as there are groups (keys) in the input data.
Update - a demonstration that nested RDD iteration does not work:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val trainRDD = sc.parallelize(train)
    val keys: RDD[Int] = trainRDD.map({ case (key, _) => key }).distinct
    for (key <- keys) {
      // key is Int here!
      // Get train data for the current group (key):
      val groupTrain = trainRDD.filter({ case (x, _) => x == key }).cache()
      /**
       * Which results in org.apache.spark.SparkException:
       * RDD transformations and actions can only be invoked by the driver,
       * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
       * because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
       * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
       */
    }
  }
}
It looks like there is no way to use transformations inside other transformations, correct?
Best answer
If you train a classifier on each group separately, you don't need MLlib. MLlib is designed for distributed datasets, and your groups are not distributed: each one is a small local dataset sitting on a single worker. Inside a map function you can simply use a local machine-learning library, for example Weka, on each group.
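A minimal sketch of that shape, using plain Scala collections in place of the RDD and a toy count-based "learner" (here called LocalModel/trainLocally, both hypothetical stand-ins for whatever local library you pick); on a real RDD the groupBy below would be groupByKey, and each group's values fit in one worker's memory, so a local learner is safe there:

```scala
// Hypothetical stand-in for a model produced by a local library such as Weka.
case class LocalModel(numExamples: Int)

// Toy "training": just records how many points the group contained.
def trainLocally(points: Seq[Double]): LocalModel = LocalModel(points.size)

val data = Seq((1, 1.0), (1, 2.0), (1, 3.5), (8, 7.0))

// Same shape as trainRDD.groupByKey().map { case (k, iter) => ... }:
// one local model per group key.
val modelByGroup: Map[Int, LocalModel] =
  data.groupBy(_._1).map { case (key, kvs) =>
    key -> trainLocally(kvs.map(_._2))
  }
```

The point is that the body of the map sees an ordinary local collection, so any single-machine library can run there without touching the SparkContext.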
Edit:
val keys = wholeRDD.map(_._1).distinct.collect
var models = List[Model]() // Model: whatever type your local learner returns
for (key <- keys) {
  val valuesForKey = wholeRDD.filter(_._1 == key)
  // train model
  ...
  models = model :: models
}