How to use the foreach sink in PySpark?
Problem description
How can I use foreach in Python Spark Structured Streaming to trigger operations on the output?
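def func(row):
    # foreach calls this once per output row;
    # ops stands for whatever operation should run on that row
    ops(row)

query = wordCounts \
    .writeStream \
    .outputMode('update') \
    .foreach(func) \
    .start()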
Recommended answer
Support for the foreach sink in Python was added in Spark 2.4.0, and the documentation has been updated: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
Make sure you are running that version or later; you can then do:
def process_row(row):
    # Process row
    pass

query = streamingDF.writeStream.foreach(process_row).start()
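Besides a plain function, foreach in PySpark also accepts an object with a process method and optional open and close methods, which is useful when per-partition setup or cleanup is needed. The following is a minimal sketch of that form rather than a definitive implementation. The class name WordCountWriter, the helper start_query, and the 'word' and 'count' column names are assumptions for illustration, not part of the original question.

```python
class WordCountWriter:
    """Hypothetical foreach writer that buffers rows and prints them on close."""

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; return True to process its rows.
        self.buffer = []
        return True

    def process(self, row):
        # Called once per output row. Index by name, because row.count would
        # resolve to the tuple method instead of the 'count' column.
        self.buffer.append((row["word"], row["count"]))

    def close(self, error):
        # Called when the partition is finished; error is None on success.
        if error is None:
            for word, count in self.buffer:
                print(f"{word}: {count}")


def start_query(word_counts):
    # Assumption: word_counts is a streaming DataFrame with 'word' and
    # 'count' columns, like wordCounts in the question.
    return (
        word_counts.writeStream
        .outputMode("update")
        .foreach(WordCountWriter())
        .start()
    )
```

Calling start_query(wordCounts) would start the stream. Note that the writer runs on the executors, so the printed lines end up in the executor logs rather than in the driver console.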