Spark 2.1.1(scala api)从s3位置流式传输json文件。
我想根据在json中为每个记录找到的ID列(“event_id”)对所有传入记录进行重复数据删除。我不关心保留哪条记录,即使该记录的重复只是部分的。我正在使用附加模式,因为通过spark.sql()方法仅对数据进行了充实/过滤,没有分组/窗口聚合。然后,我使用追加模式将 Parquet 文件写入s3。
根据文档,我应该能够在不带水印的情况下使用dropDuplicates来进行重复数据删除(显然,这在长时间运行的生产中无效)。但是,此操作失败并显示以下错误:
用户类引发异常:org.apache.spark.sql.AnalysisException:当流数据帧/数据集上存在流聚合时,不支持追加输出模式
该错误似乎很奇怪,因为我没有进行聚合(除非dropDuplicates或sparkSQL算作聚合?)。
我知道重复不会在彼此的三天之内发生,因此我通过添加水印(在下拉重复之前立即使用.withWatermark())再次进行了尝试。但是,似乎要等到3天才写完数据。 (即,由于今天是7月24日,因此仅将7月21日之前的数据写入到输出中)。
由于没有聚合,因此我想在处理该批处理后立即写每行,并简单地丢弃任何具有前三天发生的事件ID的行。有没有简单的方法可以做到这一点?
谢谢
最佳答案
就我而言,我曾经通过DStream通过两种方式实现这一目标:
单程:
tmp_data
(包含3天的唯一数据,请参见下文)leftOuterJoin
filter
,并输出新的唯一数据tmp_data
保存在HDFS或任何其他方式:
UNIQUE INDEX
关于scala - Spark Streaming dropDuplicates,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45291335/