我基本上是从Kafka的源中读取内容,并将每条消息转储到我的foreach处理器中(对于简单的示例,感谢Jacek的页面)。

如果这确实有效,那么我将在这里的process方法中实际执行一些业务逻辑,但是,这不起作用。我相信println自从它在执行程序上运行以来就无法正常工作,并且没有办法将这些日志返回给驱动程序。但是,此insert into临时表至少应该可以工作,并向我显示消息实际上已被消耗并处理到接收器。

我在这里想念什么?

真的在寻找另一双眼睛来检查我在这里的努力:

 val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
      .option("subscribe", src_topic)
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long) = {
        true
      }
      override def process(value: Row) = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
      }
      override def close(errorOrNull: Throwable) = {
        scon.closeConnection
      }
    }


    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()

最佳答案

感谢Harald和其他人的评论,我发现了几件事,这使我实现了正常的处理行为-

本地模式下的

  • 测试代码,yarn不是调试
  • 的最大帮助
  • 由于某种原因,foreach接收器的处理方法不允许调用其他方法。当我将业务逻辑直接放在那里时,它就可以工作。

  • 希望对别人有帮助。

    关于scala - 结构化流-Foreach接收器,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44193162/

    10-11 13:10