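I have a timestamp field in a CSV file that I load into a DataFrame using the spark-csv library. The same piece of code works on my local machine with Spark 2.0, but throws an error on Azure Hortonworks HDP 3.5 and 3.6.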

I have checked, and Azure HDInsight 3.5 uses the same Spark version, so I don't think the Spark version is the problem.

import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate",DataTypes.TimestampType)
  .add("Name",DataTypes.StringType)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)

The full exception is as follows:
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:237)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
  ... 27 more

The CSV file has a header and a single data row, as shown below:
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

Best answer

TL;DR Use the timestampFormat option (not dateFormat).

I managed to reproduce it in the latest Spark version, 2.3.0-SNAPSHOT (built from master).

// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT

case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema

scala> spark
  .read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .schema(schema)
  .load("so-43259485.csv")
  .show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql.Timestamp.valueOf(Timestamp.java:237)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at scala.util.Try.getOrElse(Try.scala:79)

The corresponding line in the Spark sources is the "root cause" of the issue:
Timestamp.valueOf(s)
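
To see this failure outside Spark, here is a minimal plain-Scala sketch (an illustration, not part of the original answer) that calls java.sql.Timestamp.valueOf directly on the value from the CSV file. The helper name tryValueOf is hypothetical, and the snippet assumes only the JDK; it can be pasted into spark-shell or a plain Scala REPL.

```scala
import java.sql.Timestamp
import scala.util.Try

// Hypothetical helper: wraps Timestamp.valueOf so that a rejected string
// yields a Failure instead of throwing.
def tryValueOf(s: String): Try[Timestamp] = Try(Timestamp.valueOf(s))

// The value from the CSV file uses slashes, which valueOf does not accept.
val fromCsv = tryValueOf("2016/12/19 00:43:27.583")

// The same instant written with dashes is accepted.
val withDashes = tryValueOf("2016-12-19 00:43:27.583")

println(fromCsv.isFailure)    // true
println(withDashes.isSuccess) // true
```

The first call fails with the same IllegalArgumentException that appears in the stack trace above.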

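After reading the javadoc of Timestamp.valueOf, you learn that the argument should be:

timestamp in format yyyy-[m]m-[d]d hh:mm:ss[.f...]. The fractional seconds may be omitted.
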
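Note "The fractional seconds may be omitted", so let's cut them off by first loading EventDate as a String and converting it to a Timestamp only after removing the unneeded fractional seconds.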
val eventsAsString = spark.read.format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .load("so-43259485.csv")

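It turns out that for fields of TimestampType, Spark uses the timestampFormat option first, and only falls back to the code that uses Timestamp.valueOf when parsing with that format does not succeed (note the Try.getOrElse frame in the stack trace).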
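
The try-then-fall-back behavior can be sketched in plain Scala as follows. This is only an approximation under stated assumptions: parseTimestamp is a hypothetical helper, java.text.SimpleDateFormat stands in for whatever formatter Spark uses internally, and the ISO-style pattern in the second call is chosen purely for illustration.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.Try

// Hypothetical stand-in for the CSV reader's timestamp conversion:
// try the user-supplied pattern first, fall back to Timestamp.valueOf otherwise.
def parseTimestamp(value: String, pattern: String): Timestamp = {
  val format = new SimpleDateFormat(pattern)
  format.setLenient(false)
  Try(new Timestamp(format.parse(value).getTime))
    .getOrElse(Timestamp.valueOf(value)) // evaluated only if the first parse fails
}

// With a matching pattern, the first branch succeeds and the fallback never runs.
val ok = parseTimestamp("2016/12/19 00:43:27.583", "yyyy/MM/dd HH:mm:ss.SSS")

// With a non-matching pattern, the fallback runs and rejects the slashes.
val failed = Try(parseTimestamp("2016/12/19 00:43:27.583", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))

println(ok)               // 2016-12-19 00:43:27.583
println(failed.isFailure) // true
```

This is why setting dateFormat has no effect on a TimestampType column: the timestamp pattern is what decides whether the fallback to Timestamp.valueOf is ever reached.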

So the fix is simply to use the timestampFormat option (not dateFormat!).
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate              |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+

Spark 2.1.0

Use schema inference in CSV via the inferSchema option together with your custom timestampFormat.

It is important to trigger schema inference with inferSchema for timestampFormat to take effect.
val events = spark.read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .option("inferSchema", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("so-43259485.csv")

scala> events.show(false)
+-------------------+----+
|EventDate          |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

"Incorrect" initial version, left for learning purposes
val events = eventsAsString
  .withColumn("date", split($"EventDate", " ")(0))
  .withColumn("date", translate($"date", "/", "-"))
  .withColumn("time", split($"EventDate", " ")(1))
  .withColumn("time", split($"time", "[.]")(0))    // <-- remove millis part
  .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
  .select($"EventDate" cast "timestamp", $"Name")

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

scala> events.show
+-------------------+----+
|          EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

Spark 2.2.0

As of Spark 2.2, you can use the to_timestamp function for string-to-timestamp conversion.
scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
+-----------------------+----------------------------------------------------+

Regarding apache-spark - How to load a CSV with timestamps in a custom format?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43259485/
