This article explains how to pass parameters to the ML Pipeline.fit method; it may be a useful reference if you are facing the same problem.

Problem description

I am trying to use

  • Google Dataproc + Spark
  • Google Bigquery
  • Spark ML KMeans + Pipeline to create a job

as follows:

  1. Create a user-level feature table in BigQuery (a hypothetical loading sketch follows below).
    Example: what the feature table looks like

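For reference, here is a minimal sketch of how such a feature table might be read from BigQuery into a Spark DataFrame on Dataproc. This assumes the spark-bigquery connector is available on the cluster; the project/dataset/table and column names below are hypothetical, not taken from the question:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("kmeans-features").getOrCreate()

# Load the user-level feature table from BigQuery (hypothetical table name).
raw = (spark.read.format("bigquery")
       .option("table", "my_project.my_dataset.user_features")
       .load())

# Collapse the numeric feature columns (hypothetical names) into the single
# "features" vector column that pyspark.ml KMeans expects.
assembler = VectorAssembler(inputCols=["sessions", "pageviews", "purchases"],
                            outputCol="features")
feature_data = assembler.transform(raw).select("user_id", "features")
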
#Print the cluster centers:
for model in models:
    print vars(model)
    print model.stages[0].clusterCenters()
    print model.extractParamMap()

Output: [array([ 7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25820206e-04, ...]), array([ 2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])]

Here is the list of questions I need help with:

  • I get a list with only 2 cluster centers as arrays for all of the models:
    • When I try to access the pipeline, the KMeans model seems to default to k=2. Why does that happen?
    • Is the last loop the right way to get at the pipelineModel and the 0th stage and run the clusterCenters() method?
    • Why do I get the error that the data is not cached?
    • This defeats the purpose of using a pipeline to run KMeans models and model selection in parallel, but I then tried the following code:
    # computeCost: K-means cost helper built on MLlib internals
    import numpy as np
    from pyspark.mllib.common import callMLlibFunc
    from pyspark.mllib.linalg import _convert_to_vector

    def computeCost(model, rdd):
        """Return the K-means cost (sum of squared distances of
        points to their nearest center) for this model on the given data."""
        cost = callMLlibFunc("computeCostKmeansModel",
                             rdd.map(_convert_to_vector),
                             [_convert_to_vector(c) for c in model.clusterCenters()])
        return cost

    cost = np.zeros(len(paramMap))

    for i in range(len(paramMap)):
        cost[i] = cost[i] + computeCost(models[i].stages[0], feature_data)
    print cost
    

    This prints out the following at the end of the loop:

    [ 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687]

    • Is the cost/error computed the same for every model? Again, I could not access the pipeline models with the correct parameters.

    Any help/guidance is much appreciated! Thanks!

    Recommended answer

    Your params are not properly defined. They should map from the specific Param objects to values, not from arbitrary names. You get k equal to 2 because the parameters you pass are not used, so every model falls back to exactly the same default parameters.

    Let's start with some example data:

    import numpy as np
    from pyspark.mllib.linalg import Vectors

    # Parse each line into a dense vector and attach a row id.
    df = (sc.textFile("data/mllib/kmeans_data.txt")
          .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
          .zipWithIndex()
          .toDF(["features", "id"]))
    

    and a Pipeline:

    from pyspark.ml.clustering import KMeans
    from pyspark.ml import Pipeline
    
    km = KMeans()
    
    pipeline = Pipeline(stages=[km])
    

    As mentioned above, the parameter map should use the specific Param objects as keys. For example:

    params = [
        {km.k: 2, km.initMode: "k-means||"},
        {km.k: 3, km.initMode: "k-means||"},
        {km.k: 4, km.initMode: "k-means||"}
    ]
    
    models = pipeline.fit(df, params=params)
    
    assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]
    
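    Once the param maps are applied correctly, each fitted model has its own k and they can be compared directly. A minimal sketch, assuming Spark 2.x where pyspark.ml's KMeansModel exposes computeCost() (on older versions you would fall back to the MLlib helper shown in the question):

    # Within-set sum of squared errors for each fitted PipelineModel
    # (stage 0 is the KMeansModel).
    costs = [m.stages[0].computeCost(df) for m in models]

    for param_map, c in zip(params, costs):
        print("k = %d, cost = %f" % (param_map[km.k], c))
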

    Notes:

    • The correct initMode for K-means|| is k-means||, not kmeans||.
    • Using a parameter map in a Pipeline doesn't mean the models are trained in parallel. Spark parallelizes the training process over the data, not over the params; it is nothing more than a convenience method.
    • You get the warning about not-cached data because the actual input to K-Means is not the DataFrame but a transformed RDD (see the sketch after this list).
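
    A side note that is my own workaround rather than part of the original answer: since fitting three param maps scans the input three times, caching the DataFrame up front at least avoids re-reading the source data, even though the internally converted RDD that triggers the warning still isn't cached:

    df.cache()                                # keep the parsed features in memory
    models = pipeline.fit(df, params=params)  # the three fits reuse the cached data
    df.unpersist()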

    That concludes this article on how to pass parameters to the ML Pipeline.fit method; hopefully the recommended answer is helpful.
