论文发表于 2020年, 研究数据湖产品的很好的学习资料.
概要
开篇很明确的表明了为什么要做Delta lake这样一个产品. Databricks尝试将数据仓库直接架在云上对象存储之上, 这种尝试的过程中遇到了对象存储的一些问题, 为了解决这些问题, 提出了Delta lake这套技术方案.
对象存储的优势
- 性价比高, pay-as-you-go 用多少付多少
- 能快速扩缩容
- 存算分离, 使得用户可以单独去调整存储或计算资源
对象存储的问题
- 对象存储只提供了类似 kv 的api , 每一个路径就是一个key , 很难做到跨key 对象之间的事务保障. 在更新某张表的时候, 可能会导致其他客户端读取到中间数据. 甚至在更新过程中的意外退出可能会导致损坏的数据
- 元数据操作性能特别差, 特别是list 操作, 例如S3 每次只能返回1000个对象, 每次执行需要花费上百ms.
- 由于云上读取数据会有初始的latency(慢启动), 所以要想利用在parquet文件的footer中保存的min/max的statistics信息, 就需要频繁去读取每个文件的footer, 来进行谓词下推, 这个过程反而可能会因为"慢启动" 的问题导致这种 "skipping check" 反而比原始query 还要慢.
Delta Lake设计思路
因此, 为了解决这些对象存储的问题提出了Delta Lake, an ACID table storage layer over cloud object stores的架构设计.
他这里也对比了其他几种解决的思路
比如通过数据分区, 或者像snowflake那样通过一个集中式的元数据服务, 这个劣势就是需要单独维护一个元数据服务, 并且这个服务很容易成为瓶颈, 因为所有的操作都需要经过这个服务.
Delta lake的思路就是直接将元数据保存在object store之上, 并通过WAL日志实现事务保障.
可以看到分区目录下是数据文件, _dalta_log 目录中就记录的是transaction log.
这些日志中记录了, 哪些文件被添加了, 哪些文件被删除了, 元数据的操作, schema变更, statistics信息.
这样读取的时候需要遍历delta log 来确定所需要读取的文件列表, 那么为了避免每次读取需要查所有的json文件, 会定期的checkpoint, 将多个json文件合并成一个.parquet文件, 并在_last_checkpoint中记录最新的checkpoint id.
这样读取数据的流程就是查询checkpoint文件找到这些文件的列表, 然后可以根据元数据中的statistics 过滤掉不相关的文件, 然后直接读取这些datafile, 相比原来的操作list + 读取文件的 footer 要快很多.
读取协议
- 读取last_checkpoint id
- 使用list操作 找到 checkpoint 及他之后的json列表. 这样就可以构建出某个时间点表的视图. 这里有个点需要注意 设计中还存在对云存储最终一致性的兼容
- 根据这个元数据的文件列表进行数据读取
写入协议
- 在写完一个data object后, 需要更新元数据到delta_log目录中
- 找到要新写入的文件序列id r, 将新的record 写入
r + 1.json
(这样岂不是每个json文件只记录一个文件?) 按照论文里描述的, 一个log 文件应该是包含多个操作的, 有可能spark是微批写入, 每次写入的时候可能会涉及多个data file的变动
- 写入
r+1.json
的操作需要是原子的. 这依赖于不同的云厂商所提供的原子性的api.- 例如 Google Cloud Storage 有 atomic put-if-absent的接口
- HDFS 使用原子的rename api
- 而S3是没有的, 所以他还是额外提供了一个coordination的服务来实现
- 如果写入失败, 会重新执行前面的一步, 重新执行commit
事务保障
ACID 分别对应 原子性(要么成功要么失败),一致性(数据永远符合完整性约束,且对数据的修改在事务完成后立刻对用户可见),隔离性(并发操作互不影响),持久性(数据不易失)。
原子性: 对log文件的操作是原子的, 对这个文件的修改实际上就是事务提交的过程, 要么成功要么失败, 保障了原子性
一致性: 一致性是指,一个事务必须使数据库从一个一致性状态变换到另一个一致性状态(执行成功),或回滚到原始的一致性状态(执行失败)。这意味着必须维护完整性约束,以使在事务之前和之后数据库保持一致性和正确性。从这个意义上来说, 目前Delta lake说的是保障的是单表上的事务能力, 所以一致性也是能满足的. 原子性满足, 一致性就可以满足.
隔离性: 隔离性是指,并发执行的各个事务之间不能互相干扰,即一个事务内部的操作及使用的数据,对并发的其他事务是隔离的。此属性确保并发执行一系列事务的效果等同于以某种顺序串行地执行它们,也就是要达到这么一种效果:对于任意两个并发的事务T1和T2,在事务T1看来,T2要么在T1开始之前就已经结束,要么在T1结束之后才开始,这样每个事务都感觉不到有其他事务在并发地执行。这要求两件事:
- 在一个事务执行过程中,数据的中间的(可能不一致)状态不应该被暴露给所有的其他事务。
- 两个并发的事务应该不能操作同一项数据。数据库管理系统通常使用锁来实现这个特征。
拿转账来说,在A向B转账的整个过程中,只要事务还没有提交(commit),查询A账户和B账户的时候,两个账户里面的钱的数量都不会有变化。如果在A给B转账的同时,有另外一个事务执行了C给B转账的操作,那么当两个事务都结束的时候,B账户里面的钱必定是A转给B的钱加上C转给B的钱再加上自己原有的钱。
如此,隔离性防止了多个事务并发执行时由于交叉执行而导致数据的不一致。事务隔离分为不同级别,包括未提交读(Read uncommitted)、提交读(read committed)、可重复读(repeatable read)和串行化(Serializable)。以上4个级别的隔离性依次增强,分别解决不同的问题。事务隔离级别越高,就越能保证数据的完整性和一致性,但同时对并发性能的影响也越大。
**持久性: **数据都是写到持久化存储上, 在commit阶段发生的失败会进行重写, 可以保障持久性
隔离性中常见的问题是
- 脏读: 脏读指的是读到了其他事务未提交的数据,未提交意味着这些数据可能会回滚,也就是可能最终不会存到数据库中,也就是不存在的数据。读到了并一定最终存在的数据,这就是脏读。
- 不可重复读: 不可重复读指的是在同一事务内,不同的时刻读到的同一批数据可能是不一样的,可能会受到其他事务的影响,比如其他事务改了这批数据并提交了
- 幻读: 幻读是针对数据插入(INSERT)操作来说的。假设事务A对某些行的内容作了更改,但是还未提交,此时事务B插入了与事务A更改前的记录相同的记录行,并且在事务A提交之前先提交了,而这时,在事务A中查询,会发现好像刚刚的更改对于某些数据未起作用,但其实是事务B刚插入进来的,让用户感觉很魔幻,感觉出现了幻觉,这就叫幻读。
在Delta Lake中commit的过程可能多个client并行的, 但同一时刻只会有一个client commit成功, 失败的客户端会重试写入到下一个文件, 这个时候应该会有一些冲突检测的逻辑. 这是一种乐观并发控制机制
并且通过delta log维护每次提交的版本文件列表, 可以实现MVCC语义. 读取的时候只会读取到某一个提交成功的版本. 所以Delta Lake是通过乐观并发控制 + MVCC, 以上的隔离性的几个问题都不会存在, 达到了Serializable隔离级别
因此可以说保障了ACID语义
Higher Level Features
Time Travel and Rollbacks
在checkpoint中记录的每个版本中的快照, 所以时间旅行非常的简单, Spark SQL支持通过AS OF timestamp
和VERSION AS OF commit_id
来读取某个版本的数据. 当然为了读取历史版本的数据, 需要去设置数据保存的时长, 防止读取的时候数据已经被物理删除了.
同时也可以通过MERGE INTO的语法来回滚/修复数据
Efficient UPSERT, DELETE and MERGE
由于支持了事务, 所以可以很安全的更新数据. 而不会影响当前的reader. 他这里面没有提到具体是怎么实现更新的, 但是感觉应该是类似于hudi, 先定位到文件, 然后在重写这部分文件.
Streaming Ingest and Consumption
可以承担一部分消息队列的能力. delta_log的数据可以用作读取某个cp点之后的数据的变化. 可以在后台执行compaction任务将小文件合并成大文件, 优化读取的性能.
Data Layout Optimization
也是由于事务的支持, 因此可以通过一些后台的layout optimization来优化读取, 常见的优化手段: compact, 更新 statistics, 索引, Z-Ordering
Z-Order 能使得表在多维度的场景中都能取得比较不错的data skipping效果. 性能测试中也展示了zorder在多维场景下的数据过滤的效果.
Caching
这里是指可以在计算端的本地磁盘缓存远程的数据, 由于一个数据文件写入后即不再变化了, 所以缓存是比较安全的, 而且可以很好的加速查询
Audit Logging
通过元数据管理可以将数据的操作日志都记录在元数据中, 可以表文件的历史变更, paimon里面也有类似的audit log的表
Schema Evolution
通过元数据管理, 表其实具备了多版本的能力, 可以通过不同版本的schema去读取底层的数据. 这也是天然就能实现的
Connectors to Query and ETL Engines
这一点比较有意思, 这个是指他提供了和其他系统整合的connector, 但是利用了一种机制
通过提供 _symlink_format_manifest 文件, 这样就可以暴露一个当前系统的快照给那些批式处理和olap系统, 而这些系统只要能读取parquet文件就可以了.
用户只需要跑一个sql, 就可以生成这样一个manifest文件, 就可以作为Presto, Arthena, RedShift, SnowFlake的外表了. 也不需要额外插件开发的负担
常见使用场景
基于云端存储的传统ETL, 利用上云上对象存储的优势
BI分析. 直接基于对象存储的Olap查询. 而spark为了加速这种查询专门开发了Photon runtime, 极大提升即席查询的能力, 好处是不需要单独把数据导入到一个专门的系统中
最后Delta lake在业务上的集大成者应该就是Lakehouse, Databricks应该在随后就发表了一篇相关论文. Lakehouse 直接基于对象存储的统一存储来实现 批式ETL, 流式计算, Olap分析, Machine learning
性能
最后在性能上主要有几个大的提升点:
- 通过log文件管理元数据, 减少了传统架构中的list等元数据操作
- Z-Ordering 等layout 优化的效果
- 写入性能保持和之前接近
Related Work
Hudi/Iceberg: 很类似, 也是在存储上定义了存储格式和协议. 能力上其实也差不多.
Hive: 当前hive也具备了ACID的能力, 借助于metastore. 但是可能缺少一些time travel能力, 数据新鲜度不够, 分区数据大的时候元数据操作仍然会是瓶颈
HBase/kudu: 这些系统也是能在HDFS之上提供更低延迟的写入和读取能力, 可以将small write合并后写入, 但是需要运行一个单独的分布式系统, 成本完全不是一回事
Cstore: 最后这个是Cstore, 长期来看这些系统都是在尝试提供高性能的事务能力, 以及分析能力. 而Cstore就是尝试这样融合的HTAP的系统. 这些系统都是提供了常驻的服务来优化OLTP或者analysis.
而Delta lake呢 则是裸跑在object store之上, 提供了相对够用的事务层. 免去了独立存储服务的开销.
总结
Delta Lake 在云上对象存储之上封装了一层基于乐观并发控制 + MVCC的写入读取协议, 提供了ACID语义, 在这基础上实现了诸如Time Travel, Update/Delete, Data Layout, Schema Evolution, Streaming Read等一系列high level的能力.
虽然市场上数据湖有三大主要的产品, Delta lake, Hudi, Iceberg. 但是这三者的创建之初的初衷都还是各有侧重点的. 这篇文章中主要介绍了Delta lake主要还是想尽可能的利用云上对象存储廉价和pay-as-you-go的特性, 但是又不得不解决一系列对象存储的问题所引入的技术.
而Hudi最早应该是Uber为了解决其内部离线数仓增量更新的问题. 但是随着开源社区和市场的打磨, 这些产品的功能也逐渐趋同, 都基本具备了 Update/Delete, ACID, Time Travel, Data layout optimization, 流读等功能.
此外也可以看到的技术大趋势
- 存算分离
- 基于同一存储的Lakehouse架构
参考
https://developer.aliyun.com/article/874742 云数仓与数据湖元数据 ACID 介绍与对比
https://www.cnblogs.com/cciejh/p/acid.html 深入理解大数据架构之——事务及其ACID特性
https://zhuanlan.zhihu.com/p/117476959 MySQL事务隔离级别和实现原理
https://draveness.me/database-concurrency-control/
https://docs.delta.io/latest/concurrency-control.html#id1