本文介绍了Spark 管道误差梯度提升模型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在 python 中使用梯度提升模型时出现错误.我之前对数据进行了标准化,使用 VectorAssemble 进行转换,并对列进行索引,运行时出现错误:

I am getting an error when use gradient boosting model in python. I previously normalized the data, used VectorAssemble to transform, and indexed the columns, error occurs when when I run this:

from pyspark.ml import Pipeline
#pipeline = Pipeline(stages=[gbt])
stages = []

stages += [gbt]

pipeline = Pipeline(stages=stages)  
model = pipeline.fit(df_train)
prediction = model.transform(df_train)
prediction.printSchema()

这是错误:

command-3539065191562733> in <module>()
      6 
      7 pipeline = Pipeline(stages=stages)
----> 8 model = pipeline.fit(df_train)
      9 prediction = model.transform(df_train)
     10 prediction.printSchema()

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    293 
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

/databricks/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293 
    294     def _fit(self, dataset):

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

怎么了?我已经研究了一段时间,但不确定数据或代码有什么问题

What is wrong? I have worked on this for a while but am not sure what is wrong with the data or the code

推荐答案

我只是尝试了一个虚拟数据,没有任何测试拆分:

I just tried out with a dummy data, with no test split whatsoever:

import pyspark.sql.functions as F
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
tst= sqlContext.createDataFrame([('a',7,2,0),('b',3,4,1),('c',5,6,0),('d',7,8,1),('a',9,10,0),('a',11,12,1),('g',13,14,0)],schema=['col1','col2','col3','label'])
str_indxr = StringIndexer(inputCol='col1', outputCol="col1_indexed")
ohe = OneHotEncoderEstimator(inputCols=['col1_indexed'],outputCols=['col1_ohe'])
vec_assmblr = VectorAssembler(inputCols=['col1_ohe','col2','col3'],outputCol='features_norm')
gbt = GBTClassifier(labelCol="label", featuresCol="features_norm", maxIter=10)
pip_line = Pipeline(stages=[str_indxr,ohe,vec_assmblr,gbt])
pip_line_fit = pip_line.fit(tst)
#%%
df_tran = pip_line_fit.transform(tst)

这有效.所以我可以想到两件事:

This works. So i could think of two things:

  1. 火花版本.我使用 2.4.0.你的是否大于或等于这个?
  2. 对于 minmax 缩放器或 vec 汇编器等其他阶段,您是否从 mlib 导入?ml 和 mlib 导入的这种混合导致奇怪的问题.mlib 将被淘汰,因此请导入您的所有功能来自 ml 库.

这篇关于Spark 管道误差梯度提升模型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 03:05