Problem description
I posted another question on a similar topic a few days ago:
- How to load history data when starting Spark Streaming process, and calculate running aggregations
I managed to get at least a "working" solution now, meaning that the process itself seems to work correctly. But, as I am a bloody beginner concerning Spark, I seem to have missed some things on how to build these kinds of applications in a correct way (performance-/computation-wise)...
What I want to do:
- Load history data from ElasticSearch upon application startup
- Start listening to a Kafka topic on startup (with sales events, passed as JSON strings) with Spark Streaming
My code is the following:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    val checkpointDirectory = "/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))
    ssc.checkpoint(checkpointDirectory)

    // Create SQLContext
    val sqlContext = new SQLContext(sc)

    // Get history data from ES
    var history = sqlContext.esDF("data/salesaggregation")

    // Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct Kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Iterate over each micro-batch
    messages.foreachRDD { rdd =>
      // If data is present, continue
      if (rdd.count() > 0) {
        // Register temporary table for the aggregated history
        history.registerTempTable("history")
        println("--- History -------------------------------")
        history.show()

        // Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        // Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        // Aggregate the current batch per user
        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")
        println("--- Sales ---------------------------------")
        sales.show()

        // Merge the batch aggregates into the history (union all + group by)
        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")
        println("--- Aggregation ---------------------------")
        agg.show()

        // This is our new "history"
        history = agg

        // Cache results
        history.cache()

        // Drop temporary table
        sqlContext.dropTempTable("history")
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
The computations seem to work correctly:
--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp| productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...| 91| 12.05| 23|
|2015-07-22 12:50:...|Buffer(256, 384, ...| 41| 7.05| 24|
+--------------------+--------------------+-----------+------------+------+
--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 255.59| 208|
| 24|2015-07-29 09:17:...|226.08999999999997| 196|
+------+--------------------+------------------+-----------+
--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 267.6400001907349| 299|
| 24|2015-07-29 09:17:...|233.14000019073484| 237|
+------+--------------------+------------------+-----------+
But if the application runs for several iterations, I can see that the performance deteriorates:
I also see a high number of skipped tasks, which increases with every iteration:
The first iteration's graphs look like this:
The second iteration's graphs look like this:
The more iterations pass, the longer the graphs get, with lots of skipped steps.
Basically, I think the problem lies in how each iteration's results are stored for the next iteration. Unfortunately, even after trying a lot of different things and reading the docs, I'm not able to come up with a solution for this. Any help is warmly appreciated. Thanks!
Answer
This streaming job is not in a 'deadlock', but its execution time is increasing exponentially with every iteration, resulting in a streaming job that will fail sooner rather than later.
The iterative union->reduce->union->reduce... process on the RDDs creates an ever-growing RDD lineage. Every iteration adds dependencies to that lineage that need to be computed in the next iteration, which also makes the execution time grow. The dependency (lineage) graph shows this clearly.
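To see the effect in isolation, here is a minimal, self-contained sketch (not the original job; the object name and the sample data are made up for illustration) of the same union->reduce pattern. Printing toDebugString on each pass shows the lineage getting deeper with every iteration, which is exactly what the job's DAG visualization shows:

import org.apache.spark.{SparkConf, SparkContext}

object LineageGrowthDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext(
      new SparkConf().setAppName("LineageGrowthDemo").setMaster("local[2]"))

    // Running totals, analogous to the "history" DataFrame in the question
    var totals = sc.parallelize(Seq((23, 1.0), (24, 1.0)))

    for (i <- 1 to 5) {
      // A new "batch", analogous to one streaming interval
      val batch = sc.parallelize(Seq((23, 2.0), (24, 3.0)))

      // The same union -> reduce pattern as the streaming job
      totals = totals.union(batch).reduceByKey(_ + _)

      println(s"--- Iteration $i ---")
      // The debug string (the lineage) gets longer on every pass,
      // and so does the work needed to recompute `totals`
      println(totals.toDebugString)
    }
    sc.stop()
  }
}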
One solution is to checkpoint the RDD at regular intervals:
history.checkpoint()
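Note that in Spark 1.x a DataFrame does not expose checkpoint() directly (that method only appeared on Datasets in later versions), so one way to get the same effect is to checkpoint the underlying RDD and rebuild the DataFrame from the materialized data. A hedged sketch, meant to sit inside the foreachRDD loop of the question's code; the batchCounter variable and the every-10-batches interval are illustrative choices, not part of the original answer:

// ... inside foreachRDD, after `history = agg` ...
batchCounter += 1                       // e.g. a `var batchCounter = 0L` defined next to `history`
if (batchCounter % 10 == 0) {
  val rows = history.rdd                // the RDD[Row] behind the DataFrame
  rows.checkpoint()                     // uses the directory set via ssc.checkpoint(...)
  rows.count()                          // force a job so the checkpoint is actually written
  // Rebuild the DataFrame on top of the checkpointed RDD,
  // which truncates the lineage accumulated so far
  history = sqlContext.createDataFrame(rows, history.schema)
}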
You could also explore replacing the union/reduce process with updateStateByKey:
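A hedged sketch of what that could look like, using the question's schema (userId, saleTimestamp, totalRevenue, totalPoints); the parseSale helper is hypothetical and would need a real JSON parser:

import org.apache.spark.streaming.dstream.DStream

// Running per-user state kept by Spark Streaming between batches
case class SaleState(latestSaleTimestamp: Long, totalRevenue: Double, totalPoints: Long)

// Fold the current batch's values into the previous state (if any)
def updateState(newSales: Seq[SaleState], state: Option[SaleState]): Option[SaleState] = {
  val all = state.toSeq ++ newSales
  if (all.isEmpty) state
  else Some(SaleState(
    all.map(_.latestSaleTimestamp).max,
    all.map(_.totalRevenue).sum,
    all.map(_.totalPoints).sum))
}

// parseSale is hypothetical: it would turn one Kafka JSON string
// into (userId, SaleState) using a real JSON library
val perUser: DStream[(Int, SaleState)] =
  messages.map { case (_, json) => parseSale(json) }

// State survives across batches; this requires ssc.checkpoint(...),
// which the question's code already sets up
val running: DStream[(Int, SaleState)] = perUser.updateStateByKey(updateState)
running.print()

This keeps the per-user aggregates as managed streaming state instead of re-unioning an ever-growing DataFrame, so the lineage stays bounded by Spark's own state checkpointing; the history loaded from ElasticSearch could seed the state through the updateStateByKey overload that accepts an initialRDD.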