This article explains how to deal with a Kafka streaming aggregation job that appears to run into a "deadlock". It should serve as a useful reference for anyone hitting the same problem; read on for the details and the suggested solution.

Problem Description


    I posted another question on a similar topic a few days ago:

    I managed to get at least a "working" solution now, meaning that the process itself seems to run correctly. But, as I am an absolute beginner with Spark, I seem to have missed some things about how to build this kind of application in the right way (performance-/computation-wise)...

    What I want to do:

    1. Load history data from ElasticSearch upon application startup
    2. Start listening to a Kafka topic on startup (with sales events, passed as JSON strings) with Spark Streaming
    3. For each incoming RDD, do an aggregation per user
    4. Union the results from 3. with the history
    5. Aggregate the new values, such as total revenue, per user
    6. Use the results from 5. as the new "history" for the next iteration

    My code is the following:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
    import org.elasticsearch.spark.sql._
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    
    object ReadFromKafkaAndES {
      def main(args: Array[String]) {
    
        Logger.getLogger("org").setLevel(Level.WARN)
        Logger.getLogger("akka").setLevel(Level.WARN)
        Logger.getLogger("kafka").setLevel(Level.WARN)
    
        val checkpointDirectory = "/tmp/Spark"
        val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
        conf.set("es.nodes", "localhost")
        conf.set("es.port", "9200")
    
        val topicsSet = Array("sales").toSet
    
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(15))
        ssc.checkpoint(checkpointDirectory)
    
        //Create SQLContext
        val sqlContext = new SQLContext(sc)
    
        //Get history data from ES
        var history = sqlContext.esDF("data/salesaggregation")
    
        //Kafka settings
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    
        // Create direct kafka stream with brokers and topics
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
    
        //Iterate
        messages.foreachRDD { rdd =>
    
          //If data is present, continue
          if (rdd.count() > 0) {
    
            //Register temporary table for the aggregated history
            history.registerTempTable("history")
    
            println("--- History -------------------------------")
            history.show()
    
            //Parse JSON as DataFrame
            val saleEvents = sqlContext.read.json(rdd.values)
    
            //Register temporary table for sales events
            saleEvents.registerTempTable("sales")
    
            val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")
    
            println("--- Sales ---------------------------------")
            sales.show()
    
            val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")
    
            println("--- Aggregation ---------------------------")
            agg.show()
    
            //This is our new "history"
            history = agg
    
            //Cache results
            history.cache()
    
            //Drop temporary table
            sqlContext.dropTempTable("history")
    
          }
    
        }
    
        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    The computations seem to work correctly:

    --- History -------------------------------
    +--------------------+--------------------+-----------+------------+------+
    | latestSaleTimestamp|         productList|totalPoints|totalRevenue|userId|
    +--------------------+--------------------+-----------+------------+------+
    |2015-07-22 10:03:...|Buffer(47, 1484, ...|         91|       12.05|    23|
    |2015-07-22 12:50:...|Buffer(256, 384, ...|         41|        7.05|    24|
    +--------------------+--------------------+-----------+------------+------+
    
    --- Sales ---------------------------------
    +------+--------------------+------------------+-----------+
    |userId| latestSaleTimestamp|      totalRevenue|totalPoints|
    +------+--------------------+------------------+-----------+
    |    23|2015-07-29 09:17:...|            255.59|        208|
    |    24|2015-07-29 09:17:...|226.08999999999997|        196|
    +------+--------------------+------------------+-----------+
    
    --- Aggregation ---------------------------
    +------+--------------------+------------------+-----------+
    |userId| latestSaleTimestamp|      totalRevenue|totalPoints|
    +------+--------------------+------------------+-----------+
    |    23|2015-07-29 09:17:...| 267.6400001907349|        299|
    |    24|2015-07-29 09:17:...|233.14000019073484|        237|
    +------+--------------------+------------------+-----------+
    

    but once the application has run for several iterations, I can see that the performance deteriorates:

    I also see a high number of skipped tasks, which increases with every iteration:

    The first iteration's graph looks like this:

    The second iteration's graph looks like this:

    The more iterations pass, the longer the graph gets, with more and more skipped steps.

    Basically, I think the problem lies in how the results of one iteration are stored for the next iteration. Unfortunately, even after trying a lot of different things and reading the docs, I haven't been able to come up with a solution for this. Any help is warmly appreciated. Thanks!

    Solution

    This streaming job is not in a 'deadlock', but its execution time increases exponentially with every iteration, so the job will fail sooner rather than later.

    The union->reduce->union->reduce... iterative process on the RDDs creates an ever-increasing RDD lineage. Every iteration adds dependencies to that lineage which have to be computed again in the next iteration, and that is what makes the execution time grow. The dependency (lineage) graph shows that clearly.
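
    Stripped down to plain RDDs, that pattern looks roughly like the following sketch (illustrative names and values only, not the question's actual code); every batch stacks another union and reduce on top of the previous result:

    var history = sc.parallelize(Seq(("user23", 12.05), ("user24", 7.05)))

    def onBatch(batch: org.apache.spark.rdd.RDD[(String, Double)]): Unit = {
      // each call adds another union + reduceByKey layer on top of the previous
      // result, so the lineage the next batch has to recompute keeps growing
      history = history.union(batch).reduceByKey(_ + _)
    }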

    One solution is to checkpoint the RDD at regular intervals.

    history.checkpoint()
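
    In the question's code, history is a DataFrame, and the DataFrame API of this Spark 1.x code base has no checkpoint() method of its own, so one way to apply the suggestion is to checkpoint the DataFrame's underlying RDD every few batches and rebuild history from the checkpointed data, which cuts the accumulated lineage. A rough sketch (the updateHistory helper, its name and the 10-batch interval are illustrative assumptions, not part of the original code):

    var batchCount = 0L

    def updateHistory(agg: DataFrame, interval: Int = 10): DataFrame = {
      batchCount += 1
      if (batchCount % interval == 0) {
        val rows = agg.rdd        // RDD[Row] underneath the DataFrame
        rows.checkpoint()         // uses the directory set via ssc.checkpoint(checkpointDirectory)
        rows.count()              // force the checkpoint to materialize right away
        sqlContext.createDataFrame(rows, agg.schema)
      } else {
        agg
      }
    }

    Inside the foreachRDD block, history = agg would then become history = updateHistory(agg), with history.cache() kept as before.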
    

    You could also explore replacing the union/reduce process with updateStateByKey.
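
    A minimal sketch of that direction follows (the Totals case class, the parseSale parser and the json4s field extraction are illustrative assumptions; the field names userId, saleTimestamp, totalRevenue and totalPoints come from the question's SQL, and saleTimestamp is assumed to arrive as epoch milliseconds):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    // Running totals kept as Spark Streaming state, one entry per userId.
    case class Totals(latestSaleTimestamp: Long, totalRevenue: Double, totalPoints: Long)

    // Hypothetical per-record parser (json4s ships with Spark).
    def parseSale(json: String): (Long, Totals) = {
      implicit val formats = DefaultFormats
      val event = parse(json)
      val userId = (event \ "userId").extract[Long]
      (userId, Totals(
        (event \ "saleTimestamp").extract[Long],
        (event \ "totalRevenue").extract[Double],
        (event \ "totalPoints").extract[Long]))
    }

    // messages is the (key, json) DStream from KafkaUtils.createDirectStream.
    val runningTotals = messages
      .map(_._2)            // keep only the JSON payload
      .map(parseSale)       // => DStream[(userId, Totals)]
      .updateStateByKey[Totals] { (newSales: Seq[Totals], state: Option[Totals]) =>
        val previous = state.getOrElse(Totals(0L, 0.0, 0L))
        val updated = newSales.foldLeft(previous) { (acc, sale) =>
          Totals(
            math.max(acc.latestSaleTimestamp, sale.latestSaleTimestamp),
            acc.totalRevenue + sale.totalRevenue,
            acc.totalPoints + sale.totalPoints)
        }
        Some(updated)
      }

    runningTotals.print()   // or write each batch's updated state back to Elasticsearch

    updateStateByKey relies on checkpointing, which the question's code already enables via ssc.checkpoint(checkpointDirectory). If the Spark version at hand offers the overload that takes an initial state RDD, the history loaded from Elasticsearch could be passed in as that initial state instead of being unioned in on every batch.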

    This concludes the article on the Kafka streaming aggregation job that appears to run into a "deadlock". We hope the answer above is helpful, and thank you for your continued support!
