问题描述
我在 HDFS 上的数据是序列文件格式.我正在使用 PySpark (Spark 1.6) 并试图实现两件事:
My data on HDFS is in Sequence file format. I am using PySpark (Spark 1.6) and trying to achieve 2 things:
数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身.我试过 SparkContext.wholeTextFiles 但我认为它可能不支持序列文件格式.
Data path contains a timestamp in yyyy/mm/dd/hh format that I would like to bring into the data itself. I tried SparkContext.wholeTextFiles but I think that might not support Sequence file format.
如果我想处理一天的数据并且想将日期带入数据中,我该如何处理上面的一点?在这种情况下,我将加载 yyyy/mm/dd/* 格式的数据.
How do I deal with the point above if I want to crunch data for a day and want to bring in the date into the data? In this case I would be loading data like yyyy/mm/dd/* format.
感谢任何指点.
推荐答案
如果存储类型与 SQL 类型兼容,并且您使用 Spark 2.0,那就很简单了.导入input_file_name
:
If stored types are compatible with SQL types and you use Spark 2.0 it is quite simple. Import input_file_name
:
from pyspark.sql.functions import input_file_name
读取文件并转换为DataFrame
:
df = sc.sequenceFile("/tmp/foo/").toDF()
添加文件名:
df.withColumn("input", input_file_name())
如果此解决方案不适用于您的情况,那么通用的方法是直接列出文件(对于 HDFS,您可以使用 hdfs3
库):
If this solution is not applicable in your case then universal one is to list files directly (for HDFS you can use hdfs3
library):
files = ...
一一读取添加文件名:
def read(f):
"""Just to avoid problems with late binding"""
return sc.sequenceFile(f).map(lambda x: (f, x))
rdds = [read(f) for f in files]
和联合:
sc.union(rdds)
这篇关于在 PySpark 中获取序列文件格式的文件的 HDFS 文件路径的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!