我的 Spark 流应用程序从Kafka获取数据并对其进行处理。
如果应用程序出现故障,大量的数据会存储在Kafka中,并且在下次启动Spark Streaming应用程序时,它会崩溃,因为一次会消耗过多的数据。
由于我的应用程序不关心过去的数据,因此只使用当前(最新)数据是完全可以的。
我找到了“auto.reset.offest” 选项,它在Spark中的行为几乎没有什么不同。
如果已配置,它将删除存储在zookeeper中的偏移量。
但是,尽管它具有意外的行为,但应该在删除后从最新的数据中获取数据。
但是我发现不是。
我看到所有偏移量在使用数据之前都已清除。
然后,由于默认行为,它应按预期获取数据。
但由于数据过多,它仍然会崩溃。
当我使用“Kafka-Console-Consumer”清理偏移量并使用最新数据时,
并运行我的应用程序,它可以按预期工作。
因此,它看起来“auto.reset.offset”不起作用,并且默认情况下,spark流中的kafka使用者从“最小”偏移量中获取数据。
您对如何使用最新的Spark Streaming中的Kafka数据有任何想法吗?
我正在使用spark-1.0.0和Kafka-2.10-0.8.1。
提前致谢。
最佳答案
我认为您拼写了属性名称。正确的 key 是 auto.offset.reset ,而不是 auto.reset.offest
更多信息在这里:http://kafka.apache.org/documentation.html#configuration
希望这可以帮助。