我想在scala的spark 2.0中为管道编写自定义的Transformer。到目前为止,我还不清楚copytransformSchema方法应该返回什么。他们返回null是否正确? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java复制吗?

我得出结论,随着Transformer扩展PipelineStage的发展,fit调用了transformSchema方法。我是否正确理解transformSchema与sk-learns fit类似?

由于我的Transformer应该将数据集与(非常小的)第二个数据集连接在一起,因此我也希望将该数据集存储在序列化管道中。我应如何将其存储在转换器中以正确使用管道序列化机制?

一个简单的转换器看起来如何,该转换器可以计算单个列的平均值并填充nan值+保持该值?

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
    class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {

      def transform(df: Dataset[MyClass]): DataFrame = {

      }

      override def copy(extra: ParamMap): Transformer = {
      }

      override def transformSchema(schema: StructType): StructType = {
        schema
      }
    }

最佳答案

transformSchema应该返回应用Transformer之后期望的模式。例:

  • 如果transfomer添加了IntegerType的列,并且输出列的名称为foo:
    import org.apache.spark.sql.types._
    
    override def transformSchema(schema: StructType): StructType = {
       schema.add(StructField("foo", IntegerType))
    }
    

  • 因此,如果未更改数据集的架构,因为仅填充了用于均值插补的名称值,我应该返回原始案例类作为架构?

    由于Dataset一旦创建便是不变,因此在Spark SQL(以及MLlib)中是不可能的。您只能添加或“替换”(添加后跟drop操作)列。

    关于scala - 如何在MLlib中编写自定义Transformer?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40615713/

    10-12 13:54