Problem description
I have an RDD of DenseVector like this:
>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]
I want to convert this into a DataFrame. I tried like this:
>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()
It gives this error:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>
Old solution:
frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))
Edit 1 - Reproducible code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')
sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", r"\s+"))
sentenceData.show()
vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0], DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
Recommended answer
You cannot convert an RDD[Vector] directly. It should be mapped to an RDD of objects which can be interpreted as structs, for example RDD[Tuple[Vector]]:
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
Otherwise Spark will try to convert the object's __dict__ and use an unsupported NumPy array as a field.
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import _infer_schema
v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError Traceback (most recent call last)
...
TypeError: not supported type: <class 'numpy.ndarray'>
vs.
_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))
Notes:

In Spark 2.0 you have to use the correct local types:

- pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API.
- pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.
These two namespaces are no longer compatible and require explicit conversions (for example How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT).
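As a minimal sketch of such a conversion (assuming Spark 2.0+; the variable names are placeholders, and MLUtils.convertVectorColumnsToML is the built-in helper for whole columns):

from pyspark.mllib.linalg import DenseVector as MLLibDenseVector
from pyspark.ml.linalg import DenseVector as MLDenseVector
from pyspark.mllib.util import MLUtils

# Rebuild a single mllib vector as an ml vector via its array form
old_vec = MLLibDenseVector([1.0, 2.0, 3.0])
new_vec = MLDenseVector(old_vec.toArray())

# For a whole DataFrame column (df and "features" are placeholder names):
# converted_df = MLUtils.convertVectorColumnsToML(df, "features")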
The code provided in the edit is not equivalent to the one from the original question. You should be aware that tuple and list don't have the same semantics. If you map a vector to a pair, use tuple and convert directly to DataFrame:
tfidf.rdd.map(
lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF()
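By default toDF() names the columns _1 and _2; passing explicit names (a small variant of the snippet above) keeps a readable schema:

tfidf.rdd.map(
    lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF(["label", "features"])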
Using tuple (product type) would work for nested structures as well, but I doubt this is what you want:
(tfidf.rdd
.map(lambda row: (row[0], DenseVector(row[1].toArray())))
.map(lambda x: (x, ))
.toDF())
A list anywhere other than the top-level row is interpreted as an ArrayType.
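You can see the difference by reusing the _infer_schema helper from above (a quick sketch; the exact repr of the result varies across Spark versions):

_infer_schema((1.0, [1.0, 0.0]))
# the list field is inferred as ArrayType(DoubleType), not VectorUDT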
It is much cleaner to use a UDF for the conversion (Spark Python: Standard scaler error "Do not support ... SparseVector").
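A minimal sketch of that UDF approach, assuming tfidf holds mllib vectors in a column named "features" as in the question (to_ml is a hypothetical name):

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector as MLDenseVector, VectorUDT

# Convert each mllib vector to an ml vector without leaving the DataFrame API
to_ml = udf(lambda v: MLDenseVector(v.toArray()) if v is not None else None, VectorUDT())
converted = tfidf.withColumn("features", to_ml("features"))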