有没有办法动态停止

有没有办法动态停止

本文介绍了有没有办法动态停止 Spark Structured Streaming?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的场景中,我有几个数据集不时出现,我需要在我们的平台中摄取它们.摄取过程涉及几个转换步骤.其中之一是 Spark.到目前为止,我特别使用火花结构化流媒体.基础设施还涉及 kafka,spark 结构化流从中读取数据.

In my scenario I have several dataSet that comes every now and then that i need to ingest in our platform. The ingestion processes involves several transformation steps. One of them being Spark. In particular I use spark structured streaming so far. The infrastructure also involve kafka from which spark structured streaming reads data.

我想知道是否有一种方法可以检测到某个主题在一段时间内没有其他东西可以消费时决定停止工作.那就是我想在消耗该特定数据集所需的时间内运行它,然后停止它.出于特定原因,我们决定不使用 spark 的批处理版本.

I wonder if there is a way to detect when there is nothing else to consume from a topic for a while to decide to stop the job. That is i want to run it for the time it takes to consume that specific dataset and then stop it. For specific reasons we decided not to use the batch version of spark.

因此是否有任何超时或可用于检测没有更多数据进入并且所有内容都已处理的东西.

Hence is there any timeout or something that can be used to detect that there is no more data coming it and that everything has be processed.

谢谢

推荐答案

你大概可以用这个:-

def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
    while (query.isActive) {
      try{
        if(query.lastProgress.numInputRows < 10){
          query.awaitTermination(1000)
        }
      }
      catch
      {
        case e:NullPointerException => println("First Batch")
      }
      Thread.sleep(500)
    }
  }

您可以创建一个 numInputRows 变量.

You can make a numInputRows variable.

这篇关于有没有办法动态停止 Spark Structured Streaming?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-18 20:55