PySpark newb在这里,我试图组织一个笨拙的RDD历史数据。我从WASB中读取了数据,需要了解其结构。我对Schema有一个大致的了解,但是由于这是一个很大的摘录,因此我可以看到并非所有记录都是一致的。
我正在努力的是按位置引用RDD元素,以便我可以尝试从数据中提取一些含义。由于不一致,我现在不能提交模式-这意味着数据帧不是一个选择,并且我失去了DF的灵活查询样式。
有关环境和数据的快速摘要:
Azure HDInsight群集,WASB中的数据
HDFS 2.7版
纱v。2.7
Spark v 1.6(具有8个工作节点的HA配置(每个16核心x 112 GB RAM)
Jupyter-PySpark
数据:以大约奇数分隔“CSV”。 765MM条记录
读取数据ops = sc.textFile("wasb://[email protected]/raw_ops.csv")
在时髦的分隔符上拆分ops = ops.map(lambda s: s.split(u"\ufffd")).cache()
显示RDD的5条记录ops.take(5)
[[u'ALER RECOMMENDED BRAKE FLUID EXCHANGE - $139.88 ~', u'~PERFORMED DEALER RECOMMENDED BRAKE FLUID EXCHANGE USING A SPECIAL BRAKE FLUID EXCHANGE MACHINE,A PRESSURIZED SUPPLY OF MOC BRAKE FLUID', u'HIST_LD', u'2016-03-08 16:02:53', u'ARCHIVE'],[u'04638', u'0734140', u'2011-10-19', u'345267460', u'2', u'TIR', u'', u'0', u'685745051', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'],[u'04638', u'0734140', u'2011-10-19', u'345267460', u'1', u'PRIME ITEM', u'', u'0', u'0', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION ~', u'~TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATIONS AND DOCUMENT PSI ~', u'~ ~', u'~20450 SET AT 36 PSI.', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'],[u'12093', u'0399468', u'2011-10-19', u'345268559', u'2', u'201', u'', u'1.5', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'],[u'12093', u'0399468', u'2011-10-19', u'345268559', u'1', u'PRIME ITEM', u'', u'0', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK ~', u'~REPLACE GAS CAP AND SAND FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE']]
我看到第三列可能是一个日期,如何从RDD的每一行中提取此值?
(伪代码示例在此处查找2013年数据):ops.filter(lambda x[2]: year(x[2])==2013)
我发现有关如何在线执行此操作的文档非常有限-特别是因为它涉及在没有决定性模式的情况下处理结构不一致的数据。最底下的“伪代码”行应该是什么?
我的最终目标是解析2013-2015年的数据,将其划分为自己的数据框,然后将其写入Hive。谢谢您的帮助!
最佳答案
因此,这是解决问题的一种方法:
from datetime import datetime
def only_pass(maybe_date):
try:
datetime.strptime(maybe_date,"%Y-%m-%d").date()
return 1
except Exception as err:
return 0
only_rows_with_dates = rdd.filter(lambda row: only_pass(row[2]) == 1)
关于hadoop - 无法与PySpark应用架构-不一致的字段,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38462465/