I've been playing with the Spark Streaming API, specifically testing the checkpointing feature. However, in certain circumstances I'm finding the restored checkpoint to be incomplete.

The following code runs against version 2.1.0 (compiled for Scala 2.11) in local[2] mode (although I have noticed similar behavior when running distributed):

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public static void main(String[] args) throws Exception {
    SparkConf spark = new SparkConf();

    createAppendablePrintStream().println(ZonedDateTime.now() + " Starting stream");
    String checkpoint = "/export/spark/checkpoint"; // NFS-mounted directory
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
        JavaStreamingContext x = new JavaStreamingContext(spark, Durations.seconds(5));
        x.checkpoint(checkpoint);
        JavaDStream<String> lines = x.socketTextStream("192.168.8.130", 7777); // IP address of my local VM
        // Key each line by its first token, then keep a running count per key across batches.
        JavaPairDStream<String, Integer> stateByType = lines
                .mapToPair(line -> new Tuple2<>(line.split(" ")[0], line))
                .updateStateByKey((Function2<List<String>, Optional<Integer>, Optional<Integer>>)
                        (values, state) -> Optional.of(state.orElse(0) + values.size()));
        stateByType.foreachRDD(rdd -> createAppendablePrintStream()
                .println(ZonedDateTime.now() + " Current state: " + rdd.collectAsMap()));
        return x;
    });

    jssc.start();
    jssc.awaitTermination();
    createAppendablePrintStream().println(ZonedDateTime.now() + " Closing stream");
}

private static PrintStream createAppendablePrintStream() {
    try {
        return new PrintStream(new FileOutputStream("/tmp/result.txt", true));
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}
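
As an aside, socketTextStream connects to the given address as a client, so the test input can be fed by anything listening on port 7777 that writes newline-terminated lines; nc -lk 7777 is the usual choice. Below is a minimal Java stand-in, purely as a sketch: the LineFeeder class name is made up, and it must be listening before the streaming context starts.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical stand-in for "nc -lk 7777": forwards stdin lines to the
// first connected Spark receiver.
public class LineFeeder {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(7777);
             Socket client = server.accept(); // socketTextStream connects as a client
             PrintWriter out = new PrintWriter(client.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                out.println(line); // e.g. "WARN disk usage high"
            }
        }
    }
}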


When I add a new key to the stream and shut down the driver straight away, it does not appear to be restored as part of the checkpoint, as demonstrated by this log excerpt:

2016-12-29T16:53:33.185Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:53:35.086Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:53:40.288Z[Europe/London] Current state: {WARN:=2, ERROR:=1, INFO:=1}
2016-12-29T16:53:43.695Z[Europe/London] Closing stream
2016-12-29T16:53:53.100Z[Europe/London] Starting stream
2016-12-29T16:54:08.154Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:13.226Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:15.026Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:15.768Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:17.136Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:17.521Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:18.795Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:19.360Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:20.634Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:25.052Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:30.066Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}


(Note that the ALERT entry was added after the restart, to show that the INFO entry never comes back.)

However, when I allow the new key to remain part of the state for a second frame before shutting down, it is restored from the checkpoint without issue, as this log excerpt shows:

2016-12-29T16:54:25.052Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:30.066Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
2016-12-29T16:54:35.051Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
2016-12-29T16:54:38.545Z[Europe/London] Closing stream
2016-12-29T16:54:47.306Z[Europe/London] Starting stream
2016-12-29T16:55:01.982Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}


Is there an explanation for this incomplete state? Can it be fixed with a configuration change, or do I need to file a bug report with the Spark folks?

Best Answer

I don't know how you stopped the StreamingContext, but for receiver-based input streams you need to set spark.streaming.receiver.writeAheadLog.enable to true to enable write-ahead logs. Otherwise, as you observed, the last batch may be lost, because Spark Streaming cannot replay it.
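
As a sketch, that configuration change is one extra line on the SparkConf from your code; the graceful stop at the end is an assumption about how you might be shutting the driver down deliberately:

SparkConf spark = new SparkConf()
        // Persist received blocks to a write-ahead log under the checkpoint
        // directory, so batches that were received but not yet checkpointed
        // can be replayed after a driver restart.
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");

// ... build and start the context as before ...

// A graceful stop waits for received data to be processed before shutting
// down, rather than dropping the in-flight batch.
jssc.stop(true /* stopSparkContext */, true /* stopGracefully */);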

See http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics for more details.
