规范化,有关之前都是用 python写的, 偶然要用scala 进行写, 看到这位大神写的, 那个网页也不错,那个连接图做的还蛮不错的,那天也将自己的博客弄一下那个插件。
本文来源 原文地址:http://www.neilron.xyz/spark-ml-feature-scaler/
下面是大神写的:
org.apache.spark.ml.feature包中包含了4种不同的归一化方法:
- Normalizer
- StandardScaler
- MinMaxScaler
- MaxAbsScaler
有时感觉会容易混淆,借助官方文档和实际数据的变换,在这里做一次总结。
0 数据准备
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import org.apache.spark.ml.linalg.Vectors val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") dataFrame.show // 原始数据 +---+--------------+ | id| features| +---+--------------+ | 0|[1.0,0.5,-1.0]| | 1| [2.0,1.0,1.0]| | 2|[4.0,10.0,2.0]| +---+--------------+ |
1 Normalizer
Normalizer的作用范围是每一行,使每一个行向量的范数变换为一个单位范数,下面的示例代码都来自spark官方文档加上少量改写和注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import org.apache.spark.ml.feature.Normalizer // 正则化每个向量到1阶范数 val normalizer = new Normalizer() .setInputCol("features") .setOutputCol("normFeatures") .setP(1.0) val l1NormData = normalizer.transform(dataFrame) println("Normalized using L^1 norm") l1NormData.show() // 将每一行的规整为1阶范数为1的向量,1阶范数即所有值绝对值之和。 +---+--------------+------------------+ | id| features| normFeatures| +---+--------------+------------------+ | 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]| | 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]| | 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]| +---+--------------+------------------+ // 正则化每个向量到无穷阶范数 val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity) println("Normalized using L^inf norm") lInfNormData.show() // 向量的无穷阶范数即向量中所有值中的最大值 +---+--------------+--------------+ | id| features| normFeatures| +---+--------------+--------------+ | 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]| | 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]| | 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]| +---+--------------+--------------+ |
2 StandardScaler
StandardScaler处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或是0均值,或是0均值单位标准差。
主要有两个参数可以设置:
- withStd: 默认为真。将数据标准化到单位标准差。
- withMean: 默认为假。是否变换为0均值。 (此种方法将产出一个稠密输出,所以不适用于稀疏输入。)
StandardScaler需要fit数据,获取每一维的均值和标准差,来缩放每一维特征。
StandardScaler是一个Estimator,它可以fit数据集产生一个StandardScalerModel,用来计算汇总统计。
然后产生的模可以用来转换向量至统一的标准差以及(或者)零均值特征。
注意如果特征的标准差为零,则该特征在向量中返回的默认值为0.0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | import org.apache.spark.ml.feature.StandardScaler val scaler = new StandardScaler() .setInputCol("features") .setOutputCol("scaledFeatures") .setWithStd(true) .setWithMean(false) // Compute summary statistics by fitting the StandardScaler. val scalerModel = scaler.fit(dataFrame) // Normalize each feature to have unit standard deviation. val scaledData = scalerModel.transform(dataFrame) scaledData.show // 将每一列的标准差缩放到1。 +---+--------------+------------------------------------------------------------+ |id |features |scaledFeatures | +---+--------------+------------------------------------------------------------+ |0 |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]| |1 |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771] | |2 |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542] | +---+--------------+------------------------------------------------------------+ |
3 MinMaxScaler
MinMaxScaler作用同样是每一列,即每一维特征。将每一维特征线性地映射到指定的区间,通常是[0, 1]。
MinMaxScaler计算数据集的汇总统计量,并产生一个MinMaxScalerModel。
注意因为零值转换后可能变为非零值,所以即便为稀疏输入,输出也可能为稠密向量。
该模型可以将独立的特征的值转换到指定的范围内。
它也有两个参数可以设置:
- min: 默认为0。指定区间的下限。
- max: 默认为1。指定区间的上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | import org.apache.spark.ml.feature.MinMaxScaler val scaler = new MinMaxScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // Compute summary statistics and generate MinMaxScalerModel val scalerModel = scaler.fit(dataFrame) // rescale each feature to range [min, max]. val scaledData = scalerModel.transform(dataFrame) println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]") scaledData.select("features", "scaledFeatures").show // 每维特征线性地映射,最小值映射到0,最大值映射到1。 +--------------+-----------------------------------------------------------+ |features |scaledFeatures | +--------------+-----------------------------------------------------------+ |[1.0,0.5,-1.0]|[0.0,0.0,0.0] | |[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]| |[4.0,10.0,2.0]|[1.0,1.0,1.0] | +--------------+-----------------------------------------------------------+ |
4 MaxAbsScaler
MaxAbsScaler将每一维的特征变换到[-1, 1]闭区间上,通过除以每一维特征上的最大的绝对值,它不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。
因为它不会转移/集中数据,所以不会破坏数据的稀疏性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import org.apache.spark.ml.feature.MaxAbsScaler val scaler = new MaxAbsScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // Compute summary statistics and generate MaxAbsScalerModel val scalerModel = scaler.fit(dataFrame) // rescale each feature to range [-1, 1] val scaledData = scalerModel.transform(dataFrame) scaledData.select("features", "scaledFeatures").show() // 每一维的绝对值的最大值为[4, 10, 2] +--------------+----------------+ | features| scaledFeatures| +--------------+----------------+ |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]| | [2.0,1.0,1.0]| [0.5,0.1,0.5]| |[4.0,10.0,2.0]| [1.0,1.0,1.0]| +--------------+----------------+ |
总结
所有4种归一化方法都是线性的变换,当某一维特征上具有非线性的分布时,还需要配合其它的特征预处理方法。
补充:
其他特征转换
VectorIndexer
算法介绍:
VectorIndexer解决数据集中的类别特征Vector。它可以自动识别哪些特征是类别型的,并且将原始值转换为类别指标。它的处理流程如下:
1.获得一个向量类型的输入以及maxCategories参数。
2.基于原始数值识别哪些特征需要被类别化,其中最多maxCategories需要被类别化。
3.对于每一个类别特征计算0-based类别指标。
4.对类别特征进行索引然后将原始值转换为指标。
索引后的类别特征可以帮助决策树等算法处理类别型特征,并得到较好结果。
在下面的例子中,我们读入一个数据集,然后使用VectorIndexer来决定哪些特征需要被作为非数值类型处理,将非数值型特征转换为他们的索引。
调用示例:
Scala:
- import org.apache.spark.ml.feature.VectorIndexer
- val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
- val indexer = new VectorIndexer()
- .setInputCol("features")
- .setOutputCol("indexed")
- .setMaxCategories(10)
- val indexerModel = indexer.fit(data)
- val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
- println(s"Chose ${categoricalFeatures.size} categorical features: " +
- categoricalFeatures.mkString(", "))
- // Create new column "indexed" with categorical values transformed to indices
- val indexedData = indexerModel.transform(data)
- indexedData.show()
Python:
- from pyspark.ml.feature import VectorIndexer
- data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
- indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
- indexerModel = indexer.fit(data)
- # Create new column "indexed" with categorical values transformed to indices
- indexedData = indexerModel.transform(data)
- indexedData.show()
ElementwiseProduct
算法介绍:
ElementwiseProduct按提供的“weight”向量,返回与输入向量元素级别的乘积。即是说,按提供的权重分别对输入数据进行缩放,得到输入向量v以及权重向量w的Hadamard积。
下面例子展示如何通过转换向量的值来调整向量。
调用示例:
Scala:
- import org.apache.spark.ml.feature.ElementwiseProduct
- import org.apache.spark.ml.linalg.Vectors
- // Create some vector data; also works for sparse vectors
- val dataFrame = spark.createDataFrame(Seq(
- ("a", Vectors.dense(1.0, 2.0, 3.0)),
- ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
- val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
- val transformer = new ElementwiseProduct()
- .setScalingVec(transformingVector)
- .setInputCol("vector")
- .setOutputCol("transformedVector")
- // Batch transform the vectors to create new column:
- transformer.transform(dataFrame).show()
Python:
- from pyspark.ml.feature import ElementwiseProduct
- from pyspark.ml.linalg import Vectors
- # Create some vector data; also works for sparse vectors
- data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
- df = spark.createDataFrame(data, ["vector"])
- transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
- inputCol="vector", outputCol="transformedVector")
- # Batch transform the vectors to create new column:
- transformer.transform(df).show()
SQLTransformer
算法介绍:
SQLTransformer工具用来转换由SQL定义的陈述。目前仅支持SQL语法如"SELECT ...FROM __THIS__ ...",其中"__THIS__"代表输入数据的基础表。选择语句指定输出中展示的字段、元素和表达式,支持Spark SQL中的所有选择语句。用户可以基于选择结果使用Spark SQL建立方程或者用户自定义函数。SQLTransformer支持语法示例如下:
1. SELECTa, a + b AS a_b FROM __THIS__
2. SELECTa, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
3. SELECTa, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
示例:
假设我们有如下DataFrame包含id,v1,v2列:
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
使用SQLTransformer语句"SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"转换后得到输出如下:
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0| 3.0 | 4.0 | 3.0
2 | 2.0| 5.0 | 7.0 |10.0
调用示例:
Scala:
- import org.apache.spark.ml.feature.SQLTransformer
- val df = spark.createDataFrame(
- Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
- val sqlTrans = new SQLTransformer().setStatement(
- "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
- sqlTrans.transform(df).show()
Java:
- import java.util.Arrays;
- import java.util.List;
- import org.apache.spark.ml.feature.SQLTransformer;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.RowFactory;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.types.*;
- List<Row> data = Arrays.asList(
- RowFactory.create(0, 1.0, 3.0),
- RowFactory.create(2, 2.0, 5.0)
- );
- StructType schema = new StructType(new StructField [] {
- new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
- new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
- new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
- });
- Dataset<Row> df = spark.createDataFrame(data, schema);
- SQLTransformer sqlTrans = new SQLTransformer().setStatement(
- "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
- sqlTrans.transform(df).show();
Python:
- from pyspark.ml.feature import SQLTransformer
- df = spark.createDataFrame([
- (0, 1.0, 3.0),
- (2, 2.0, 5.0)
- ], ["id", "v1", "v2"])
- sqlTrans = SQLTransformer(
- statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
- sqlTrans.transform(df).show()
VectorAssembler
算法介绍:
VectorAssembler是一个转换器,它将给定的若干列合并为一列向量。它可以将原始特征和一系列通过其他转换器得到的特征合并为单一的特征向量,来训练如逻辑回归和决策树等机器学习算法。VectorAssembler可接受的输入列类型:数值型、布尔型、向量型。输入列的值将按指定顺序依次添加到一个新向量中。
示例:
假设我们有如下DataFrame包含id,hour,mobile, userFeatures以及clicked列:
id | hour | mobile| userFeatures | clicked
----|------|--------|------------------|---------
0 |18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures列中含有3个用户特征。我们想将hour,mobile以及userFeatures合并为一个新列。将VectorAssembler的输入指定为hour,mobile以及userFeatures,输出指定为features,通过转换我们将得到以下结果:
id | hour | mobile| userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 |18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
调用示例:
Scala:
- import org.apache.spark.ml.feature.VectorAssembler
- import org.apache.spark.ml.linalg.Vectors
- val dataset = spark.createDataFrame(
- Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
- ).toDF("id", "hour", "mobile", "userFeatures", "clicked")
- val assembler = new VectorAssembler()
- .setInputCols(Array("hour", "mobile", "userFeatures"))
- .setOutputCol("features")
- val output = assembler.transform(dataset)
- println(output.select("features", "clicked").first())
Python:
- from pyspark.ml.linalg import Vectors
- from pyspark.ml.feature import VectorAssembler
- dataset = spark.createDataFrame(
- [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
- ["id", "hour", "mobile", "userFeatures", "clicked"])
- assembler = VectorAssembler(
- inputCols=["hour", "mobile", "userFeatures"],
- outputCol="features")
- output = assembler.transform(dataset)
- print(output.select("features", "clicked").first())
QuantileDiscretizer
算法介绍:
QuantileDiscretizer讲连续型特征转换为分级类别特征。分级的数量由numBuckets参数决定。分级的范围有渐进算法决定。渐进的精度由relativeError参数决定。当relativeError设置为0时,将会计算精确的分位点(计算代价较高)。分级的上下边界为负无穷到正无穷,覆盖所有的实数值。
示例:
假设我们有如下DataFrame包含id,hour:
id | hour
----|------
0 |18.0
----|------
1 |19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour是一个Double类型的连续特征,将参数numBuckets设置为3,我们可以将hour转换为如下分级特征。
id | hour | result
----|------|------
0 |18.0 | 2.0
----|------|------
1 |19.0 | 2.0
----|------|------
2 |8.0 | 1.0
----|------|------
3 |5.0 | 1.0
----|------|------
4 |2.2 | 0.0
调用示例:
Scala:
- import org.apache.spark.ml.feature.QuantileDiscretizer
- val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
- var df = spark.createDataFrame(data).toDF("id", "hour")
- val discretizer = new QuantileDiscretizer()
- .setInputCol("hour")
- .setOutputCol("result")
- .setNumBuckets(3)
- val result = discretizer.fit(df).transform(df)
- result.show()
Python:
- from pyspark.ml.feature import QuantileDiscretizer
- data = [(0, 18.0,), (1, 19.0,), (2, 8.0,), (3, 5.0,), (4, 2.2,)]
- df = spark.createDataFrame(data, ["id", "hour"])
- discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
- result = discretizer.fit(df).transform(df)
- result.show()