我已将数据从CSV文件导入Flink Java。由于解析错误,我必须将其导入为字符串的属性之一(属性Result)。现在,我想将String转换为Double。但是我不知道如何使用TableSource,Table或DataSet类的对象来执行此操作。请参阅下面的代码。

我研究了flink文档,并尝试了一些Map和FlatMap类的解决方案。但是我没有找到解决方案。

        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

//Get H data from CSV file.
TableSource csvSource = CsvTableSource.builder()
                .path("Path")
                .fieldDelimiter(";")
                .field("ID", Types.INT())
                .field("result", Types.STRING())
                .field("unixDateTime", Types.LONG())
                .build();

   // register the TableSource
        tableEnv.registerTableSource("HTable", csvSource);

        Table HTable = tableEnv.scan("HTable");

        DataSet<Row> result = tableEnv.toDataSet(HTable, Row.class);

最佳答案

我认为应该使用replacecast的组合将字符串转换为双精度形式,如“ SELECT ID,CAST(REPLACE(result,',','。')AS DOUBLE)AS结果一样, ...”或使用表API的等效项。

09-07 10:10