Problem description
I am trying to create an LDA model on a JSON file.
Creating a Spark session and reading the JSON file:
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.getOrCreate()
val df = sparkSession.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")
Displaying the df should show the DataFrame:
display(df)
Tokenize the text
import org.apache.spark.ml.feature.RegexTokenizer
// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
.setPattern("[\\W_]+")
.setMinTokenLength(4) // Filter away tokens with length < 4
.setInputCol("text")
.setOutputCol("tokens")
// Tokenize document
val tokenized_df = tokenizer.transform(df)
This should display the tokenized_df:
display(tokenized_df)
Get the stopwords
%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords
Optional: copying the stopwords to the tmp folder
%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords
Collect all the stopwords
val stopwords = sc.textFile("/tmp/stopwords").collect()
Filter out the stopwords
import org.apache.spark.ml.feature.StopWordsRemover
// Set params for StopWordsRemover
val remover = new StopWordsRemover()
.setStopWords(stopwords) // This parameter is optional
.setInputCol("tokens")
.setOutputCol("filtered")
// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)
Displaying the filtered df should verify that the stopwords got removed:
display(filtered_df)
Vectorize the frequency of word occurrences
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer
// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
.setInputCol("filtered")
.setOutputCol("features")
.fit(filtered_df)
Verify the vectorizer:
vectorizer.transform(filtered_df)
.select("id", "text","features","filtered").show()
After this I am seeing an issue in fitting this vectorizer in LDA. The issue, I believe, is that CountVectorizer is giving a sparse vector but LDA requires a dense vector. I am still trying to figure out the issue.
Here is the exception where the map is not able to convert.
import org.apache.spark.mllib.linalg.Vector
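// Note: countVectors is not defined earlier in the question; presumably it is the
// vectorized DataFrame, e.g. vectorizer.transform(filtered_df).select("id", "features").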
val ldaDF = countVectors.map {
case Row(id: String, countVector: Vector) => (id, countVector)
}
display(ldaDF)
Exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
There is a working sample for LDA which does not throw any issue:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF
val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)
The only difference is that in the second snippet we are using dense vectors.
Recommended answer
This has nothing to do with sparsity. Since Spark 2.0.0, ML transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and the values are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0.
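In practice this means the MatchError in the question comes from importing the old mllib Vector while CountVectorizer emits the new ml one. A minimal sketch of the corrected map, assuming countVectors is the CountVectorizer output with an id column of type String as in the question's match, would be:

import org.apache.spark.ml.linalg.Vector // ml, not mllib
import org.apache.spark.sql.Row

// With the new ml Vector type imported, the pattern match succeeds.
val ldaDF = countVectors.rdd.map {
  case Row(id: String, countVector: Vector) => (id, countVector)
}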
You can convert to the "old" vectors using Vectors.fromML:
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))
but it makes more sense to use the ML implementation of LDA if you already use ML transformers.
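For illustration, a minimal sketch of that route, assuming the vectorizer and filtered_df from the question and the default "features" and "topicDistribution" column names, might look like:

import org.apache.spark.ml.clustering.LDA

// Work directly on the DataFrame produced by the ML CountVectorizer;
// no conversion to the RDD-based mllib API is needed.
val countVectors = vectorizer.transform(filtered_df).select("id", "features")

val ldaModel = new LDA()
  .setK(10)
  .setMaxIter(10)
  .setFeaturesCol("features")
  .fit(countVectors)

// Inspect the topics and the per-document topic mixture.
ldaModel.describeTopics(5).show(false)
ldaModel.transform(countVectors).select("id", "topicDistribution").show(false)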
For convenience you can use implicit conversions:
import scala.language.implicitConversions
object VectorConversions {
import org.apache.spark.mllib.{linalg => mllib}
import org.apache.spark.ml.{linalg => ml}
implicit def toNewVector(v: mllib.Vector) = v.asML
implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}
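With those implicits in scope, a new ml vector can then be used wherever an old mllib vector is expected (and vice versa). A small hypothetical usage sketch:

import VectorConversions._
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector}

// The ml vector on the right is converted to an mllib vector implicitly.
val legacyVector: OldVector = NewVectors.dense(1.0, 2.0, 3.0)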