我为 Random Forest regression 编写了下面这段特征编码代码。不过,Random Forest regression 在 indexer(StringIndexer)之后并不需要 One Hot Encoding。现在,我想尝试 Linear Regression,而它需要 One Hot Encoding。我浏览了 Spark 的 OneHotEncoder 文档,但仍不清楚如何把它整合到当前代码中。如何在当前代码中添加 One Hot Encoding 步骤?
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

# Name of the column that holds the regression target; every other column
# is treated as a feature.
label_col = "x4"

# Convert the input RDD into a DataFrame with named columns.
train_data_df = train_data.toDF(("x0", "x1", "x2", "x3", "x4"))

# Columns that feed the feature vector (everything except the label).
feature_cols = [x for x in train_data_df.columns if x != label_col]

# Indexers encode strings with doubles: one StringIndexer per feature column,
# writing its result to "idx_<column>".
string_indexers = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in feature_cols
]

# Assemble all indexed columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=["idx_{0}".format(x) for x in feature_cols],
    outputCol="features",
)

# Run the indexers first, then the assembler, as one pipeline.
pipeline = Pipeline(stages=string_indexers + [assembler])
model = pipeline.fit(train_data_df)
indexed = model.transform(train_data_df)

# Build (label, features) LabeledPoints for the RDD-based mllib API.
# `.rdd` is spelled out because `DataFrame.map` is not available on
# Spark 2.x+ DataFrames; on Spark 1.x it is equivalent to calling `.map`.
# NOTE(review): on Spark 2.x+ the assembled column may hold pyspark.ml
# vectors, which LabeledPoint may not accept as-is - confirm for the Spark
# version in use.
label_points = (
    indexed
    .select(col(label_col).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features))
)
更新:
from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.sql.functions import col

###### FOR TEST DATA ######
label_col_test = "x4"

# Convert the test RDD into a DataFrame with named columns.
test_data_df = test_data.toDF(("x0", "x1", "x2", "x3", "x4"))

# Indexers encode strings with doubles: one StringIndexer per feature column,
# writing its result to "idx_<column>".
string_indexers_test = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in test_data_df.columns if x != label_col_test
]

# Encoders: one-hot encode every indexed column into "enc_<column>".
encoders_test = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in test_data_df.columns if x != label_col_test
]

# Assemble the one-hot encoded columns (not the raw indices) into a single
# "features" vector column, so the encoder stage actually reaches the model.
assembler_test = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x)
        for x in test_data_df.columns if x != label_col_test
    ],
    outputCol="features",
)

# NOTE(review): this pipeline is fitted on the test data itself, so its
# string-to-index mapping (and therefore the one-hot layout) can differ from
# whatever produced the training `label_points`. The training features must
# be built with the same indexer + encoder stages, ideally by reusing the
# pipeline model fitted on the training data - confirm before trusting MSE.
pipeline_test = Pipeline(
    stages=string_indexers_test + encoders_test + [assembler_test]
)
model_test = pipeline_test.fit(test_data_df)
indexed_test = model_test.transform(test_data_df)

# Build (label, features) LabeledPoints; `.rdd` is spelled out because
# `DataFrame.map` is not available on Spark 2.x+ DataFrames.
label_points_test = (
    indexed_test
    .select(col(label_col_test).cast("float").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features))
)

# Build the model from the training LabeledPoints.
model = LinearRegressionWithSGD.train(label_points)

# Pair every true label with the model's prediction for that point.
valuesAndPreds = label_points_test.map(
    lambda p: (p.label, model.predict(p.features))
)

# Mean of squared (label - prediction) differences. The pair is indexed
# instead of unpacked in the lambda signature, which is Python-2-only syntax.
MSE = (
    valuesAndPreds
    .map(lambda vp: (vp[0] - vp[1]) ** 2)
    .reduce(lambda x, y: x + y)
    / valuesAndPreds.count()
)
print("Mean Squared Error = " + str(MSE))
最佳答案
您只需在索引(StringIndexer)和组装(VectorAssembler)这两个步骤之间添加一个编码步骤即可:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode each indexed column: reads "idx_<column>" (written by the
# string indexers) and writes "enc_<column>".
encoders = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in train_data_df.columns if x != label_col
]

# Assemble the encoded columns, rather than the raw indices, into the single
# "features" vector column.
assembler = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x) for x in train_data_df.columns if x != label_col
    ],
    outputCol="features",
)

# Stage order matters: index strings, then one-hot encode, then assemble.
pipeline = Pipeline(stages=string_indexers + encoders + [assembler])