我正在探索将表连接到自身时Spark的行为。我正在使用Databricks。

我的虚拟场景是:

  • 将外部表读取为数据帧A(基础文件为增量格式)
  • 将数据框B定义为仅选择了某些列的数据框A
  • 在第1列和第2列上联接数据框A和B

  • (是的,这没有多大意义,我只是在尝试了解Spark的基 native 制)
    a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
    
    b = a.select("column1", "column2", "columnA")
    
    c= a.join(b, how="left", on = ["column1", "column2"])
    

    我的第一次尝试是按原样运行代码(尝试1)。然后,我尝试重新分区和缓存(尝试2)
    a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
    .repartition(col("column1"), col("column2")).cache()
    

    最后,我重新分区,排序和缓存
     a = spark.read.table("table") \
    .select("column1", "column2", "column3", "column4") \
    .withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
    .repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
    

    生成的各个dag如下所示。

    我的问题是:
  • 为什么在尝试1中,即使未明确指定缓存,该表仍似乎已缓存。
  • 为什么在InMemoreTableScan后面总是跟随另一个此类型的节点。
  • 为什么尝试3高速缓存似乎分两个阶段进行?
  • 为什么要尝试3 WholeStageCodegen跟随一个(并且只有一个)InMemoreTableScan。

  • apache-spark - Apache Spark : impact of repartitioning,对联接进行排序和缓存-LMLPHP

    apache-spark - Apache Spark : impact of repartitioning,对联接进行排序和缓存-LMLPHP

    apache-spark - Apache Spark : impact of repartitioning,对联接进行排序和缓存-LMLPHP

    最佳答案

    您在这三个计划中观察到的是DataBricks运行时和Spark的混合。

    首先,在运行DataBricks运行时3.3+时,将自动为所有 Parquet 文件启用缓存。
    相应的配置:
    spark.databricks.io.cache.enabled true
    对于第二个查询,InMemoryTableScan发生了两次,因为调用联接时,spark尝试并行计算数据集A和数据集B。假设不同的执行者被分配了上述任务,则两者都必须从(DataBricks)缓存中扫描表。

    对于第三个,InMemoryTableScan本身并不引用缓存。这仅意味着形成的任何计划催化剂都涉及多次扫描缓存的表。

    PS:我无法想象要点4 :)

    10-08 07:15
    查看更多