I have been playing around with the Spark Streaming API and specifically testing the checkpointing feature. However, I am finding that in certain scenarios the checkpoint that gets restored is incomplete. The following code runs against version 2.1.0 (compiled against Scala 2.11) in local[2] mode (although I noticed a similar phenomenon when running distributed):
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public static void main(String[] args) throws Exception {
    SparkConf spark = new SparkConf();
    createAppendablePrintStream().println(ZonedDateTime.now() + " Starting stream");
    String checkpoint = "/export/spark/checkpoint"; // NFS mounted directory
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
        JavaStreamingContext x = new JavaStreamingContext(spark, Durations.seconds(5));
        x.checkpoint(checkpoint);
        JavaDStream<String> lines = x.socketTextStream("192.168.8.130", 7777); // IP address of my local VM
        // Key each line by its first word and count occurrences per key across batches
        JavaPairDStream<String, Integer> stateByType = lines
                .mapToPair(line -> new Tuple2<>(line.split(" ")[0], line))
                .updateStateByKey((Function2<List<String>, Optional<Integer>, Optional<Integer>>)
                        (values, state) -> Optional.of(state.orElse(0) + values.size()));
        stateByType.foreachRDD(rdd -> createAppendablePrintStream().println(
                ZonedDateTime.now() + " Current state: " + rdd.collectAsMap()));
        return x;
    });
    jssc.start();
    jssc.awaitTermination();
    createAppendablePrintStream().println(ZonedDateTime.now() + " Closing stream");
}

private static PrintStream createAppendablePrintStream() {
    try {
        return new PrintStream(new FileOutputStream("/tmp/result.txt", true));
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}
When I add a new key to the stream and shut down the driver immediately afterwards, the key does not appear to be restored as part of the checkpoint, as demonstrated by the following log excerpt:
2016-12-29T16:53:33.185Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:53:35.086Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:53:40.288Z[Europe/London] Current state: {WARN:=2, ERROR:=1, INFO:=1}
2016-12-29T16:53:43.695Z[Europe/London] Closing stream
2016-12-29T16:53:53.100Z[Europe/London] Starting stream
2016-12-29T16:54:08.154Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:13.226Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:15.026Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:15.768Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:17.136Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:17.521Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:18.795Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:19.360Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:20.634Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:25.052Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:30.066Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
(Note that the ALERT entry was added after the restart to show that the INFO entry never returns.) However, when I allow a new key to remain part of the state for a second frame, it is recovered from the checkpoint without issue, as shown in the following log excerpt:
2016-12-29T16:54:25.052Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:30.066Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
2016-12-29T16:54:35.051Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
2016-12-29T16:54:38.545Z[Europe/London] Closing stream
2016-12-29T16:54:47.306Z[Europe/London] Starting stream
2016-12-29T16:55:01.982Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
Is there an explanation for this incomplete state? Can it be fixed with a configuration change, or do I need to file a bug report with the Spark folks?
Best Answer
I don't know how you are stopping your StreamingContext, but for receiver-based streams you need to set spark.streaming.receiver.writeAheadLog.enable to true to enable the write-ahead log; otherwise, as you have seen, the last batch may be lost, because Spark Streaming cannot replay it.
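For reference, a minimal sketch of how that property could be set on the SparkConf the question already builds (the app name is hypothetical, added only to keep the snippet self-contained):

    SparkConf spark = new SparkConf()
            .setAppName("checkpoint-test") // hypothetical name, not from the original code
            // Enable the receiver write-ahead log so that data received but not yet
            // processed survives a driver restart. The WAL is written alongside the
            // checkpoint data, so the checkpoint directory the question's code
            // already configures is also required.
            .set("spark.streaming.receiver.writeAheadLog.enable", "true");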
See http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics for more details.
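On the point about how the context is stopped: a graceful shutdown waits for already-received batches to finish processing before closing, which also narrows the window in which recent data can be lost. A minimal sketch, assuming the stop is triggered from a JVM shutdown hook (the hook is an assumption, not part of the original code):

    // Register a hook so Ctrl-C / SIGTERM stops the streaming context gracefully.
    Runtime.getRuntime().addShutdownHook(new Thread(() ->
            // first flag: also stop the underlying SparkContext;
            // second flag: wait for in-flight batches to complete
            jssc.stop(true, true)));

Alternatively, setting spark.streaming.stopGracefullyOnShutdown to true asks Spark to stop the context gracefully on JVM shutdown without a hand-written hook.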