我想在scala的spark 2.0中为管道编写自定义的Transformer
。到目前为止,我还不清楚copy
或transformSchema
方法应该返回什么。他们返回null
是否正确? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java复制吗?
我得出结论,随着Transformer
扩展PipelineStage
的发展,fit
调用了transformSchema
方法。我是否正确理解transformSchema
与sk-learns fit类似?
由于我的Transformer
应该将数据集与(非常小的)第二个数据集连接在一起,因此我也希望将该数据集存储在序列化管道中。我应如何将其存储在转换器中以正确使用管道序列化机制?
一个简单的转换器看起来如何,该转换器可以计算单个列的平均值并填充nan值+保持该值?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
def transform(df: Dataset[MyClass]): DataFrame = {
}
override def copy(extra: ParamMap): Transformer = {
}
override def transformSchema(schema: StructType): StructType = {
schema
}
}
最佳答案
transformSchema
应该返回应用Transformer
之后期望的模式。例:
IntegerType
的列,并且输出列的名称为foo
:import org.apache.spark.sql.types._
override def transformSchema(schema: StructType): StructType = {
schema.add(StructField("foo", IntegerType))
}
因此,如果未更改数据集的架构,因为仅填充了用于均值插补的名称值,我应该返回原始案例类作为架构?
由于
Dataset
一旦创建便是不变,因此在Spark SQL(以及MLlib)中是不可能的。您只能添加或“替换”(添加后跟drop
操作)列。关于scala - 如何在MLlib中编写自定义Transformer?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40615713/