我正在使用带有pyspark的Spark DataFrame
模型降低PCA
的维数(使用spark
ml
库),如下所示:
pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)
其中data
是Spark DataFrame
,其中一列标记为features
,这是3维的DenseVector
:data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')
拟合后,我转换数据:transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))
我的问题是:如何提取此PCA的特征 vector ?我如何计算他们解释了多少差异? 最佳答案
[更新:从Spark 2.2开始,PySpark中都可以使用PCA和SVD-请参见JIRA票证SPARK-6227和Sparkt ML 2.2的PCA和PCAModel;以下原始答案仍适用于旧版Spark。]
好吧,这似乎令人难以置信,但是实际上没有办法从PCA分解中提取此类信息(至少从Spark 1.5开始)。但是,同样有许多类似的“投诉”,例如here,因为它们无法从CrossValidatorModel
中提取最佳参数。
幸运的是,几个月前,我参加了AMPLab(Berkeley)和Databricks(即Spark的创建者)的'Scalable Machine Learning' MOOC,在其中,我们“手工”实现了完整的PCA管道,作为作业的一部分。从那时起,我就修改了我的函数(放心,我功不可没:-),以便将数据框作为输入(而不是RDD的)与您的格式相同(即,包含数字特征的DenseVectors
行) 。
我们首先需要定义一个中间函数estimatedCovariance
,如下所示:
import numpy as np
def estimateCovariance(df):
"""Compute the covariance matrix for a given dataframe.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input dataframe.
"""
m = df.select(df['features']).map(lambda x: x[0]).mean()
dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m) # subtract the mean
return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
然后,我们可以编写一个主要的
pca
函数,如下所示:from numpy.linalg import eigh
def pca(df, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
cov = estimateCovariance(df)
col = cov.shape[1]
eigVals, eigVecs = eigh(cov)
inds = np.argsort(eigVals)
eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]
components = eigVecs[0:k]
eigVals = eigVals[inds[-1:-(col+1):-1]] # sort eigenvals
score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
# Return the `k` principal components, `k` scores, and all eigenvalues
return components.T, score, eigVals
测试
让我们首先使用Spark ML PCA documentation中的示例数据(将它们修改为
DenseVectors
),使用现有方法查看结果: from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
然后,使用我们的方法:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
让我强调一下,我们不要在我们定义的函数中使用任何
collect()
方法-score
是RDD
,应该正确。注意,第二列的符号与现有方法得出的符号完全相反。但这不是问题:根据由Hastie&Tibshirani合着的第An Introduction to Statistical Learning(可免费下载)一书。 382
最后,既然我们有可用的特征值,那么为解释的方差百分比编写一个函数很简单:
def varianceExplained(df, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
components, scores, eigenvalues = pca(df, k)
return sum(eigenvalues[0:k])/sum(eigenvalues)
varianceExplained(df,1)
# 0.79439325322305299
作为测试,我们还检查了示例数据中解释的方差是否为1.0(对于k = 5)(因为原始数据是5维的):
varianceExplained(df,5)
# 1.0
这应该可以有效地完成您的工作随时提出您可能需要的任何澄清。
[使用Spark 1.5.0和1.5.1开发和测试]
关于apache-spark - Pyspark和PCA:如何提取此PCA的特征向量?我如何计算他们解释了多少差异?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33428589/