我在Linux上的独立模式下将Pyspark与Spark 2.4结合使用,以聚合传入的数据,并使用Jupyter笔记本(当前用于测试)将这些数据写入数据库,并剥离以下内容:

spark = SparkSession.builder.appName("foo").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:1234").option("subscribe", "bar”).load()
df2 = df.withWatermark("timestamp", "1 second").groupby(F.window('timestamp', "5 second")).agg(F.min("timestamp").alias('timestamp_window_min'))

def write_into_sink(df, epoch_id):
    df.write.jdbc(table="foo_agg", mode="append", [...])
    pass
query_write_sink = df2.writeStream.foreachBatch(write_into_sink).trigger(processingTime = "1 seconds").start()


在Spark的tmp目录中运行2小时后,tmp/temporary-[...]/state/0/中有数十个目录,其中包含许多小的cec和delta文件,在运行期间总共增加了6 GB的磁盘空间。因此,我的问题是由于磁盘已满,我无法运行几个小时的脚本。我如何才能运行更长的时间,例如几天甚至几个月?如果我关闭/杀死python内核,则会清除目录。

我已经按照Apache Spark does not delete temporary directories进行操作,并将conf/spark-env.sh设置为SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true",但是在重新启动后仍然没有帮助,因为它仅在火花运行后而不在运行期间处理文件。我还在同一文件中尝试过SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true spark.worker.cleanup.appDataTtl=120",但具有相同的不存在的结果。

那么,您是否知道如何在运行期间消除spark的tmp文件?

最佳答案

您可能会有一个cron条目来清理相关目录(也许每30分钟删除30分钟之前的文件),如下所示:

0/30 * * * * find /path_to_spark_directory/* -mmin +30 -exec rm -rf {} \; 

09-30 15:33
查看更多