我的工作流程:KAFKA->数据流流式传输-> BigQuery

鉴于对我而言低延迟并不重要,因此我使用FILE_LOADS来降低成本。我使用的是BigQueryIO.Write和DynamicDestination(每小时一个新表,当前小时为后缀)。

这个BigQueryIO.Write的配置如下:

.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withNumFileShards(100)


第一个表已成功创建并写入。但是然后永远不会创建以下表格,并且我得到了这些异常:

(99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_NEVER",
      "destinationTable" : {
        "datasetId" : "dev_mydataset",
        "projectId" : "myproject-id",
        "tableId" : "mytable_20180302_16"
      },


对于第一个表,所使用的CreateDisposition为指定的CREATE_IF_NEEDED,但是此参数未考虑在内,默认情况下使用CREATE_NEVER。

我还在JIRA上创建了issue

最佳答案

根据documentation of Apache Beam's BigQueryIO,方法BigQueryIO.Write.CreateDisposition要求在使用CREATE_IF_NEEDED时使用the precondition .withSchema()提供表模式。

Dataflow documentation中所述:


请注意,如果您将CREATE_IF_NEEDED指定为CreateDisposition和
您不提供TableSchema,则转换可能会在运行时失败
如果目标表没有java.lang.IllegalArgumentException
存在。


文档指出的错误与您收到的错误不同(得到java.lang.RuntimeException),但是根据您共享的BigQueryIO.Write()配置,您没有指定任何表架构,因此,如果缺少表,则工作容易失败。

因此,作为解决您的问题的第一步,您应该创建与要加载到BQ中的数据匹配的表架构TableSchema(),然后相应地使用前提条件.withSchema(schema)

List<TableFieldSchema> fields = new ArrayList<>();
// Add fields like:
fields.add(new TableFieldSchema().setName("<FIELD_NAME>").setType("<FIELD_TYPE>"));
TableSchema schema = new TableSchema().setFields(fields);

// BigQueryIO.Write configuration plus:
    .withSchema(schema)

08-04 23:23