问题描述
我想使用的Spark与卡夫卡(1.1.0版本),但是流星火工作不断崩溃,由于这个错误:
I am trying to use Spark Streaming with Kafka (version 1.1.0) but the Spark job keeps crashing due to this error:
14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
我从日志中获得的唯一相关的信息是这样的:
The only relevant information I get from the logs is this:
14/11/21 12:34:18 INFO MemoryStore: Block input-0-1416573258200 stored as bytes to memory (size 85.8 KB, free 2.3 GB)
14/11/21 12:34:18 INFO BlockManagerMaster: Updated info of block input-0-1416573258200
14/11/21 12:34:18 INFO BlockGenerator: Pushed block input-0-1416573258200
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
14/11/21 12:37:35 INFO BlockManagerInfo: Added input-0-1416573258200 in memory on ********:43117 (size: 85.8 KB, free: 2.3 GB)
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
样品code:
SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
jssc.checkpoint(checkpointDir);
HashMap<String, Integer> topics = new HashMap<String, Integer>();
topics.put(KAFKA_TOPIC, 1);
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("group.id", "spark-streaming-test");
kafkaParams.put("zookeeper.connect", ZOOKEEPER_QUORUM);
kafkaParams.put("zookeeper.connection.timeout.ms", "1000");
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String, String> streamPair = kafkaStream.flatMapToPair(...).reduceByKey(...);
我不知道什么原因这个问题。
I'm not sure what cause of this issue is.
推荐答案
检查以下内容。
1)你有没有正确地创建流上下文中
1) Did you create the streaming context properly as in
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
您初始化不正确。
看一看下面的
例如:code。在 recoverableNetworkCount应用
2)是否启用了财产提前写日志spark.streaming.receiver.writeAheadLog.enable
2) Have you enabled the property write ahead log "spark.streaming.receiver.writeAheadLog.enable"
3)检查流UI流的稳定性。
处理时间和LT;批次间隔。
3) Check the stability of streaming in the Streaming UI.processing time < batch interval.
这篇关于星火流:无法计算分裂,阻止未找到的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!