Problem description
I slightly modified an example taken from here - https://github.com/apache/spark/blob/v2.2.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
I added a second writeStream (sink):
```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

case class MyWriter1() extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    println(s"custom1 - ${value.get(0)}")
  }

  override def close(errorOrNull: Throwable): Unit = ()
}

case class MyWriter2() extends ForeachWriter[(String, Long)] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: (String, Long)): Unit = {
    println(s"custom2 - $value")
  }

  override def close(errorOrNull: Throwable): Unit = ()
}

object Main extends Serializable {

  def main(args: Array[String]): Unit = {
    println("starting")

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val host = "localhost"
    val port = "9999"

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("app-test")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query1 = wordCounts.writeStream
      .outputMode("update")
      .foreach(MyWriter1())
      .start()

    // "count" is a LongType column, so read it as Long
    val ds = wordCounts.map(x => (x.getAs[String]("value"), x.getAs[Long]("count")))

    val query2 = ds.writeStream
      .outputMode("update")
      .foreach(MyWriter2())
      .start()

    spark.streams.awaitAnyTermination()
  }
}
```
Unfortunately, only the first query runs; the second never runs (MyWriter2 is never called).
Please advise what I'm doing wrong. According to the docs: "You can start any number of queries in a single SparkSession. They will all be running concurrently sharing the cluster resources."
Answer

I had the same situation (but on a newer Structured Streaming API), and in my case it helped to call awaitTermination() on the last StreamingQuery.
Something like:
```scala
query1.start()
query2.start().awaitTermination()
```
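The order matters here: awaitTermination() blocks the calling thread, so calling it on the first query before the second query's start() would mean the second query is never started at all.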
Update: instead of the above, this built-in solution/method is better:

```scala
sparkSession.streams.awaitAnyTermination()
```
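For illustration, here is a minimal sketch of the combined pattern, assuming the same `spark` session, `wordCounts`, `ds`, and writer classes defined in the question; the `spark.streams.active` printout is just an optional diagnostic, not part of the original code:

```scala
// Neither start() blocks, so both queries get registered with the
// session's StreamingQueryManager before anything waits.
val query1 = wordCounts.writeStream
  .outputMode("update")
  .foreach(MyWriter1())
  .start()

val query2 = ds.writeStream
  .outputMode("update")
  .foreach(MyWriter2())
  .start()

// Optional diagnostic: list the queries that are actually running.
spark.streams.active.foreach(q => println(s"active query: ${q.id}"))

// Block until any one of the active queries terminates or fails.
spark.streams.awaitAnyTermination()
```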