Problem description
I am trying to use:
- Google Dataproc + Spark
- Google BigQuery
- A job built with the Spark ML KMeans + Pipeline API
as follows:
Create a user-level feature table in BigQuery.
Example: what the feature table looks like
# Print the cluster centers:
for model in models:
    print vars(model)
    print model.stages[0].clusterCenters()
    print model.extractParamMap()
Output: [array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25820206e-04, ..., 1.60941306e-02]), array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])]
Here is the list of questions I need help with:
- I get a list in which every model has only 2 cluster centers (as arrays).
- When I try to access the pipeline, the KMeans model seems to default to k=2. Why does this happen?
- The last loop is supposed to access the pipelineModel and stage 0 and run the clusterCenters() method. Is this the correct approach?
- Why do I get the warning that the data is not cached?
- This defeats the purpose of using a pipeline to run KMeans models and do model selection in parallel, but I tried the following code:
# Helpers below are assumed to come from pyspark's internal mllib modules:
import numpy as np
from pyspark.mllib.common import callMLlibFunc
from pyspark.mllib.linalg import _convert_to_vector

# computeError
def computeCost(model, rdd):
    """Return the K-means cost (sum of squared distances of points
    to their nearest center) for this model on the given data."""
    cost = callMLlibFunc("computeCostKmeansModel",
                         rdd.map(_convert_to_vector),
                         [_convert_to_vector(c) for c in model.clusterCenters()])
    return cost

cost = np.zeros(len(paramMap))
for i in range(len(paramMap)):
    cost[i] = cost[i] + computeCost(model[i].stages[0], feature_data)
print cost
This prints out the following at the end of the loop:
[ 634035.00294687  634035.00294687  634035.00294687  634035.00294687
  634035.00294687  634035.00294687  634035.00294687  634035.00294687
  634035.00294687  634035.00294687  634035.00294687  634035.00294687
  634035.00294687  634035.00294687  634035.00294687  634035.00294687]
- Is the computed cost/error the same for every model? Once again, I cannot access the pipeline models with the correct parameters.
Any help/guidance is much appreciated! Thanks!
Recommended answer
Your param is not properly defined. It should map from the specific parameters to the values, not from arbitrary names. You get k equal to 2 because the parameters you pass are not utilized, so every model uses exactly the same default parameters.
Let's start with example data:
import numpy as np
from pyspark.mllib.linalg import Vectors

df = (sc.textFile("data/mllib/kmeans_data.txt")
    .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
    .zipWithIndex()
    .toDF(["features", "id"]))
and a Pipeline:

from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

km = KMeans()
pipeline = Pipeline(stages=[km])
As mentioned above, the parameter map should use the specific parameters as the keys. For example:
params = [
    {km.k: 2, km.initMode: "k-means||"},
    {km.k: 3, km.initMode: "k-means||"},
    {km.k: 4, km.initMode: "k-means||"}
]

models = pipeline.fit(df, params=params)

assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]
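With the parameter map defined this way, each fitted PipelineModel really carries its own k, so a per-model cost comparison becomes meaningful. Below is a minimal sketch of that comparison, assuming a Spark 2.x setup where the ml-side KMeansModel exposes computeCost(dataset) (later releases prefer ClusteringEvaluator):

# Compare the within-set sum of squared errors (WSSSE) across the fitted models.
# computeCost(df) is assumed available on pyspark.ml's KMeansModel (Spark 2.x);
# df is the same DataFrame used for fitting, with a "features" column.
for m, param in zip(models, params):
    kmeans_model = m.stages[0]
    print("k = %d, cost = %.2f" % (param[km.k], kmeans_model.computeCost(df)))

Once the parameters are actually propagated, the printed costs should differ between models, unlike the identical values shown in the question.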
Notes:
- The correct initMode for K-means|| is "k-means||", not "kmeans||".
- Using a parameter map in a Pipeline doesn't mean the models are trained in parallel. Spark parallelizes the training process over the data, not over the params. It is nothing more than a convenience method.
- You get the warning about not-cached data because the actual input to K-Means is not the DataFrame but a transformed RDD.
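If the warning bothers you, one common mitigation is to cache the training DataFrame around the fit. This doesn't necessarily silence the warning (the internally converted RDD is what the check looks at), but it avoids re-reading and re-parsing the source data for every parameter combination. A minimal sketch, assuming the df, pipeline, and params defined above:

# Cache the training data before fitting all parameter combinations,
# then release it once the models are built.
df.cache()
models = pipeline.fit(df, params=params)
df.unpersist()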