我需要将S3中可以包含~100 million records
的大文本文件拆分为多个文件,并将单个文件另存为.txt
文件回S3。这些记录没有定界,并且可以基于开始和结束位置来标识每一列。每个记录的长度根据“类型”而有所不同,“类型”是一个具有固定开始/结束位置的字符串,我需要根据“类型”的值将此文件拆分为多个文件。
例如。
My name is Chris age 45
My name is Denni age 46
My name is Vicki age 47
My name is Denni age 51
My name is Chris age 52
在上面的示例中,假设我的“记录类型”在第12位开始,在第17位结束。通过一系列步骤,
1. I need to get a distinct list of record types, which in this case are "Chris", "Denni" and "Vicki"
2. I need to split this file into 3 files, one for each record type and save them with same name as record types. Chris.txt, Denni.txt and Vicki.txt
所需输出:
Chris.txt:
My name is Chris age 45
My name is Chris age 52
Denni.txt:
My name is Denni age 46
My name is Denni age 51
Vicki.txt:
My name is Vicki age 47
我正在使用pyspark数据帧来实现这一目标,而我现在所拥有的就是这样,
df_inter =df.select(df.value.substr(start,end).alias("Type"),df.value.alias("value"))
df_types = df_inter.select("Type").distinct()
type_count = df_types.count()
while(i<type_count):
type = df_types.select(df_types.Type).collect()[i][0]
df_filtered = df_inter.filter(df_inter["Type"] == type)
df_filtered.saveAsTextFile("path")
i += 1
当前代码可以工作,但是用
~25 mins
处理具有5个节点2.5 gb file
EMR集群的r5.xlarge
,并且处理更长的时间,例如25 GB
文件。我想了解是否有更有效的方法来减少处理时间。感谢您的输入。 最佳答案
我假设您的数据由制表符分隔。您可以将整个数据加载到数据框中,如下所示:
df = spark.read.format("com.databricks.spark.csv") \
.option("mode", "DROPMALFORMED") \
.option("header", "false") \
.option("inferschema", "true") \
.option("delimiter", '\t').load(PATH_TO_FILE)
+---+----+---+-----+---+---+
|_c0| _c1|_c2| _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Chris|age| 45|
| My|name| is|Denni|age| 46|
| My|name| is|Vicki|age| 47|
| My|name| is|Denni|age| 51|
| My|name| is|Chris|age| 52|
+---+----+---+-----+---+---+
from pyspark.sql.functions import col
Then you can filter the dataframe data and split into multiple dataframe depending on your column value.
Chris_df=df.filter(col('_c3')=='Chris')
+---+----+---+-----+---+---+
|_c0| _c1|_c2| _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Chris|age| 45|
| My|name| is|Chris|age| 52|
+---+----+---+-----+---+---+
Denni_df=df.filter(col('_c3')=='Denni')
+---+----+---+-----+---+---+
|_c0| _c1|_c2| _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Denni|age| 46|
| My|name| is|Denni|age| 51|
+---+----+---+-----+---+---+
Vicki_df=df.filter(col('_c3')=='Vicki')
+---+----+---+-----+---+---+
|_c0| _c1|_c2| _c3|_c4|_c5|
+---+----+---+-----+---+---+
| My|name| is|Vicki|age| 47|
+---+----+---+-----+---+---+
希望这项工作更快!