我正在使用 Apache Spark DataFrame 并且我想将数据 upsert 到 Elasticsearch
我发现我可以像这样覆盖它们
val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","443")
.option("es.net.ssl","true")
.option("es.nodes", esURL)
.option("es.mapping.id", index)
.mode("Overwrite")
.save("index/dogs")
但到目前为止我注意到这个命令
mode("Overwrite")
实际上是删除所有现有的重复数据并插入新数据有没有办法可以
upsert
它们而不是删除并重新编写它们?因为我需要几乎实时查询这些数据。提前致谢 最佳答案
mode("Overwrite")
是一个问题的原因是,当您覆盖整个数据帧时,它会立即删除与您的数据帧行匹配的所有数据,看起来整个索引对我来说都是空的,我想知道如何实际 upsert
它
这是我的代码
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.nodes.discovery", "false")
.option("es.nodes.client.only", "false")
.option("es.net.ssl","true")
.option("es.mapping.id", index)
.option("es.write.operation", "upsert")
.option("es.nodes", esURL)
.option("es.port", "443")
.mode("append")
.save(path)
注意,你必须把
"es.write.operation", "upert"
和 .mode("append")
关于scala - Spark Dataframe 更新插入到 Elasticsearch,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50962579/