我已经去实现了像这样的结构化流...
myDataSet
.map(r => StatementWrapper.Transform(r))
.writeStream
.foreach(MyWrapper.myWriter)
.start()
.awaitTermination()
这一切似乎都可行,但是查看MyWrapper.myWriter的吞吐量却很糟糕。它实际上是在尝试成为JDBC接收器,如下所示:
val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
Try (connection = getRemoteConnection).isSuccess
}
override def process(row: Seq[String]) {
val statement = connection.createStatement()
try {
row.foreach( s => statement.execute(s) )
} catch {
case e: SQLSyntaxErrorException => println(e)
case e: SQLException => println(e)
} finally {
statement.closeOnCompletion()
}
}
override def close(errorOrNull: Throwable) {
connection.close()
}
}
所以我的问题是-是否为每一行实例化了新的ForeachWriter?因此,对于数据集中的每一行都调用open()和close()吗?
是否有更好的设计来提高吞吐量?
如何一次解析SQL语句并执行多次,还保持数据库连接打开?
最佳答案
基础接收器的打开和关闭取决于ForeachWriter
的实现。
调用ForeachWriter
的相关类是 ForeachSink
,这是调用您的编写器的代码:
data.queryExecution.toRdd.foreachPartition { iter =>
if (writer.open(TaskContext.getPartitionId(), batchId)) {
try {
while (iter.hasNext) {
writer.process(encoder.fromRow(iter.next()))
}
} catch {
case e: Throwable =>
writer.close(e)
throw e
}
writer.close(null)
} else {
writer.close(null)
}
}
对于从您的源生成的每个批处理,尝试打开和关闭编写器。如果您想每次打开
open
和close
来真正打开和关闭接收器驱动程序,则需要通过实现来做到这一点。如果您想更好地控制数据的处理方式,则可以实现
Sink
特性,该特性提供批处理ID和底层的DataFrame
:trait Sink {
def addBatch(batchId: Long, data: DataFrame): Unit
}
关于database - Spark结构化流式ForeachWriter和数据库性能,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46820160/