本文介绍了将 Spark Dataframe 中的一列转换为多列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个具有这种结构的大数据框(或多或少 1.2GB):

+---------+--------------+-------------------------------------------------------------------------------------------------------+|国家 |日期数据 |文字 |+---------+--------------+-------------------------------------------------------------------------------------------------------+|"EEUU" |"2016-10-03" |"T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee" ||"EEUU" |"2016-10-03" |"T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 46ee" ||.|.|.||.|.|.||"EEUU" |"2016-10-03" |"T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 47aa" |+---------+--------------+-------------------------------------------------------------------------------------------------------+

行数为 300.000,text"字段为大约 5000 个字符的字符串.

我想在这个新字段中分隔文本"字段:

+---------+------------+------+------------+--------+--------+---------+--------+------+|国家 |日期数据 |t_d |t_name |t_in |t_c |t_add |...... |t_r |+---------+------------+------+------------+--------+--------+---------+--------+------+|EEUU |2016-10-03 |QQWE |姓名_1 |ind_1 |c1ws12 |Sec_1_P |...... |45ee ||EEUU |2016-10-03 |QQAA |名称_2 |ind_2 |c1ws12 |Sec_1_P |...... |45ee ||.|.|.|.|.|.|.|.|||.|.|.|.|.|.|.|.|||.|.|.|.|.|.|.|.|||EEUU |2016-10-03 |QQWE |名称_300000 |ind_65 |c1ws12 |Sec_1_P |...... |47aa |+---------+------------+------+------------+--------+--------+---------+--------+------+

目前,我正在使用正则表达式来解决这个问题.首先,我编写正则表达式并创建一个函数来从文本中提取单个字段(总共 90 个正则表达式):

val D_text = "((?pattern2.findFirstIn(url) 匹配 {case Some(texst_new) =>文本_新情况无=>空值"case null =>空值"})

然后,我创建了一个新的 Dataframe (tbl_separate_fields),作为应用带有正则表达式的函数从文本中提取每个新字段的结果.

val tbl_separate_fields = hiveDF.select(hiveDF("国家"),hiveDF("date_data"),getFirst(D_text)(hiveDF("texst")).alias("t_d"),getFirst(NAME_text)(hiveDF("texst")).alias("t_name"),getFirst(IN_text)(hiveDF("texst")).alias("t_in"),getFirst(C_text)(hiveDF("texst")).alias("t_c"),getFirst(ADD_text)(hiveDF("texst")).alias("t_add"),....getFirst(R_text)(hiveDF("texst")).alias("t_r"))

最后,我将此数据框插入到 Hive 表中:

tbl_separate_fields.registerTempTable("tbl_separate_fields")hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data) SELECT * FROM tbl_separate_fields")

此解决方案对整个数据帧持续 1 小时,因此我希望优化并减少执行时间.有什么解决办法吗?

我们使用 Hadoop 2.7.1Apache-Spark 1.5.1.Spark的配置是:

val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1")val sc = 新的 SparkContext(conf)val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

提前致谢.

编辑数据:

+---------+--------------+-------------------------------------------------------------------------------------------------------+|国家 |日期数据 |文字 |+---------+--------------+-------------------------------------------------------------------------------------------------------+|"EEUU" |"2016-10-03" |"T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee" ||"EEUU" |"2016-10-03" |"T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12 ...........\nT_R: 46ee" ||.|.|.||.|.|.||"EEUU" |"2016-10-03" |"T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n ...........\nT_R: 47aa" |+---------+--------------+-------------------------------------------------------------------------------------------------------+
解决方案

在这种情况下使用正则表达式既缓慢又脆弱.

如果您知道所有记录都具有相同的结构,即所有文本"值都具有相同的编号和部分"的顺序,则以下代码将工作(对于任意数量的列),主要是利用 org.apache.spark.sql.functions 中的 split 功能:

import org.apache.spark.sql.functions._//首先 - 将文本"列值拆分为数组val textAsArray: DataFrame = inputDF.withColumn("as_array", split(col("text"), "\n")).drop("文本").cache()//获取样本(第一行)以获取列名,如果您想对其进行硬编码,可以跳过:val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArrayval columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex//使用正确的值为每个 columnName 添加 Column 并删除不再需要的 as_array 列val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {case (df, (colName, index)) =>df.withColumn(colName, split(col("as_array").getItem(index), ":").getItem(1))}.drop("as_array")withValueColumns.show()//对于我创建的示例数据,//在文本"列中只有 4 个部分",这会打印://+-------+--------------+----+------+-----+------+//|国家|日期数据|T_D|T_NAME|T_IN|T_C|//+-------+--------------+----+------+-----+------+//|EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|//|EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|//+-------+--------------+----+------+-----+------+

或者,如果上面的假设不成立,你可以使用一个UDF将文本列转换成Map,然后执行类似的reduceLeft 对所需列的硬编码列表进行操作:

import sqlContext.implicits._//样本数据:顺序不同,并非所有记录都有所有列:val inputDF: DataFrame = sc.parallelize(Seq(("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2"))).toDF("country", "date_data", "text")//预期列名的硬编码列表:val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")//UDF 将文本转换为键值映射val asMap = udf[Map[String, String], String] { s =>s.split("\n").map(_.split(": ")).map { case Array(k, v) =>k->v }.toMap}val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")//对于每个列名 - 在地图中查找值val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {case (df, colName) =>df.withColumn(colName, col("textAsMap").getItem(colName))}.drop("textAsMap")withValueColumns.show()//印刷://+-------+--------------+----+------+-----+------+//|国家|日期数据|T_D|T_NAME|T_IN|T_C|//+-------+--------------+----+------+-----+------+//|EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|//|EEUU|2016-10-03|QQAA|name_2|ind_2|空|//+-------+----------+----+------+-----+------+

I have a big dataframe (1.2GB more or less) with this structure:

+---------+--------------+------------------------------------------------------------------------------------------------------+
| country |  date_data   |                                                 text                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"       |
| "EEUU"  | "2016-10-03" | "T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 46ee"       |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 47aa" |
+---------+--------------+------------------------------------------------------------------------------------------------------+

The number of rows is 300.000 and "text" field is a string of 5000 characters approximately.

I would like to separate the field "text" in this new fields:

+---------+------------+------+-------------+--------+--------+---------+--------+------+
| country | date_data  | t_d  |   t_name    |  t_in  |  t_c   |  t_add  | ...... | t_r  |
+---------+------------+------+-------------+--------+--------+---------+--------+------+
| EEUU    | 2016-10-03 | QQWE | name_1      | ind_1  | c1ws12 | Sec_1_P | ...... | 45ee |
| EEUU    | 2016-10-03 | QQAA | name_2      | ind_2  | c1ws12 | Sec_1_P | ...... | 45ee |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| EEUU    | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa |
+---------+------------+------+-------------+--------+--------+---------+--------+------+

Currently, I´m using regular expressions to solve this problem. Firstly, I write the regular expresions and create a function to extract individual fields from text (90 regular expressions in total):

val D_text = "((?<=T_D: ).*?(?=\\\\n))".r
val NAME_text = "((?<=nT_NAME: ).*?(?=\\\\n))".r
val IN_text = "((?<=T_IN: ).*?(?=\\\\n))".r
val C_text = "((?<=T_C: ).*?(?=\\\\n))".r
val ADD_text = "((?<=T_ADD: ).*?(?=\\\\n))".r
        .
        .
        .
        .
val R_text = "((?<=T_R: ).*?(?=\\\\n))".r

//UDF function:
 def getFirst(pattern2: scala.util.matching.Regex) = udf(
          (url: String) => pattern2.findFirstIn(url) match {
              case Some(texst_new) => texst_new
              case None => "NULL"
              case null => "NULL"
          }
   )

Then, I create a new Dataframe (tbl_separate_fields ) as a result of applying the function with a regular expression to extract every new field from text.

val tbl_separate_fields = hiveDF.select(
          hiveDF("country"),
          hiveDF("date_data"),
          getFirst(D_text)(hiveDF("texst")).alias("t_d"),
          getFirst(NAME_text)(hiveDF("texst")).alias("t_name"),
          getFirst(IN_text)(hiveDF("texst")).alias("t_in"),
          getFirst(C_text)(hiveDF("texst")).alias("t_c"),
          getFirst(ADD_text)(hiveDF("texst")).alias("t_add"),
                            .
                            .
                            .
                            .

        getFirst(R_text)(hiveDF("texst")).alias("t_r")

        )

Finally, I insert this dataframe into a Hive table:

tbl_separate_fields.registerTempTable("tbl_separate_fields")
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data)  SELECT * FROM tbl_separate_fields")

This solution lasts for 1 hour for the entire dataframe so I wish to optimize and reduce the execution time. Is there any solution?

We are using Hadoop 2.7.1 and Apache-Spark 1.5.1. The configuration for Spark is:

val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Thanks in advance.

EDIT DATA:

+---------+--------------+------------------------------------------------------------------------------------------------------+
| country |  date_data   |                                                 text                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"       |
| "EEUU"  | "2016-10-03" | "T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12 ...........\nT_R: 46ee"                         |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n ...........\nT_R: 47aa"            |
+---------+--------------+------------------------------------------------------------------------------------------------------+
解决方案

Using regular expressions in this case is slow and also fragile.

If you know that all records have the same structure, i.e. that all "text" values have the same number and order of "parts", the following code would work (for any number of columns), mainly taking advantage of the split function in org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._

// first - split "text" column values into Arrays
val textAsArray: DataFrame = inputDF
  .withColumn("as_array", split(col("text"), "\n"))
  .drop("text")
  .cache()

// get a sample (first row) to get column names, can be skipped if you want to hard-code them:
val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray
val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex

// add Column per columnName with the right value and drop the no-longer-needed as_array column
val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {
  case (df, (colName, index)) => df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1))
}.drop("as_array")

withValueColumns.show()
// for the sample data I created,
// with just 4 "parts" in "text" column, this prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN|   T_C|
// +-------+----------+----+------+-----+------+
// |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// |   EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|
// +-------+----------+----+------+-----+------+

Alternatively, if the assumption above is not true, you can use a UDF that converts the text column into a Map, and then perform a similar reduceLeft operation on the hard-coded list of desired columns:

import sqlContext.implicits._

// sample data: not the same order, not all records have all columns:
val inputDF: DataFrame = sc.parallelize(Seq(
  ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),
  ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2")
)).toDF("country", "date_data", "text")

// hard-coded list of expected column names:
val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")

// UDF to convert text into key-value map
val asMap = udf[Map[String, String], String] { s =>
  s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap
}


val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")

// for each column name - lookup the value in the map
val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {
  case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName))
}.drop("textAsMap")

withValueColumns.show()
// prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN|   T_C|
// +-------+----------+----+------+-----+------+
// |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// |   EEUU|2016-10-03|QQAA|name_2|ind_2|  null|
// +-------+----------+----+------+-----+------+

这篇关于将 Spark Dataframe 中的一列转换为多列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-13 18:19