Spark 2.1.1(scala api)从s3位置流式传输json文件。

我想根据在json中为每个记录找到的ID列(“event_id”)对所有传入记录进行重复数据删除。我不关心保留哪条记录,即使该记录的重复只是部分的。我正在使用附加模式,因为通过spark.sql()方法仅对数据进行了充实/过滤,没有分组/窗口聚合。然后,我使用追加模式将 Parquet 文件写入s3。

根据文档,我应该能够在不带水印的情况下使用dropDuplicates来进行重复数据删除(显然,这在长时间运行的生产中无效)。但是,此操作失败并显示以下错误:

用户类引发异常:org.apache.spark.sql.AnalysisException:当流数据帧/数据集上存在流聚合时,不支持追加输出模式

该错误似乎很奇怪,因为我没有进行聚合(除非dropDuplicates或sparkSQL算作聚合?)。

我知道重复不会在彼此的三天之内发生,因此我通过添加水印(在下拉重复之前立即使用.withWatermark())再次进行了尝试。但是,似乎要等到3天才写完数据。 (即,由于今天是7月24日,因此仅将7月21日之前的数据写入到输出中)。

由于没有聚合,因此我想在处理该批处理后立即写每行,并简单地丢弃任何具有前三天发生的事件ID的行。有没有简单的方法可以做到这一点?

谢谢

最佳答案

就我而言,我曾经通过DStream通过两种方式实现这一目标:

单程:

  • 加载tmp_data(包含3天的唯一数据,请参见下文)
  • 接收 batch_data ,并使用 tmp_data
  • 进行leftOuterJoin
  • 在步骤2上执行filter,并输出新的唯一数据
  • 通过step2的结果使用新的唯一数据更新 tmp_data ,并删除旧数据(超过3天)
  • tmp_data保存在HDFS或任何
  • 一次又一次重复

  • 其他方式:
  • 在mysql上创建表,并在 event_id
  • 上设置UNIQUE INDEX
  • 接收 batch_data 并仅将 event_id + event_time +任何保存到mysql
  • mysql将自动忽略重复的
  • 关于scala - Spark Streaming dropDuplicates,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45291335/

    10-16 02:44
    查看更多