我正在尝试使用JDBC write将spark DF插入Postgres。PistGRESH表对其中一个列具有唯一约束,当要插入的DF违反约束时,整个批次被拒绝,SCAP会话关闭,使得错误重复键值违反唯一约束,这是正确的,因为数据是重复的(已经存在于数据库中)。
org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148
需要插入不违反约束的数据行并忽略失败的行,而不使整个批处理失败。
使用的代码是:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"}
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

我该怎么做?

最佳答案

不幸的是,没有现成的解决方案。我看到了许多可能的解决方案:
作为forEachPartition函数的一部分,在PostgreSQL数据库中实现冲突解决的业务逻辑。例如,捕获约束冲突的异常,然后向日志报告。
去掉PostgreSQL数据库的约束,使用autogenerated PK means enable在数据库中存储重复的行。重复数据消除逻辑可以作为每个SQL查询的一部分进一步实现,也可以每天/每小时运行重复数据消除。您可以看到示例here
如果没有其他系统或进程写入SpGRESQL表,除非您的SCAPLE作业,则可以使用Soin操作进行过滤器,以在SARK之前删除SCADATAFAFRAME中的所有现有行。
我希望我的想法会有帮助。

10-06 16:05
查看更多