我使用DATAFROW SDK 2×Java API(Apache Simulink SDK)将数据写入MySQL。我已经创建了基于Apache Beam SDK documentation的管道来使用数据流将数据写入mysql。它一次插入一行,因为我需要实现批量插入。我在官方文档中找不到任何启用批量插入模式的选项。
想知道,是否可以在数据流管道中设置批量插入模式?如果是,请告诉我需要在下面的代码中更改什么。

 .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        public void setParameters(KV<Integer, String> element, PreparedStatement query) {
          query.setInt(1, kv.getKey());
          query.setString(2, kv.getValue());
        }
      })

最佳答案

编辑2018-01-27:
原来这个问题和directrunner有关。如果使用dataflowrunner运行同一管道,则应获得实际多达1000条记录的批处理。DirectRunner总是在分组操作后创建大小为1的捆绑包。
原始答案:
在使用apache beam的jdbcio编写云数据库时,我遇到了同样的问题。问题是,虽然jdbcio确实支持在一个批处理中写入多达1000条记录,但实际上我从未见过它一次写入超过一行(我不得不承认:这总是在开发环境中使用directrunner)。
因此,我在jdbcio中添加了一个特性,您可以通过将数据分组并将每个组编写为一个批来控制批的大小。下面是一个基于apache beam的原始wordcount示例如何使用此功能的示例。

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

与jdbcio的常规写方法不同的是,新的方法以awriteIterable()作为输入,而不是以aPCollection<Iterable<RowT>>作为输入。每个iterable作为一个批写入数据库。
添加此项的JDBCIO版本可以在以下位置找到:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
包含上述示例的整个示例项目可以在这里找到:https://github.com/olavloite/spanner-beam-example
(apache beam上也有一个pull请求挂起,将其包含在项目中)

07-24 09:15