I want to reduce a DataFrame by key. The reduce logic is quite complex and requires updating about 10-15 fields, which is why I want to convert the DataFrame to a Dataset and reduce Java POJOs instead.
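
To give an idea of the intent, here is a minimal sketch of the kind of reduce function I mean (the merge rule is a hypothetical placeholder, not my real logic):

    import java.util.Arrays;
    import java.util.stream.Stream;
    import org.apache.spark.api.java.function.ReduceFunction;

    // Hypothetical merge rule: keep e1's scalar fields and
    // concatenate the storages arrays of both entities.
    ReduceFunction<Entity> mergeEntities = (e1, e2) -> {
        Storage[] merged = Stream
            .concat(Arrays.stream(e1.getStorages()), Arrays.stream(e2.getStorages()))
            .toArray(Storage[]::new);
        e1.setStorages(merged);
        return e1;
    };
    // This would then be passed to .groupByKey(...).reduceGroups(mergeEntities).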

The problem

The problem is that after groupByKey / reduceGroups I get some very strange values, even though Encoders.bean(Entity.class) reads the data correctly. See the "Code example" section.

Workarounds

Replacing Encoders.bean with Encoders.kryo does not work (a kryo encoder represents the whole object as a single binary column, so it cannot be mapped onto a multi-column struct); it fails with the exception:

Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.

I also saw this workaround, but Encoders.product requires a TypeTag, and I don't know how to create a TypeTag in Java code.

Code example

    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.bean(Entity.class));

    // shows correct numbers
    ds.show(10, false);

    // correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages                       |
+-----------+-----------+-----+-------------------------------+
|A1         |S1         |1    |[[2018-10-29 23:11:44, 12.5]]  |
|A2         |S1         |1    |[[2018-10-30 14:43:05, 13.2]]  |
|A3         |S1         |2    |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+


    // after the reduce, shows wrong numbers
    ds
        .groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
        .reduceGroups((e1, e2) -> e1)
        .map(tuple -> tuple._2, Encoders.bean(Entity.class))
        .show(10, false);

    // wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages                                                 |
+-----------+-----+-----------+---------------------------------------------------------+
|A1         |2    |S1         |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1         |1    |S1         |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]]  |
+-----------+-----+-----------+---------------------------------------------------------+


Entity.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}


Storage.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private Timestamp timestamp;
    private Double storage;
}


testData.json:

[
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "storage": 12.5
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "storage": 13.2
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 2,
    "storages": [
      {
        "timestamp": "2019-11-02 10:00:03.000",
        "storage": 1001.0
      }
    ]
  }
]


testSchema.json:

{
  "type": "struct",
  "fields": [
    {
      "name": "broker_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "server_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "order",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "storages",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "storage",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}

Best answer

This happens because deserialization uses positional (structural) matching against the schema inferred by the Encoder, and since bean classes have no natural field order, the fields of the inferred schema are sorted by name.

So for a bean class like the Storage defined above, the schema inferred by the bean Encoder will be

Encoders.bean(Storage.class).schema().printTreeString();


root
 |-- storage: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

and not

root
 |-- timestamp: timestamp (nullable = true)
 |-- storage: double (nullable = true)




and that is the schema that should be used with the Dataset. In other words, a schema defined as:

StructType schema = Encoders.bean(Entity.class).schema();


or

StructType schema = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " +
  "storages array<struct<storage: double, timestamp: timestamp>>"
);


would be valid and could be used to load testData directly:

Dataset<Entity> ds = spark.read()
  .option("multiline", "true")
  .schema(schema)
  .json("testData.json")
  .as(Encoders.bean(Entity.class));
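
For reference, printing the schema derived from the bean Encoder should show every level sorted by field name. The output below is reconstructed from that sorting rule rather than captured from an actual run:

Encoders.bean(Entity.class).schema().printTreeString();

root
 |-- broker_name: string (nullable = true)
 |-- order: integer (nullable = true)
 |-- server_name: string (nullable = true)
 |-- storages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- storage: double (nullable = true)
 |    |    |-- timestamp: timestamp (nullable = true)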


Your current schema, by contrast, is equivalent to:

StructType invalid = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " +
  "storages array<struct<timestamp: timestamp, storage: double>>"
);




and is not valid, even though it works with the JSON reader, which (unlike Encoders) matches fields by name.
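
If the DataFrame has already been loaded with the mismatched schema, one possible fix (a sketch, not part of the original answer; it assumes Spark 2.4+ for the transform higher-order function, and df stands for the already-loaded DataFrame) is to realign the nested struct fields to the name-sorted order before applying the bean encoder:

import static org.apache.spark.sql.functions.expr;

// Rebuild each `storages` element with its fields in the encoder's
// name-sorted order (storage before timestamp), then convert to the bean.
Dataset<Entity> realigned = df
    .withColumn("storages", expr(
        "transform(storages, s -> " +
        "named_struct('storage', s.storage, 'timestamp', s.timestamp))"))
    .as(Encoders.bean(Entity.class));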

Arguably, this behavior should be reported as a bug: intuitively, an Encoder should never dump data that is incompatible with its own loading logic.

Related JIRA ticket: SPARK-27050
