Question
I have the following CSV (sample):
id    timestamp         routeid  creationdate      parameters
1000  21-11-2016 22:55  14       21-11-2016 22:55  RSRP=-102,
1002  21-11-2016 22:55  14       21-11-2016 22:55  RA Req. SN=-146,TPC=4,RX Antennas=-8,
1003  21-11-2016 22:55  14       21-11-2016 22:55  RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,
Basically I want to split the parameters column into multiple columns, like this:
id, timestamp, routeid, creationdate, RSRP, RA Req. SN, TPC, RX Antennas, MCS
So if a parameter has no value for a given row, I would put NULL in its place instead:
1000 21-11-2016 22:55 14 21-11-2016 22:55 -102 NULL NULL NULL NULL
And if the value exists, fill in the row.
Here is what I have tried:
from pyspark import SparkContext
import os
import sys
from pyspark.sql import SQLContext
import itertools
import re
sc = SparkContext("local","Work")
sqlContext = SQLContext(sc)
df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///sample.csv')
def aaa(a):
    aa = a.split(',', 15000)
    filtered = filter(lambda p: not re.match(r'^\s*$', p), aa)
    listWithNoEmptyLines = [z for z in filtered if z != []]
    for x in listWithNoEmptyLines:
        ab = x.split("=")
        AllList = []
        rsrp = ""
        ra_req_sn = ""
        tpc = ""
        rx_antenas = ""
        mcs = ""
        if 'RSRP' in ab:
            rsrp = ab[1]
        else:
            rsrp = "NULL"
        if 'RA Req. SN' in ab:
            ra_req_sn = ab[1]
        else:
            ra_req_sn = "NULL"
        if 'TPC' in ab:
            tpc = ab[1]
        else:
            tpc = "NULL"
        if 'RX Antennas' in ab:
            rx_antenas = ab[1]
        else:
            rx_antenas = "NULL"
        if 'MCS' in ab:
            mcs = ab[1]
        else:
            mcs = "NULL"
        return rsrp, ra_req_sn, tpc, rx_antenas
DFtoRDD = df1.rdd.map(list).map(lambda x: [str(x[1]), str(x[2]), str(x[3]), aaa(str(x[4]))])
print DFtoRDD.collect()
which gives me the following result:
[['1000', '21-11-2016 22:55', '14', '21-11-2016 22:55', ('-102', 'NULL', 'NULL', 'NULL')], ['1002', '21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '-146', 'NULL', 'NULL')], ['1003', '21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '134', 'NULL', 'NULL')]]
Expected result:
id    timestamp         routeid  creationdate      RSRP  RA Req. SN  TPC   RX Antennas  MCS
1000  21-11-2016 22:55  14       21-11-2016 22:55  -102  NULL        NULL  NULL         NULL
1002  21-11-2016 22:55  14       21-11-2016 22:55  NULL  -146        4     -8           NULL
1003  21-11-2016 22:55  14       21-11-2016 22:55  NULL  134         -191  -91          -83
Answer
You'll need to define a UDF as follows and then select each field. I used the same data as you, with a tab separator.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# | id| timestamp|routeid| creationdate| parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55| 14|21-11-2016 22:55| RSRP=-102,|
# |1002|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+
Now let's define our UDF as mentioned above:
import re
def f_(s):
    # Match every "key=value" pair; values are (possibly negative) integers
    pattern = re.compile(r"([^,=]+)=([0-9\-]+)")
    return dict(pattern.findall(s or ""))
We can test the function directly on a "simple" sample:
f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}
OK, it works. We can now register it for use in SQL:
spark.udf.register("f", f_, MapType(StringType(), StringType()))
spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# | Map(RA Req. SN ->...|
# +---------------------------------------------------+
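As a side note (not part of the original answer), once the function is registered it could also be applied to the loaded DataFrame through a temporary view. This is a minimal sketch, assuming the df1 shown above; the view name "logs" is arbitrary:

df1.createOrReplaceTempView("logs")  # "logs" is a hypothetical view name
spark.sql("""
    SELECT id, timestamp, routeid, creationdate,
           f(parameters)['RSRP']       AS `RSRP`,
           f(parameters)['RA Req. SN'] AS `RA Req. SN`
    FROM logs
""").show()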
But in your case, I think you'll be more interested in applying the UDF as a column expression to extract each field:
extract = udf(f_, MapType(StringType(), StringType()))
df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# | id| timestamp|routeid| creationdate| parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55| 14|21-11-2016 22:55| RSRP=-102,|-102|
# |1002|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+
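To build the full expected table, the same extract(...).getItem(...) pattern can simply be repeated for every key. This is a minimal sketch (not from the original answer) that assumes the key names match the parameter names exactly; getItem returns null for keys that are absent from the map:

# One extracted column per expected parameter name
keys = ['RSRP', 'RA Req. SN', 'TPC', 'RX Antennas', 'MCS']
cols = [extract(df1['parameters']).getItem(k).alias(k) for k in keys]
df1.select(df1['id'], df1['timestamp'], df1['routeid'], df1['creationdate'], *cols).show()

This should reproduce the layout of the expected result above, with null wherever a parameter is missing for a row.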