本文介绍了Spark Streaming:如何在Python中获取已处理文件的文件名的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我是Spark的新手(老实说,我也是Python的新手),如果我错过了一些明显的东西,请原谅我。
我正在使用Spark和Python进行文件流传输。在我所做的第一个示例中,Spark正确地侦听给定的目录并计算文件中出现的单词,因此我知道一切都是在侦听该目录的情况下工作的。
现在,我正在尝试获取为进行审计而处理的文件的名称。我在这里读到http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E这不是一项微不足道的任务。我这里有一个可能的解决方案http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E我试着按如下方式实现:from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fileName(data):
string = data.toDebugString
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
不幸的是,现在它不是每秒监听文件夹,而是监听一次,输出‘NONE’,然后等待什么也不做。这与运行正常的代码之间的唯一区别是files = lines.foreachRDD(fileName)
在我担心获取文件名(明天的问题)之前,有人能理解为什么这只检查目录一次吗?
提前感谢M
推荐答案
,因此这是一个新手错误。我将我的解决方案张贴出来,供我自己和其他人参考。
正如@user3689574所指出的,我没有在我的函数中返回调试字符串。这充分解释了为什么我得到的是‘无’。
接下来,我在函数外部打印调试,这意味着它从来不是ForeachRDD的一部分。将其移动到函数中,如下所示:
def fileName(data):
debug = data.toDebugString()
print(debug)
这会打印调试信息,并继续监听目录。改变这一点解决了我最初的问题。在获取文件名方面,这变得非常简单。
目录没有变化时的调试字符串如下:
(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []
这清楚地表明没有文件。将文件复制到目录中时,调试输出如下:
(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []
它通过一个快速正则表达式很容易地为您提供了文件名。希望这对其他人有帮助。
这篇关于Spark Streaming:如何在Python中获取已处理文件的文件名的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!