This article looks at Datasets versus DataFrames in Spark 2.0. It should be a useful reference if you are facing the same questions; read on for the details.

Problem description

Starting out with Spark 2.0.1 I have some questions. I have read a lot of documentation, but so far could not find sufficient answers:

  • What is the difference between
    • df.select("foo")
    • df.select($"foo")
  • myDataSet.map(foo.someVal) is typesafe and will not convert into an RDD but stay in the Dataset representation / no additional overhead (performance-wise for 2.0.0)
  • Why should I use a UDF/UDAF instead of a map (assuming map stays in the Dataset representation)?

    Recommended answer

    1. The difference between df.select("foo") and df.select($"foo") is the signature. The former takes at least one String, the latter zero or more Columns. Beyond that there is no practical difference. (A short sketch of the two signatures follows after the example below.)
    2. myDataSet.map(foo.someVal) type checks, but, as any Dataset operation, it uses an RDD of objects, and compared to DataFrame operations there is a significant overhead. Let's take a look at a simple example:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    As you can see, this execution plan requires access to all fields and has to DeserializeToObject.
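
    Coming back to point 1, here is a minimal sketch of the two select signatures. This snippet is not part of the original answer; it just reuses the ds defined above to illustrate the overloads:

    // The String variant resolves plain column names:
    ds.select("foo")          // select(col: String, cols: String*)
    // The Column variant takes zero or more Column expressions:
    ds.select($"foo")         // select(cols: Column*)
    // Only the Column variant accepts arbitrary expressions:
    ds.select($"foo" + 1)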

    No. In general, other methods are not syntactic sugar and generate a significantly different execution plan. For example:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    Compared to the plan shown before, it can access the column directly. This is not so much a limitation of the API as a result of the difference in the operational semantics.

    There is no such option. While typed columns allow you to transform a statically typed Dataset into another statically typed Dataset:

    ds.select($"bar".as[Int])
    

    they are not type safe. There are some other attempts to include type-safe, optimized operations, like typed aggregations, but this API is experimental.
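
    A minimal sketch of such a typed aggregation in Spark 2.x, not from the original answer; it reuses the FooBar dataset from above and the experimental org.apache.spark.sql.expressions.scalalang.typed functions:

    import org.apache.spark.sql.expressions.scalalang.typed

    // Compile-time checked aggregation: group by the String field and
    // sum the Int field (widened to Double as required by typed.sum).
    val sums = ds.groupByKey(_.bar).agg(typed.sum[FooBar](_.foo.toDouble))
    sums.show()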

    It is completely up to you. Each distributed data structure in Spark has its own advantages and disadvantages (see, for example, Spark UDAF with ArrayType as bufferSchema performance issues).
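
    To make the UDF-versus-map trade-off concrete, here is a small sketch that is not part of the original answer: a UDF stays in the Column/DataFrame world, while a typed map goes through the object serialization shown earlier.

    import org.apache.spark.sql.functions.udf

    // A UDF operates on Columns, so the query stays inside the DataFrame planner:
    val addOne = udf((i: Int) => i + 1)
    ds.select(addOne($"foo")).explain

    // A typed map works on the deserialized FooBar objects instead:
    ds.map(fb => fb.foo + 1).explain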

    Personally, I find the statically typed Dataset to be the least useful:

    • It doesn't provide the same range of optimizations as Dataset[Row] (although they share the storage format and some execution plan optimizations, it doesn't fully benefit from code generation or off-heap storage), nor does it give access to all the analytical capabilities of the DataFrame.

    • Typed transformations are black boxes and effectively create an analysis barrier for the optimizer. For example, selections (filters) cannot be pushed over a typed transformation:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    

    Compared to:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134]
    

    This impacts features like predicate pushdown or projection pushdown.
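
    To sketch the projection side of this (again, not part of the original answer): inserting a typed map blocks column pruning, because deserializing FooBar needs every field, so the scan cannot be narrowed to just foo.

    // Column pruning works on the purely untyped query:
    ds.select($"foo").explain

    // With a typed map in between, the scan below DeserializeToObject
    // still reads both foo and bar; pruning stops at the typed operation:
    ds.map(identity).select($"foo").explain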

    • It is not as flexible as RDDs, with only a small subset of types supported natively.
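
    For illustration, an assumption-laden sketch that is not part of the original answer: a class without a built-in encoder (the made-up Legacy class below) has to fall back to a generic binary encoder such as Encoders.kryo, which gives up the columnar representation:

    import org.apache.spark.sql.Encoders

    // Hypothetical class with no built-in encoder (not a case class / Product):
    class Legacy(val payload: java.util.BitSet)

    // Fall back to an opaque binary (kryo) encoder; the data is stored as a
    // single binary column instead of typed columns:
    implicit val legacyEncoder = Encoders.kryo[Legacy]
    val legacyDs = Seq(new Legacy(new java.util.BitSet())).toDS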


That concludes this article on Spark 2.0 Datasets vs DataFrames. We hope the answer above is helpful, and thank you for your support!
