我正在尝试读取一些表( Parquet 文件)并进行一些联接,并将它们以 Parquet 格式写入S3中,但是我遇到错误或花了两个多小时来写表。
错误:


    An error was encountered:
    Invalid status code '400' from https://.... with error payload: {"msg":"requirement failed: session isn't active."}
除该表外,我还可以将其他表编写为 Parquet 。
这是我的示例代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.config("spark.sql.catalogImplementation", "in-memory").getOrCreate()

table1 = spark.read.parquet("s3://.../table1")
table1.createOrReplaceTempView("table1")

table2 = spark.read.parquet("s3://.../table2")
table2.createOrReplaceTempView("table2")

table3 = spark.read.parquet("s3://.../table3")
table3.createOrReplaceTempView("table3")

table4 = spark.read.parquet("s3://.../table4")
table4.createOrReplaceTempView("table4")

Final_table = spark.sql("""
select
      a.col1
      a.col2
...
      d.coln
 from

        table1 a
        left outer join
        table2 b
        on
        cond1
        cond2
        cond3
        left outer join
        table3 c
        on
...
        """)

Final_table.count()
# 3813731240

output_file="s3://.../final_table/"

final_table.write.option("partitionOverwriteMode", "dynamic").mode('overwrite').partitionBy("col1").parquet(output_file)
为了添加更多内容,我尝试了分区,但没有成功。另外,我尝试了不同的EMR群集,例如
群集1:

m5.24x大
群集2:

m5.24x大
1个核心
m5.24x大
群集3:

m5d.2xlarge
8核
m5d.2xlarge
EMR发布版本
5.29.0

最佳答案

大多数 Spark 作业可以通过可视化其DAG进行优化。
在这种情况下,如果您能够运行sql并以最少的时间获取计数,并且所有时间都花在编写上,那么这里有一些建议

  • 因为您已经知道数据帧的计数,所以请删除计数操作,因为这对于您的工作来说是不必要的开销。
  • 现在,您将基于col1对数据进行分区,因此最好尝试对数据进行重新分区,以便在写入时执行最少的随机播放。

  • 你可以做类似的事情
    df.repartition('col1', 100).write
    
    如果您知道分区号,也可以设置分区号。

    07-27 22:37