This article explains how to resolve a gradient boosting model error in a Spark ML pipeline; it should be a useful reference if you run into the same problem.
Problem Description
I am getting an error when using a gradient boosting model in Python. I previously normalized the data, used VectorAssembler to transform it, and indexed the columns. The error occurs when I run this:
from pyspark.ml import Pipeline
# gbt (a GBTClassifier) and df_train (the training DataFrame) are defined earlier
#pipeline = Pipeline(stages=[gbt])
stages = []
stages += [gbt]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_train)
prediction = model.transform(df_train)
prediction.printSchema()
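For context, gbt and df_train come from earlier cells that are not shown. A hypothetical sketch of that setup (the column names and parameters here are assumptions, not the asker's actual code):

from pyspark.ml.classification import GBTClassifier

# Assumed earlier setup: df_train already holds an assembled 'features'
# vector column and a numeric 'label' column
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)

Running the pipeline code above produces the traceback below.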
Here is the error:
<command-3539065191562733> in <module>()
6
7 pipeline = Pipeline(stages=stages)
----> 8 model = pipeline.fit(df_train)
9 prediction = model.transform(df_train)
10 prediction.printSchema()
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
293
294 def _fit(self, dataset):
--> 295 java_model = self._fit_java(dataset)
296 model = self._create_model(java_model)
297 return self._copyValues(model)
/databricks/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
290 """
291 self._transfer_params_to_java()
--> 292 return self._java_obj.fit(dataset._jdf)
293
294 def _fit(self, dataset):
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
What is wrong? I have worked on this for a while but am not sure whether the problem is in the data or the code.
Recommended Answer
I just tried it with dummy data, with no test split whatsoever:
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# Dummy data: one categorical column, two numeric columns, and a binary label
tst = sqlContext.createDataFrame([('a',7,2,0),('b',3,4,1),('c',5,6,0),('d',7,8,1),('a',9,10,0),('a',11,12,1),('g',13,14,0)], schema=['col1','col2','col3','label'])

# Index the categorical column, one-hot encode it, then assemble all features
str_indxr = StringIndexer(inputCol='col1', outputCol="col1_indexed")
ohe = OneHotEncoderEstimator(inputCols=['col1_indexed'], outputCols=['col1_ohe'])
vec_assmblr = VectorAssembler(inputCols=['col1_ohe','col2','col3'], outputCol='features_norm')
gbt = GBTClassifier(labelCol="label", featuresCol="features_norm", maxIter=10)

pip_line = Pipeline(stages=[str_indxr, ohe, vec_assmblr, gbt])
pip_line_fit = pip_line.fit(tst)
#%%
df_tran = pip_line_fit.transform(tst)
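To confirm the fit worked, you can inspect the columns the fitted model appends to the DataFrame (a minimal check; the ml-package GBTClassifier adds rawPrediction, probability, and prediction columns by default):

# Inspect the schema and a few predictions from the transformed DataFrame
df_tran.printSchema()
df_tran.select('label', 'prediction', 'probability').show(truncate=False)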
This works, so I can think of two things:
- Spark version: I am using 2.4.0. Is yours greater than or equal to this?
- For the other stages, such as the min-max scaler or the vector assembler, are you importing from pyspark.mllib? Mixing ml and mllib imports causes strange problems. mllib is being phased out, so import all of your functions from the ml library (see the sketch below).
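To illustrate that second point, a sketch of the import split (StandardScaler is used here only because it exists in both packages, making the mix-up easy):

# Correct: the DataFrame-based API -- all Pipeline stages come from pyspark.ml
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import GBTClassifier

# Incorrect for Pipeline stages: pyspark.mllib is the older RDD-based API,
# and its StandardScaler is not a pyspark.ml PipelineStage
# from pyspark.mllib.feature import StandardScaler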
That wraps up this article on the Spark pipeline gradient boosting model error; we hope the recommended answer helps.