我正在尝试将Retrosheet事件文件读入Spark。事件文件就是这样构造的。

id,TEX201403310
version,2
info,visteam,PHI
info,hometeam,TEX
info,site,ARL02
info,date,2014/03/31
info,number,0
info,starttime,1:07PM
info,daynight,day
info,usedh,true
info,umphome,joycj901
info,attendance,49031
start,reveb001,"Ben Revere",0,1,8
start,rollj001,"Jimmy Rollins",0,2,6
start,utlec001,"Chase Utley",0,3,4
start,howar001,"Ryan Howard",0,4,3
start,byrdm001,"Marlon Byrd",0,5,9
id,TEX201404010
version,2
info,visteam,PHI
info,hometeam,TEX

如您所见,对于每个游戏,事件都会循环返回。

我已经将文件读入RDD,然后通过第二次for循环为每次迭代添加了一个 key ,该 key 似乎可以正常工作。但是我希望获得一些反馈,以了解是否存在使用 Spark 方法执行此操作的清洁方法。
logFile = '2014TEX.EVA'
event_data = (sc
              .textFile(logfile)
              .collect())

idKey = 0
newevent_list = []
for line in event_dataFile:
    if line.startswith('id'):
        idKey += 1
        newevent_list.append((idKey,line))
    else:
        newevent_list.append((idKey,line))

event_data = sc.parallelize(newevent_list)

最佳答案

PySpark since version 1.1支持Hadoop Input Formats。您可以使用textinputformat.record.delimiter选项使用自定义格式定界符,如下所示

from operator import itemgetter

retrosheet = sc.newAPIHadoopFile(
    '/path/to/retrosheet/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\nid,'}
)
(retrosheet
    .filter(itemgetter(1))
    .values()
    .filter(lambda x: x)
    .map(lambda v: (
        v if v.startswith('id') else 'id,{0}'.format(v)).splitlines()))

从Spark 2.4开始,您还可以使用DataFrame读取器将数据读取到text
spark.read.option("lineSep", '\nid,').text('/path/to/retrosheet/file')

10-04 20:16
查看更多