Writing Avro files with LogicalType 'timestamp-millis' on date/timestamp fields in a Java Beam pipeline

Question

I have several pipelines that write Avro files from streaming JSON records, but I'm having issues importing them into BigQuery, because the logicalType for the date field is not defined in the Avro schema.

Consider the following simple POJO:

@DefaultCoder(AvroCoder.class)
public class SampleClass {
    @AvroEncode(using=DateAsLongEncoding.class)
    private Date updateTime;

    public SampleClass() {
    }

    // Getters and setters
}

With this, the field is correctly saved to Avro as a long. However, the LogicalType is not set in the schema, which causes problems on import into BigQuery when the column should be a TIMESTAMP or DATE rather than a long.
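To make the problem concrete, here is a minimal sketch using only the JDK. It assumes that storing a Date as a long keeps the value of Date.getTime(), that is, milliseconds since the epoch. The class DateAsLongSketch and its methods are hypothetical names for illustration and are not part of Avro or Beam. The point is that the stored number is already suitable for 'timestamp-millis'; only the schema annotation telling the reader how to interpret it is missing.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helper, not part of Avro or Beam: it only mimics storing a Date as a long.
final class DateAsLongSketch {

    // Assumption: the encoding keeps Date.getTime(), i.e. milliseconds since the epoch (UTC).
    static long encode(Date date) {
        return date.getTime();
    }

    // A reader that knows the long is a millisecond timestamp can rebuild the instant.
    static Instant asTimestamp(long storedValue) {
        return Instant.ofEpochMilli(storedValue);
    }

    public static void main(String[] args) {
        Date updateTime = new Date(1_500L); // 1.5 seconds after the epoch
        long stored = encode(updateTime);
        System.out.println(stored);              // the bare number a reader sees without a logical type
        System.out.println(asTimestamp(stored)); // the same value interpreted as a timestamp
    }
}
```

Nothing about the stored value has to change for it to be read as a timestamp, which is why the rest of this question is about the schema alone.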

I'd like to be able to annotate fields, just as with @AvroEncode. Something like @LogicalType("timestamp-millis") would be ideal.

Has anyone accomplished something similar, or is there another easy way to specify a LogicalType for the fields?

Recommended answer

This was solved by using Gson type adapters to deserialize the incoming JSON, like so:

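// Register a custom deserializer so incoming JSON is mapped to Sample explicitly.
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(Sample.class, new JsonDeserializer<Sample>() {
    @Override
    public Sample deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
            throws JsonParseException {
        try {
            JsonObject jObj = json.getAsJsonObject();
            // "timestamp" is expected to hold an ISO date string.
            return new Sample(jObj.get("timestamp").getAsString());
        } catch (Exception e) {
            log.error("Sample parser failed: " + e);
            // Unparseable records yield null, so callers must handle that case.
            return null;
        }
    }
});
// Keep the configured instance; the builder's settings only take effect through it.
Gson gson = builder.create();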

The Sample class uses java.time.Instant to convert the ISO date string to epoch milliseconds:

@DefaultCoder(SnappyCoder.class)
public class Sample implements Serializable {
    @AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
    private long updateTime;

    public Sample(String timestamp) {
        this.updateTime = Instant.parse(timestamp).toEpochMilli();
    }
}
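The conversion in the constructor can be tried on its own with nothing but the JDK. The sketch below is illustrative only: the class IsoToMillisSketch and its method names are hypothetical and do not appear in the answer. It shows two behaviours of Instant worth knowing before relying on it in a pipeline: toEpochMilli() drops anything finer than a millisecond, and Instant.parse rejects strings that carry no zone designator such as a trailing Z.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper for illustration; only java.time calls are used.
final class IsoToMillisSketch {

    // Same conversion as in the Sample constructor: ISO instant string -> epoch milliseconds.
    static long toEpochMillis(String isoTimestamp) {
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    // Reports whether Instant.parse accepts the string, instead of letting the exception escape.
    static boolean isParseable(String isoTimestamp) {
        try {
            Instant.parse(isoTimestamp);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Microsecond digits are accepted on input but truncated by toEpochMilli().
        System.out.println(toEpochMillis("1970-01-01T00:00:01.500999Z"));
        // A local date-time without "Z" or an offset is not a valid instant.
        System.out.println(isParseable("2018-10-24T02:00:00"));
    }
}
```

If the incoming JSON may contain timestamps without a zone, a check like isParseable (or a more lenient parser) would have to run before constructing Sample; otherwise the deserializer shown earlier ends up in its catch branch and returns null.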

10-24 02:00