参见 UDF 是黑盒 — 不要使用它们除非你别无选择.引用 Reynold Xin 在我在 [email protected] 邮件列表上提出了同样的问题:UDF 是一个黑匣子,因此 Spark 无法知道它在处理什么.那里是一些简单的情况,我们可以在其中分析 UDF 字节码并推断出什么正在做,但一般来说很难做到.这种情况有一个JIRA票 SPARK-14083 分析JVM字节码并将闭包转化为催化剂表达,但正如有人说的(我认为是 Adam B. 在 twitter 上),期待它很快就会成为一种笑话.Dataset API 的一大优势是类型安全,但由于严重依赖用户定义的闭包/lambdas,以牺牲性能为代价.这些闭包通常比表达式慢,因为我们可以更灵活地优化表达式(已知数据类型,没有虚函数调用等).在许多情况下,查看这些闭包的字节码并弄清楚它们要做什么实际上并不难.如果我们能够理解它们,那么我们就可以将它们直接转化为 Catalyst 表达式,以实现更优化的执行.//让我们试着找出 1999 年出生的球员.//这会起作用,你有编译时安全...但它不会使用谓词下推!!!playerDs.filter(_.birthYear == 1999).explain()上面的代码等价于如下:val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) =>p.birthYear == 1999playerDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()someCodeSparkSQLCannotDoMuchOutOfIt 正是您将优化放在一边并让 Spark 优化器跳过它的地方.I always thought that dataset/dataframe API's are the same.. and the only difference is that dataset API will give you compile time safety. Right ?So.. I have very simple case: case class Player (playerID: String, birthYear: Int) val playersDs: Dataset[Player] = session.read .option("header", "true") .option("delimiter", ",") .option("inferSchema", "true") .csv(PeopleCsv) .as[Player] // Let's try to find players born in 1999. // This will work, you have compile time safety... but it will not use predicate pushdown!!! playersDs.filter(_.birthYear == 1999).explain() // This will work as expected and use predicate pushdown!!! // But you can't have compile time safety with this :( playersDs.filter('birthYear === 1999).explain()Explain from first example will show that it's NOT doing predicate pushdown (Notice empty PushedFilters):== Physical Plan ==*(1) Filter <function1>.apply+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...While the second sample will do it correctly (Notice PushedFilters):== Physical Plan ==*(1) Project [.....]+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999)) +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...So the question is.. how can I use DS Api, and have compile time safety.., and predicate pushdown working as expected ????Is it possible ? If not.. does this mean that DS api gives you compile time safety.. but at the cost of performance!! ??? (DF will be much faster in this case.. especially when processing large parquet files) 解决方案 That's the line in your Physical Plan you should remember to know the real difference between Dataset[T] and DataFrame (which is Dataset[Row]).Filter <function1>.applyI keep saying that people should stay away from the typed Dataset API and keep using the untyped DataFrame API as the Scala code becomes a black box to the optimizer in too many places. You've just hit one of these and think also about the deserialization of all the objects that Spark SQL keeps away from JVM to avoid GCs. Every time you touch the objects you literally ask Spark SQL to deserialize objects and load them onto JVM that puts a lot of pressure on GC (which will get triggered more often with the typed Dataset API as compared to the untyped DataFrame API).See UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice.Quoting Reynold Xin after I asked the very same question on [email protected] mailing list: The UDF is a black box so Spark can't know what it is dealing with. There are simple cases in which we can analyze the UDFs byte code and infer what it is doing, but it is pretty difficult to do in general.There is a JIRA ticket for such cases SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions, but as someone said (I think it was Adam B. on twitter) it'd be a kind of joke to expect it any time soon. One big advantage of the Dataset API is the type safety, at the cost of performance due to heavy reliance on user-defined closures/lambdas. These closures are typically slower than expressions because we have more flexibility to optimize expressions (known data types, no virtual function calls, etc). In many cases, it's actually not going to be very difficult to look into the byte code of these closures and figure out what they are trying to do. If we can understand them, then we can turn them directly into Catalyst expressions for more optimized executions.// Let's try to find players born in 1999.// This will work, you have compile time safety... but it will not use predicate pushdown!!!playersDs.filter(_.birthYear == 1999).explain()The above code is equivalent to the following:val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()someCodeSparkSQLCannotDoMuchOutOfIt is exactly where you put optimizations aside and let Spark Optimizer skip it. 这篇关于为什么在类型化数据集 API(与非类型化数据帧 API)中不使用谓词下推?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!
08-29 13:48