Problem Description
I have an issue using lambda functions in filters and maps of typed Datasets in a Java Spark application.
I get this runtime error:
ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
I am using the class below with Spark 2.2.0. A full example with sample data is available at https://gitlab.com/opencell/test-bigdata
Dataset<CDR> cdr = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.csv("CDR_SAMPLE.csv")
.as(Encoders.bean(CDR.class));
long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();
System.out.println("validated entries :" + v);
The CDR class is defined in the GitLab link above.
EDIT
val cdrCSVSchema = StructType(Array(
StructField("timestamp", DataTypes.TimestampType),
StructField("quantity", DataTypes.DoubleType),
StructField("access", DataTypes.StringType),
StructField("param1", DataTypes.StringType),
StructField("param2", DataTypes.StringType),
StructField("param3", DataTypes.StringType),
StructField("param4", DataTypes.StringType),
StructField("param5", DataTypes.StringType),
StructField("param6", DataTypes.StringType),
StructField("param7", DataTypes.StringType),
StructField("param8", DataTypes.StringType),
StructField("param9", DataTypes.StringType),
StructField("dateParam1", DataTypes.TimestampType),
StructField("dateParam2", DataTypes.TimestampType),
StructField("dateParam3", DataTypes.TimestampType),
StructField("dateParam4", DataTypes.TimestampType),
StructField("dateParam5", DataTypes.TimestampType),
StructField("decimalParam1", DataTypes.DoubleType),
StructField("decimalParam2", DataTypes.DoubleType),
StructField("decimalParam3", DataTypes.DoubleType),
StructField("decimalParam4", DataTypes.DoubleType),
StructField("decimalParam5", DataTypes.DoubleType),
StructField("extraParam", DataTypes.StringType)))
I used this command to load the CSV document:
val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")
and then tried this command to apply the encoder and run the lambda function, but I am still getting the error:
cdr.as[CDR].filter(c => c.timestamp != null).show
Recommended Answer
TL;DR Define the schema explicitly, since the input dataset has no values to infer types from (for the java.sql.Date fields).
For your case, using the untyped Dataset API could be a solution (perhaps a workaround, and honestly I'd recommend it anyway to avoid unnecessary deserialization from the internal row format):
cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count
(This is Scala; I'm leaving the translation to Java as a home exercise.)
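For reference, a minimal Java sketch of that untyped version could look like the following. It assumes spark is the existing SparkSession and skips Encoders.bean(CDR.class) entirely, so no row-to-bean conversion is generated:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.length;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Untyped read: Dataset<Row>, no bean encoder involved
Dataset<Row> cdrRows = spark
    .read()
    .option("header", "true")
    .option("delimiter", ";")
    .csv("CDR_SAMPLE.csv");

// Same predicate as the Scala snippet above, expressed with Column functions
long v = cdrRows
    .filter(col("timestamp").isNotNull())
    .filter(length(col("access")).gt(0))
    .count();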
The issue is that you use the inferSchema option while most fields in the input CDR_SAMPLE.csv file have no values, which makes those fields of type String (the default type when no values are available to infer a more specific one).
That makes the fields declared as java.sql.Date, i.e. dateParam1 up to dateParam5, come out as type String.
import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
option("inferSchema", "true").
option("delimiter", ";").
option("header", true).
csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
|-- timestamp: timestamp (nullable = true)
|-- quantity: integer (nullable = true)
|-- access: string (nullable = true)
|-- param1: string (nullable = true)
|-- param2: string (nullable = true)
|-- param3: string (nullable = true)
|-- param4: string (nullable = true)
|-- param5: string (nullable = true)
|-- param6: string (nullable = true)
|-- param7: string (nullable = true)
|-- param8: string (nullable = true)
|-- param9: string (nullable = true)
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
|-- decimalParam1: string (nullable = true)
|-- decimalParam2: string (nullable = true)
|-- decimalParam3: string (nullable = true)
|-- decimalParam4: string (nullable = true)
|-- decimalParam5: string (nullable = true)
|-- extraParam: string (nullable = true)
Note that the fields of interest, i.e. dateParam1 to dateParam5, are all strings:
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
The issue surfaces when you "pretend" the type of those fields is different, by using the encoder defined from the CDR class, which declares:
private Date dateParam1;
private Date dateParam2;
private Date dateParam3;
private Date dateParam4;
private Date dateParam5;
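To see the mismatch directly, you can print the schema the bean encoder expects and compare it with the inferred one. A small sketch, assuming CDR is a valid Java bean (no-arg constructor, getters/setters):

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

Encoder<CDR> cdrEncoder = Encoders.bean(CDR.class);
// dateParam1..dateParam5 show up as date here, while the inferred CSV schema has string
cdrEncoder.schema().printTreeString();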
That's the root cause of the issue: there is a mismatch between what Spark infers from the data and what the class declares. Without the conversion the code would have worked, but since you insisted...
cdrs.as[CDR]. // <-- HERE is the issue = types don't match
filter(cdr => cdr.timestamp != null).
show // <-- trigger conversion
It does not really matter which field you access in the filter operator. The issue is that the conversion takes place at all, which leads to incorrect execution (and whole-stage Java code generation).
I doubt Spark can do much about it, since you requested inferSchema on a dataset with no values to use for type inference. The best bet is to define the schema explicitly and set it with the schema(...) operator.
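In Java that could look roughly like the sketch below. It is an assumption-laden translation of the schema from the EDIT above, with the dateParam columns switched to DateType so they line up with the bean's java.sql.Date fields; the quantity/decimal column types follow that schema and may need adjusting to the actual CDR field types, and spark is assumed to be the existing SparkSession:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Explicit schema: no inferSchema, and the date columns use DateType to match java.sql.Date
StructType cdrSchema = new StructType()
    .add("timestamp", DataTypes.TimestampType)
    .add("quantity", DataTypes.DoubleType)
    .add("access", DataTypes.StringType)
    .add("param1", DataTypes.StringType)
    .add("param2", DataTypes.StringType)
    .add("param3", DataTypes.StringType)
    .add("param4", DataTypes.StringType)
    .add("param5", DataTypes.StringType)
    .add("param6", DataTypes.StringType)
    .add("param7", DataTypes.StringType)
    .add("param8", DataTypes.StringType)
    .add("param9", DataTypes.StringType)
    .add("dateParam1", DataTypes.DateType)
    .add("dateParam2", DataTypes.DateType)
    .add("dateParam3", DataTypes.DateType)
    .add("dateParam4", DataTypes.DateType)
    .add("dateParam5", DataTypes.DateType)
    .add("decimalParam1", DataTypes.DoubleType)
    .add("decimalParam2", DataTypes.DoubleType)
    .add("decimalParam3", DataTypes.DoubleType)
    .add("decimalParam4", DataTypes.DoubleType)
    .add("decimalParam5", DataTypes.DoubleType)
    .add("extraParam", DataTypes.StringType);

Dataset<CDR> cdrs = spark
    .read()
    .option("header", "true")
    .option("delimiter", ";")
    .schema(cdrSchema)               // explicit schema instead of inferSchema
    .csv("CDR_SAMPLE.csv")
    .as(Encoders.bean(CDR.class));   // column types now line up with the bean

long valid = cdrs.filter(x -> x.timestamp != null && x.getAccess().length() > 0).count();

With the explicit schema in place, the typed filter should no longer trigger the codegen error, because the column types match what Encoders.bean(CDR.class) expects.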