Question
I am trying to build a simple custom Estimator in PySpark MLlib. I have read that it is possible to write a custom Transformer, but I am not sure how to do it for an Estimator. I also don't understand what @keyword_only does, or why I need so many setters and getters. Scikit-learn seems to have proper documentation for custom models, but PySpark doesn't.
Pseudocode of an example model:
class NormalDeviation():
    def __init__(self, threshold=3):
        self.threshold = threshold

    def fit(self, x, y=None):
        self.model = {'mean': x.mean(), 'std': x.std()}

    def predict(self, x):
        return (x - self.model['mean']) > self.threshold * self.model['std']

    def decision_function(self, x):  # does ml-lib support this?
        ...
Answer
Generally speaking there is no documentation because, as of Spark 1.6 / 2.0, most of the related API is not intended to be public. It should change in Spark 2.1.0 (see SPARK-7146).
The API is relatively complex because it has to follow specific conventions in order to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required for features like reading and writing or grid search. Others, like keyword_only, are just simple helpers and not strictly required.
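To demystify keyword_only: it is a small decorator that rejects positional arguments and records the keyword arguments the caller actually passed as self._input_kwargs, so that setParams can forward exactly those to _set. A simplified pure-Python sketch of that mechanism (the real pyspark.keyword_only behaves essentially like this; Demo is a hypothetical class for illustration):

```python
import functools

def keyword_only(func):
    """Simplified sketch of pyspark.keyword_only: reject positional
    arguments and record the passed kwargs as self._input_kwargs."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper

class Demo:
    @keyword_only
    def __init__(self, inputCol=None, threshold=1.0):
        # Only the kwargs the caller actually supplied end up here,
        # which is what setParams later forwards to _set.
        self.supplied = self._input_kwargs

d = Demo(inputCol="x")
print(d.supplied)  # {'inputCol': 'x'}
```

This is why every @keyword_only constructor in the answer below starts with kwargs = self._input_kwargs.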
Assuming you have defined the following mix-in for the mean parameter:
from pyspark.ml import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):
    mean = Param(Params._dummy(), "mean", "mean",
                 typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)
the standard deviation parameter:
class HasStandardDeviation(Params):
    standardDeviation = Param(Params._dummy(),
                              "standardDeviation", "standardDeviation",
                              typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)
and the threshold:
class HasCenteredThreshold(Params):
    centeredThreshold = Param(Params._dummy(),
                              "centeredThreshold", "centeredThreshold",
                              typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)
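The setters and getters in these mix-ins do nothing exotic: _set writes into the instance's param map and returns self (which is what makes chained calls like .setInputCol("x").setCenteredThreshold(1.0) work), and getOrDefault reads the value back. A toy pure-Python stand-in for that mechanism (not the real Params class; names here are hypothetical):

```python
class ParamsSketch:
    """Toy stand-in for pyspark.ml.param.Params: a per-instance
    param map with chained setters, mirroring _set / getOrDefault."""
    def __init__(self):
        self._paramMap = {}

    def _set(self, **kwargs):
        self._paramMap.update(kwargs)
        return self  # returning self is what enables setter chaining

    def getOrDefault(self, name, default=None):
        return self._paramMap.get(name, default)

class HasMeanSketch(ParamsSketch):
    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault("mean")

m = HasMeanSketch().setMean(2.5)
print(m.getMean())  # 2.5
```

The real Params class adds type conversion, defaults, and copy semantics on top, but the chained-setter / param-map shape is the same.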
you could create a basic Estimator as follows:
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark import keyword_only


class NormalDeviation(Estimator, HasInputCol,
                      HasPredictionCol, HasCenteredThreshold,
                      DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma,
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())
class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
                           HasMean, HasStandardDeviation, HasCenteredThreshold,
                           DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                 mean=None, standardDeviation=None,
                 centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                  mean=None, standardDeviation=None,
                  centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()
        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)
Credits to Benjamin-Manns for the use of DefaultParamsReadable and DefaultParamsWritable, available in PySpark >= 2.3.0.
Finally, it could be used as follows:
df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])
normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model = Pipeline(stages=[normal_deviation]).fit(df)
model.transform(df).show()
## +---+----+----------+
## | id| x|prediction|
## +---+----+----------+
## | 1| 2.0| false|
## | 2| 3.0| false|
## | 3| 0.0| false|
## | 4|99.0| true|
## +---+----+----------+
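The rule applied by _transform is plain arithmetic: a row is flagged when (x - mean) > centeredThreshold * stddev. A quick pure-Python sanity check on the same four values (using the sample standard deviation, as stddev_samp does) reproduces the prediction column above:

```python
# Plain-Python check of the rule used in _transform:
# flag x when (x - mean) > threshold * stddev
def is_deviant(x, mean, stddev, threshold):
    return (x - mean) > threshold * stddev

xs = [2.0, 3.0, 0.0, 99.0]
mean = sum(xs) / len(xs)  # 26.0
# sample standard deviation (matches stddev_samp, n - 1 denominator)
var = sum((v - mean) ** 2 for v in xs) / (len(xs) - 1)
stddev = var ** 0.5
flags = [is_deviant(v, mean, stddev, 1.0) for v in xs]
print(flags)  # [False, False, False, True]
```

Only 99.0 lies more than one sample standard deviation above the mean, matching the true row in the Spark output.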