输入数据
我有两个从 MySQL 导出为 csv 文件的表。
磁盘上的表 1 大小:250 MB
记录:70 万
表 2 磁盘大小:350 MB
记录:60 万
代码更新
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val table-one = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("example-input-files/table-1-data.csv”)
table-one.registerTempTable(“table-one”)
val table-two = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("example-input-files/table-2-data.csv”)
table-two.registerTempTable(“table”-two)
sqlContext.cacheTable(“table-one”)
sqlContext.cacheTable(“table-two”)
val result = sqlContext.sql("SELECT table-one.ID,table-two.ID FROM table-one LEFT JOIN table-two ON table-one.ID = table-two.ID")
result.take(2).foreach(println)
Spark 工作
表。
加入关系数据库说话。
消耗时间。
这总共需要 30 秒。我在一台具有足够内存的机器上运行,以便两个文件都可以容纳(毕竟它是 600Mb)。
我有两种方式来运行这项工作。
sqlContext.cacheTable("the_table")
缓存后我发现连接操作本身需要 8 秒才能完成。
这个时间合理吗?我猜它不是,并且可以进行很多优化来加快查询速度。
我看到的优化
有没有其他方法可以做得更好?
最佳答案
正如评论者所提到的,Spark 是为分布式计算而设计的。在本地处理小型(ish)数据时,仅用于所有初始化和调度的开销就足以使 Spark 与其他 PL 相比显得很慢。
只要您的代码执行窄转换,执行程序实际上将在其内存中的本地数据副本上工作,因此这并不完全正确。然而,您的代码执行连接,这是一个广泛的转换 - 这意味着块将必须在网络中进行混洗。记住这一点。尽可能多地进行宽转换是昂贵的,将它们放在 DAG 的末尾。但同样,您的数据足够小,您可能看不到好处。
另一件事是,如果您有 Hive,那么您可以考虑将数据存储在按连接列分区的表中。
关于performance - 联接的 Spark 性能分析,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31849599/