This article looks at whether the underlying parquet files of a Delta Lake table can be deleted manually without negatively affecting the DeltaLake _delta_log.

Problem Description

Using .vacuum() on a DeltaLake table is very slow (see Delta Lake (OSS) Table on EMR and S3 - Vacuum takes a long time with no jobs).

If I manually deleted the underlying parquet files, and did not add a new json log file or a new .checkpoint.parquet file (with the _delta_log/_last_checkpoint file changed to point to it), what would the negative impacts on the DeltaLake table be, if any?

Obviously time-traveling, i.e. loading a previous version of the table that relied on the parquet files I removed, would not work. What I want to know is, would there be any issues reading, writing, or appending to the current version of the DeltaLake table?

What I am thinking of doing in pySpark:

### Assuming a working SparkSession as `spark`

from subprocess import check_output
import json
from pyspark.sql import functions as F

# Fetch _delta_log/_last_checkpoint and zero-pad the version to the 20 digits
# used in checkpoint file names.
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)

s3_bucket_path = "s3a://my_s3_bucket/delta"  # no trailing slash; it is added in the f-string below

# Read the checkpoint parquet, keep only the "remove" actions, and filter down to
# files whose deletionTimestamp is more than 7 days in the past.
df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp") / 1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    .where(F.col("delDateDiffDays") < -7)
)

There are a lot of options from here. One could be:

df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)

Where I could read files_to_delete.csv into a bash array and then use a simple bash for loop, passing each parquet file's s3 path to an aws s3 rm command to remove the files one by one.

This may be slower than vacuum(), but at least it will not be consuming cluster resources while it is working.
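A minimal sketch of that clean-up loop, written here in Python with subprocess rather than bash (it assumes the files_to_delete.csv produced above, an already-configured aws CLI, and that the stored "path" values are relative to the table root, which is worth verifying on a few rows before deleting anything):

# Hedged sketch: delete each parquet file listed in files_to_delete.csv one by one
# with `aws s3 rm`. Assumes "path" is relative to the table root; verify first.
import csv
import subprocess

table_root = "s3://my_s3_bucket/delta"  # same table root as above

with open("files_to_delete.csv", newline="") as f:
    for row in csv.DictReader(f):
        key = row["path"].lstrip("/")
        subprocess.run(["aws", "s3", "rm", f"{table_root}/{key}"], check=True)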

If I do this, will I also have to either:

  1. write a new _delta_log/000000000000000#####.json file that correctly documents these changes (a rough sketch of such an entry is shown below)?
  2. write a new 000000000000000#####.checkpoint.parquet file that correctly documents these changes, and change the _delta_log/_last_checkpoint file to point to that checkpoint.parquet file?

The second option would be easier.

However, if there will be no negative effects if I just remove the files and don't change anything in the _delta_log, then that would be the easiest.
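For context on option 1: each Delta commit is a newline-delimited JSON file of actions, and dropping data files corresponds to "remove" actions. Below is a rough, heavily simplified sketch of what such an entry contains, based on the Delta transaction protocol; the path, flags, and version naming are invented for illustration and this is not a drop-in way to rewrite the log.

# Rough sketch only: roughly what a "remove"-only commit entry looks like.
import json
import time

removed = ["part-00000-example.snappy.parquet"]  # hypothetical relative path

actions = [
    {"remove": {"path": p,
                "deletionTimestamp": int(time.time() * 1000),  # milliseconds since epoch
                "dataChange": False}}
    for p in removed
]

# Each commit lives at _delta_log/<20-digit version>.json, one JSON action per line.
print("\n".join(json.dumps(a) for a in actions))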

Answer

TL;DR: Yes, this could potentially corrupt your Delta table.

Let me briefly explain how delta-lake reads a version using the _delta_log.

If you want to read version x, Delta Lake walks the delta log of every version up to x and builds a running sum of which parquet files to read. A summary of this running sum is saved as a .checkpoint after every 10th version to keep the process efficient.

Assume:

version 1 log says: add file_1, file_2, file_3
version 2 log says: remove file_1, file_2 and add file_4

So when reading version 2, the combined instructions will be: add file_1, file_2, file_3 -> remove file_1, file_2 and add file_4.

So the resulting files read will be file_3 and file_4.
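As a rough illustration of this running sum, here is a minimal sketch that replays the JSON commits directly; it reads from a local placeholder path for simplicity and ignores checkpoints:

# Minimal sketch of the running sum: replay the JSON commits in order, adding
# paths from "add" actions and dropping paths from "remove" actions.
# Ignores checkpoints and assumes a local path; for illustration only.
import glob
import json

table_path = "/path/to/delta_table"  # placeholder

live_files = set()
for commit in sorted(glob.glob(f"{table_path}/_delta_log/*.json")):
    with open(commit) as f:
        for line in f:  # each commit file is newline-delimited JSON, one action per line
            line = line.strip()
            if not line:
                continue
            action = json.loads(line)
            if "add" in action:
                live_files.add(action["add"]["path"])
            elif "remove" in action:
                live_files.discard(action["remove"]["path"])

print(live_files)  # in the example above, this would contain file_3 and file_4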

Say in version 3 you delete file_4 from the file system. If you don't go through .vacuum, the delta log will not know that file_4 is no longer present; it will still try to read it and will fail.

That covers whether the underlying parquet files can be deleted without negatively affecting the DeltaLake _delta_log; hopefully the answer above helps.
