我正在使用 Apache Spark DataFrame 并且我想将数据 upsert 到 Elasticsearch
我发现我可以像这样覆盖它们

val df = spark.read.option("header","true").csv("/mnt/data/akc_breed_info.csv")

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","443")
  .option("es.net.ssl","true")
  .option("es.nodes", esURL)
  .option("es.mapping.id", index)
  .mode("Overwrite")
  .save("index/dogs")

但到目前为止我注意到这个命令 mode("Overwrite") 实际上是删除所有现有的重复数据并插入新数据

有没有办法可以 upsert 它们而不是删除并重新编写它们?因为我需要几乎实时查询这些数据。提前致谢

最佳答案

mode("Overwrite") 是一个问题的原因是,当您覆盖整个数据帧时,它会立即删除与您的数据帧行匹配的所有数据,看起来整个索引对我来说都是空的,我想知道如何实际 upsert

这是我的代码

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.nodes.discovery", "false")
  .option("es.nodes.client.only", "false")
  .option("es.net.ssl","true")
  .option("es.mapping.id", index)
  .option("es.write.operation", "upsert")
  .option("es.nodes", esURL)
  .option("es.port", "443")
  .mode("append")
  .save(path)

注意,你必须把 "es.write.operation", "upert".mode("append")

关于scala - Spark Dataframe 更新插入到 Elasticsearch,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50962579/

10-09 06:44
查看更多