我为随机森林回归(Random Forest regression)编写了下面这段特征编码代码。随机森林回归在 StringIndexer 索引之后不需要再做 One Hot Encoding;但现在我想尝试线性回归(Linear Regression),而它需要 One Hot Encoding。我阅读了 Spark 的 OneHotEncoder 文档,但仍不清楚如何把它整合进现有代码。如何在当前代码中添加 One Hot Encoding 步骤?

# Training-side feature preparation: index every non-label column with a
# StringIndexer, assemble the indexed columns into a single "features" vector,
# and wrap each row as an mllib LabeledPoint.
from pyspark.ml.feature import StringIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import VectorAssembler
# Python import form; the original line used Scala/Java import syntax.
from pyspark.ml.feature import OneHotEncoder
# `col` and `LabeledPoint` are used below and must be imported explicitly.
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

label_col = "x4"

# converting RDD to dataframe
# NOTE: `train_data` is not defined in this snippet; it must already exist.
train_data_df = train_data.toDF(("x0","x1","x2","x3","x4"))

# Indexers encode strings with doubles
string_indexers = [
   StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
   for x in train_data_df.columns if x != label_col
]

# Assembles multiple columns into a single vector
assembler = VectorAssembler(
    inputCols=["idx_{0}".format(x) for x in train_data_df.columns if x != label_col],
    outputCol="features"
)


# Fit the indexers on the training data, then add the idx_* and "features"
# columns to it.
pipeline = Pipeline(stages=string_indexers + [assembler])
model = pipeline.fit(train_data_df)
indexed = model.transform(train_data_df)

# Go through `.rdd` before mapping: DataFrame.map was removed in Spark 2.0,
# while `.rdd.map` works on both 1.x and 2.x.
# NOTE(review): on Spark 2.x+ the "features" column holds pyspark.ml vectors;
# LabeledPoint may need pyspark.mllib.linalg.Vectors.fromML(row.features) --
# confirm against the Spark version in use.
label_points = (indexed
    .select(col(label_col).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))


更新:

# Test-side feature preparation and evaluation: index and one-hot encode the
# test columns, train a linear regression on `label_points`, and report the
# mean squared error on the test set.
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
# Names used below that this snippet did not import itself.
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col


###### FOR TEST DATA ######
label_col_test = "x4"

# converting RDD to dataframe
# NOTE: `test_data` is not defined in this snippet; it must already exist.
test_data_df = test_data.toDF(("x0","x1","x2","x3","x4"))

# Indexers encode strings with doubles
# (iterate over `test_data_df`; the original referenced an undefined
# `testData_df_1`)
string_indexers_test = [
   StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
   for x in test_data_df.columns if x != label_col_test
]

# encoders: one-hot encode each indexed column (idx_* -> enc_*).
# The original built these with StringIndexer, which only re-indexes.
encoders_test = [
   OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
   for x in test_data_df.columns if x != label_col_test
]

# Assembles multiple columns into a single vector
# (reads the enc_* columns so the one-hot encoding is actually used)
assembler_test = VectorAssembler(
    inputCols=["enc_{0}".format(x) for x in test_data_df.columns if x != label_col_test],
    outputCol="features"
)


# NOTE(review): this fits a separate pipeline on the test data, so category
# indices and one-hot vector sizes may differ from the training features.
# The features behind `label_points` must be built from enc_* columns too;
# transforming the test data with the training-fitted pipeline is the safer
# option -- confirm.
pipeline_test = Pipeline(stages=string_indexers_test + encoders_test + [assembler_test])
model_test = pipeline_test.fit(test_data_df)
indexed_test = model_test.transform(test_data_df)

# Cast the label to "double" to match the training side, and go through
# `.rdd` because DataFrame.map was removed in Spark 2.0.
label_points_test = (indexed_test
    .select(col(label_col_test).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))

# Build the model
# `label_points` is the training RDD of LabeledPoint built earlier in the file.
model = LinearRegressionWithSGD.train(label_points)

# Pair each true label with the model's prediction.
valuesAndPreds = label_points_test.map(lambda p: (p.label, model.predict(p.features)))

# Index into the (label, prediction) tuple: tuple-unpacking lambda parameters
# (`lambda (v, p): ...`) are a SyntaxError on Python 3.
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1])**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

最佳答案

您可以简单地将其添加为索引和组装之间的步骤:

# Imported here so the snippet works on its own.
from pyspark.ml.feature import OneHotEncoder

# One-hot encode every indexed column (idx_* -> enc_*). This is the step that
# sits between indexing and assembling; it must be a OneHotEncoder, not a
# second StringIndexer.
encoders = [
   OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
   for x in train_data_df.columns if x != label_col
]

# Assemble the encoded (enc_*) columns, not the raw indices, into "features".
assembler = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x) for x in train_data_df.columns if x != label_col
    ],
    outputCol="features"
)


# Stage order: index strings, one-hot encode the indices, assemble the vector.
pipeline = Pipeline(stages=string_indexers + encoders + [assembler])

10-08 11:12