I want to reduce a DataFrame by key. The reduce logic is quite complicated and involves updating about 10-15 fields, which is why I want to convert the DataFrame to a Dataset and reduce Java POJOs.
Problem
The problem is that after groupByKey/reduceGroups I get some very strange values, even though Encoders.bean(Entity.class) reads the data correctly. See the Code sample section below.
Workaround
Replacing Encoders.bean with Encoders.kryo does not work; it fails with the exception: Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.
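That is, an attempt along these lines (a sketch of the substitution, reusing the createDataFrame helper from the code sample below) throws the exception above:
// sketch: swapping in the kryo encoder on structured input fails
Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.kryo(Entity.class));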
I also saw this workaround, but Encoders.product requires a TypeTag, and I don't know how to create a TypeTag in Java code.
Code sample
Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
.as(Encoders.bean(Entity.class));
// shows correct numbers
ds.show(10, false);
// correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages                       |
+-----------+-----------+-----+-------------------------------+
|A1         |S1         |1    |[[2018-10-29 23:11:44, 12.5]]  |
|A1         |S1         |1    |[[2018-10-30 14:43:05, 13.2]]  |
|A1         |S1         |2    |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+
// after reduce, shows wrong numbers
ds
.groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
.reduceGroups((e1, e2) -> e1)
.map(tuple -> tuple._2, Encoders.bean(Entity.class))
.show(10, false);
// wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages                                                 |
+-----------+-----+-----------+---------------------------------------------------------+
|A1         |2    |S1         |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1         |1    |S1         |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]]  |
+-----------+-----+-----------+---------------------------------------------------------+
Entity.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
private String broker_name;
private String server_name;
private Integer order;
private Storage[] storages;
}
Storage.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
private Timestamp timestamp;
private Double storage;
}
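The RowKey class used as the grouping key in the code sample is not shown; a minimal sketch consistent with how it is used in groupByKey, assuming the same Lombok style as the beans above (a reconstruction, not the actual code):
// hypothetical reconstruction of the grouping-key bean
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RowKey implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
}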
testData.json:
[
{
"broker_name": "A1",
"server_name": "S1",
"order": 1,
"storages": [
{
"timestamp": "2018-10-29 23:11:44.000",
"storage": 12.5
}
]
},
{
"broker_name": "A1",
"server_name": "S1",
"order": 1,
"storages": [
{
"timestamp": "2018-10-30 14:43:05.000",
"storage": 13.2
}
]
},
{
"broker_name": "A1",
"server_name": "S1",
"order": 2,
"storages": [
{
"timestamp": "2019-11-02 10:00:03.000",
"storage": 1001.0
}
]
}
]
testSchema.json:
{
"type": "struct",
"fields": [
{
"name": "broker_name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "server_name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "order",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "storages",
"type": {
"type": "array",
"elementType": {
"type": "struct",
"fields": [
{
"name": "timestamp",
"type": "timestamp",
"nullable": true,
"metadata": {}
},
{
"name": "storage",
"type": "double",
"nullable": true,
"metadata": {}
}
]
},
"containsNull": true
},
"nullable": true,
"metadata": {}
}
]
}
Accepted answer
This happens because deserialization uses structural matching against the schema inferred by the Encoder, and since bean classes have no natural field order, the fields of the inferred schema are sorted by name.
So for a bean class like the Storage defined above, the schema inferred by the bean Encoder is
Encoders.bean(Storage.class).schema().printTreeString();
root
|-- storage: double (nullable = true)
|-- timestamp: timestamp (nullable = true)
not
root
|-- timestamp: timestamp (nullable = true)
|-- storage: double (nullable = true)
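The same alphabetical ordering applies to the full Entity bean. A quick check (the output here is reconstructed from the sorting rule above; note how it matches the column order seen after reduceGroups in the question):
Encoders.bean(Entity.class).schema().printTreeString();
// root
// |-- broker_name: string (nullable = true)
// |-- order: integer (nullable = true)
// |-- server_name: string (nullable = true)
// |-- storages: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- storage: double (nullable = true)
// |    |    |-- timestamp: timestamp (nullable = true)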
and this is the schema that should be used with your Dataset. In other words, a schema defined as:
StructType schema = Encoders.bean(Entity.class).schema();
or
StructType schema = StructType.fromDDL(
"broker_name string, order integer, server_name string, " +
"storages array<struct<storage: double, timestamp: timestamp>>"
);
would be valid and could be used to load testData directly:
Dataset<Entity> ds = spark.read()
.option("multiline", "true")
.schema(schema)
.json("testData.json")
.as(Encoders.bean(Entity.class));
while your current schema, which is equivalent to:
StructType invalid = StructType.fromDDL(
"broker_name string, order integer, server_name string, " +
"storages array<struct<timestamp: timestamp, storage: double>>"
);
is not, despite the fact that it works with the JSON reader, which (in contrast to Encoders) matches data by name. Arguably, this behavior should be reported as a bug: intuitively, an Encoder should never dump data that is incompatible with its own loading logic. Related JIRA ticket: SPARK-27050
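Putting the pieces together, a minimal end-to-end sketch (assuming a SparkSession named spark, the usual org.apache.spark.sql imports, and the RowKey bean sketched earlier): with the Encoder-derived schema, the groupByKey/reduceGroups pipeline from the question should round-trip the storages values intact.
// derive the schema from the bean Encoder itself, so the field order
// matches what the Encoder expects on deserialization
StructType schema = Encoders.bean(Entity.class).schema();

Dataset<Entity> ds = spark.read()
        .option("multiline", "true")
        .schema(schema)
        .json("testData.json")
        .as(Encoders.bean(Entity.class));

// same aggregation as in the question; with a matching schema the
// deserialized values are no longer corrupted
ds.groupByKey(
        o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()),
        Encoders.bean(RowKey.class))
  .reduceGroups((e1, e2) -> e1)
  .map(tuple -> tuple._2, Encoders.bean(Entity.class))
  .show(10, false);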