本文介绍了Flink自定义触发器提供了意外的输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个Trigger,它会在20秒内首次触发,此后每5秒触发一次.我用过GlobalWindows和自定义的Trigger

I want to create a Trigger which gets fired in 20 seconds for the first time and in every five seconds after that. I have used GlobalWindows and a custom Trigger

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())

这是TradeTrigger中的代码:

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } else {
            if((System.currentTimeMillis()-ctime) >= 5000){
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

    }

    @Override
    public TriggerResult onEventTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }


    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

}

因此,基本上,当flagfalse时,即第一次,Trigger应该在20秒内被触发并将flag设置为true.从下一次开始,它应该每5秒触发一次.

So basically, when flag is false, i.e. the first time, the Trigger should get fired in 20 seconds and set the flag to true. From the next time, it should get fired every 5 seconds.

我面临的问题是,每次触发Trigger时,我在输出中仅收到一条消息.也就是说,我在20秒后收到一条消息,每五秒钟收到一条消息.我希望每次触发时在输出中显示20条消息.

The problem I am facing is, I am getting only one message in the output every time the Trigger is fired. That is, I get a single message after 20 seconds and single messages in every five seconds.I am expecting twenty messages in the output on each triggering.

如果我使用.timeWindow(Time.seconds(5))并创建一个五秒钟的时间窗口,则每5秒钟我会在输出中得到20条消息.请帮助我正确编写此代码.有什么我想念的吗?

If I use .timeWindow(Time.seconds(5)) and create a time window of five seconds, I get 20 messages in output every 5 seconds.Please help me get this code right. Is there something I am missing?

推荐答案

借助Fabian和Flink邮件列表中的答案使它起作用.通过TriggerContext将状态存储在ValueState变量中.检查onEvent()方法中的变量,如果是第一次,则将processingTimeTimer注册比当前时间多20秒并更新状态.在onProcessingTime方法中,将另一个ProcessingTimeTimer注册比当前时间多5秒,更新状态并触发Window.

Got it working with the help of the answer from Fabian and Flink mailing lists.Stored the state in a ValueState variable through the TriggerContext. Checked the variable in onEvent() method and if it was the first time, registered a processingTimeTimer for 20 seconds more than the current time and updated the state. In the onProcessingTime method, registered another ProcessingTimeTimer for 5 seconds more than current time, updated the state and fired the Window.

这篇关于Flink自定义触发器提供了意外的输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-24 00:06