I have the following CSV (sample)

 id     timestamp         routeid   creationdate        parameters
 1000  21-11-2016 22:55     14      21-11-2016 22:55    RSRP=-102,
 1002  21-11-2016 22:55     14      21-11-2016 22:55    RA Req. SN=-146,TPC=4,RX Antennas=-8,
 1003  21-11-2016 22:55     14      21-11-2016 22:55    RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,


Basically, I want to split the parameters column into multiple columns, as follows:

id, timestamp, routeid, creationdate, RSRP, RA Req. SN, TPC, RX Antennas, MCS

If a row has no value for a given parameter, I would put NULL in that column instead:

 1000  21-11-2016 22:55     14      21-11-2016 22:55 -102 NULL NULL NULL NULL


If the value exists, it should fill the corresponding column. Here is what I have tried:


from pyspark import SparkContext
import os
import sys
from pyspark.sql import SQLContext
import itertools
import re

sc = SparkContext("local","Work")
sqlContext = SQLContext(sc)

df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///sample.csv')

def aaa(a):
    aa = a.split(',', 15000)
    filtered = filter(lambda p: not re.match(r'^\s*$', p), aa)
    listWithNoEmptyLines = [z for z in filtered if z != []]

    for x in listWithNoEmptyLines:
        ab = x.split("=")
        AllList = []
        rsrp = ""
        ra_req_sn = ""
        tpc = ""
        rx_antenas = ""
        mcs = ""
        if 'RSRP' in ab:
            rsrp = ab[1]
        else:
            rsrp = "NULL"
        if 'RA Req. SN' in ab:
            ra_req_sn = ab[1]
        else:
            ra_req_sn = "NULL"
        if 'TPC' in ab:
            tpc = ab[1]
        else:
            tpc = "NULL"
        if 'RX Antennas' in ab:
            rx_antenas = ab[1]
        else:
            rx_antenas = "NULL"
        if 'MCS' in ab:
            mcs = ab[1]
        else:
            mcs = "NULL"
    return rsrp,ra_req_sn,tpc,rx_antenas
DFtoRDD  = df1.rdd.map(list).map(lambda x: [str(x[1]), str(x[2]), str(x[3]), aaa(str(x[4]))])
print DFtoRDD.collect()


This is the output I get:

[['1000','21-11-2016 22:55', '14', '21-11-2016 22:55', ('-102', 'NULL', 'NULL', 'NULL')], ['1002','21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '-146', 'NULL', 'NULL')], ['1003','21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '134', 'NULL', 'NULL')]]


But this is the result I want:

   id     timestamp         routeid   creationdate        RSRP    RA Req. SN   TPC   RX Antennas MCS
  1000  21-11-2016 22:55     14      21-11-2016 22:55    -102      NULL        NULL    NULL       NULL
  1002  21-11-2016 22:55     14      21-11-2016 22:55    NULL    -146         4        -8        NULL
  1003  21-11-2016 22:55     14      21-11-2016 22:55    NULL    134       -191       -91       -83


You'll need to define a UDF as follows and then select each field. I used the same data as you, with a tab separator.

from pyspark.sql.functions import udf
from pyspark.sql.types import *

df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
# +----+----------------+-------+----------------+--------------------+
# |  id|       timestamp|routeid|    creationdate|          parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+

Now let's define our UDF as mentioned above:

import re
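def f_(s):
    # Capture each "name=value" pair; a value is made of digits and minus signs.
    pattern = re.compile(r"([^,=]+)=([0-9\-]+)")
    # A null input is treated as an empty string, which yields an empty dict.
    return dict(pattern.findall(s or ""))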


We can test the function directly on a simple sample:

f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}
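
To see how that dictionary lines up with the fixed columns from the question, here is a minimal plain-Python sketch (no Spark involved). The names PARAMETER_NAMES, parse_parameters and to_row are my own, introduced only for illustration; a missing parameter comes back as None, which is what Spark will later display as null.

```python
import re

# Column order taken from the header in the question.
PARAMETER_NAMES = ["RSRP", "RA Req. SN", "TPC", "RX Antennas", "MCS"]

# Same pattern as f_ above, compiled once at module level.
PAIR = re.compile(r"([^,=]+)=([0-9\-]+)")


def parse_parameters(s):
    """Return a dict of name -> value for a 'name=value,' string."""
    return dict(PAIR.findall(s or ""))


def to_row(s, names=PARAMETER_NAMES):
    """Return one value per expected column, None when it is absent."""
    parsed = parse_parameters(s)
    return [parsed.get(name) for name in names]


print(to_row("RA Req. SN=-146,TPC=4,RX Antennas=-8,"))
# [None, '-146', '4', '-8', None]
```

Using dict.get rather than indexing is what gives the "NULL when absent" behaviour without any if/else chain.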

OK, it works. We can now register it for use in SQL:

spark.udf.register("f", f_, MapType(StringType(), StringType()))

spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# |                               Map(RA Req. SN ->...|
# +---------------------------------------------------+

But in your case, I think you'll want an actual UDF that you can apply to pull out each field:

extract = udf(f_,  MapType(StringType(), StringType()))

df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# |  id|       timestamp|routeid|    creationdate|          parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|-102|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+
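
The same select can be repeated for the other parameters. Below is a sketch of one way to do that in a single call, assuming df1 and extract are defined as above. The helper name with_parameter_columns and the PARAMETER_NAMES list are hypothetical, not part of PySpark; the helper only relies on the select, getItem and alias calls already used above.

```python
# Column names taken from the expected header in the question.
PARAMETER_NAMES = ["RSRP", "RA Req. SN", "TPC", "RX Antennas", "MCS"]


def with_parameter_columns(df, extract, names=PARAMETER_NAMES):
    """Hypothetical helper: add one column per parameter name.

    `df` is the DataFrame loaded above and `extract` is the UDF that
    returns a map; a key missing from a row's map shows up as null.
    """
    parsed = extract(df["parameters"])
    # One aliased map lookup per expected parameter.
    columns = [parsed.getItem(name).alias(name) for name in names]
    # Keep every original column and append the extracted ones.
    return df.select(df["*"], *columns)
```

It would be called as with_parameter_columns(df1, extract).show(). The raw parameters column is kept; it can be dropped afterwards if it is no longer needed.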

