Problem Description
I'm working with Flink and I'm using the Kafka connector. The messages I'm receiving are lists of comma-separated items, e.g. "'a','b','c',1,0.1 ....'12:01:00.000'". One of the items contains the event time, and I would like to use this event time for per-partition watermarking (in the Kafka source) and then for session windowing. My case is a bit different from the usual one because, from what I have understood, people typically use the Kafka timestamps together with SimpleStringSchema(). In my case I instead have to write my own deserializer that implements DeserializationSchema and returns a Tuple or POJO, so basically substitute SimpleStringSchema() with my own function. Flink offers some deserializers out of the box, but I really don't understand how I can create custom deserialization logic.
Checking the Flink website, I found this:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
I have been given an example (thanks David!), but I still don't get how to implement mine.
I would really need an example of how to do it for a list. The one indicated above is for JSON, so it gives me the theory and the concept, but I got stuck there.
You should introduce a POJO like
public class Event implements Serializable {
...
private Long timestamp;
}
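For a comma-separated line like the one in the question, the POJO can carry the parsed fields plus a numeric timestamp for watermarking. A minimal sketch, assuming the time literal has the format `'HH:mm:ss.SSS'` as in the sample message (the field names and the helper `parseTimeToMillis` are illustrative, not part of any Flink API):

```java
import java.io.Serializable;
import java.time.LocalTime;

public class Event implements Serializable {

    // Placeholder fields -- adjust to your actual message schema.
    private String label;
    private long timestamp; // event time in milliseconds, used for watermarking

    // Parse a quoted time literal such as '12:01:00.000' into milliseconds-of-day.
    // If your messages carry a full date-time, parse with LocalDateTime/Instant instead.
    public static long parseTimeToMillis(String quoted) {
        String t = quoted.replace("'", "").trim();
        return LocalTime.parse(t).toNanoOfDay() / 1_000_000L;
    }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }
}
```

The timestamp is stored as a plain `long` so that a Flink timestamp assigner can read it directly later.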
and implement a simple deserializer similar to the one from the link. You can parse the line either by manually splitting the message string on commas, or by using an out-of-the-box CSV reader, like opencsv, to parse the line into your POJO:
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    @Override
    public Event deserialize(byte[] message) throws IOException {
        String line = new String(message, StandardCharsets.UTF_8);
        String[] parts = line.split(",");
        Event event = new Event();
        // TODO: map parts[] onto the event's fields here
        return event;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
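To get the per-partition watermarking and session windows asked about, the schema above can be wired into the Kafka source; calling assignTimestampsAndWatermarks on the consumer itself (rather than on the resulting stream) is what enables per-Kafka-partition watermarking in the Flink 1.11 API the question links to. A sketch under stated assumptions: the topic name, Kafka properties, key extractor, 5-second out-of-orderness bound, 10-minute session gap, and the placeholder reduce function are all illustrative, not prescribed by the source.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EventJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "event-group");             // assumption

        FlinkKafkaConsumer<Event> consumer =
                new FlinkKafkaConsumer<>("events", new EventDeserializationSchema(), props);

        // Assigning the strategy on the consumer makes Flink track watermarks
        // per Kafka partition inside the source, as the question asks.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, recordTs) -> event.getTimestamp()));

        DataStream<Event> events = env.addSource(consumer);

        events.keyBy(Event::getLabel) // hypothetical key; pick your real session key
              .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
              .reduce((e1, e2) -> e1) // placeholder: replace with your session logic
              .print();

        env.execute("Session windows over Kafka events");
    }
}
```

Note that Flink 1.11 also allows passing the WatermarkStrategy to a KafkaDeserializationSchema-free setup the same way; the key point is that the event time comes from your deserialized POJO, not from the Kafka record timestamp.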