经过一段时间的演化,spark-binlog,delta-plus慢慢进入正轨。spark-binlog可以将MySQL binlog作为标准的Spark数据源来使用,目前支持insert/update/delete 三种事件的捕捉。 delta-plus则是对Delta Lake的一个增强库,譬如在Delta Plus里实现了将binlog replay进Detla表,从而保证Delta表和数据库表接近实时同步。除此之外,detla-plus还集成了譬如布隆过滤器等来尽快数据更新更新速度。更多特性可参考我写的专栏。
数据湖Delta Lake 深入解析zhuanlan.zhihu.com图标有了这两个库,加上Spark,我们就能通过两行代码完成库表的同步。
以前如果要做数据增量同步,大概需要这么个流程:
问题很明显,Pipeline长,涉及到技术多,中间转存其实也挺麻烦的,难做到实时。我们希望可以更简单些,比如最好是这样:
然后我可能只要写如下代码就可以搞定:
val spark: SparkSession = ???
val df = spark.readStream.format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").option("host","127.0.0.1").option("port","3306").option("userName","xxxxx").option("password","xxxxx").option("databaseNamePattern","mlsql_console").option("tableNamePattern","script_file").optioin("binlogIndex","4").optioin("binlogFileOffset","4").load()
df.writeStream.format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
option("path","/tmp/sync/tables").option("mode","Append").option("idCols","id").option("duration","5").option("syncType","binlog").checkpointLocation("/tmp/cpl-binlog2").mode(OutputMode.Append).save("{db}/{table}")读和写,非常简单。读你需要提供MySQL binlog信息,写的时候指定主键,以及表的存储路径。
如果使用MLSQL则更简单,下面是一个完整的流式同步脚本:
set streamName="binlog";
load binlog.`` wherehost="127.0.0.1"and port="3306"and userName="xxxx"and password="xxxxxx"and bingLogNamePrefix="mysql-bin"and binlogIndex="4"and binlogFileOffset="4"and databaseNamePattern="mlsql_console"and tableNamePattern="script_file"as table1;
save append table1
as rate.mysql_{db}.{table}
options mode="Append"and idCols="id"and duration="5"and syncType="binlog"and checkpointLocation="/tmp/cpl-binlog2";
因为是增量同步,所以第一次需要先全量同步一次,用MLSQL也很简单:
connect jdbc whereurl="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"and driver="com.mysql.jdbc.Driver"and user="xxxxx"and password="xxxx"as db_cool;
load jdbc.db_cool.script_file
as script_file;save overwrite script_file as delta.mysql_mlsql_console.script_file
;
load delta.mysql_mlsql_console.script_file
as output;如果你使用了Console则可在编辑器里直接运行:
如果你安装了binlog2delta插件, 则可享受向导便利: