是否可以在@OnStopped注释上发送流文件?

基本上,我想编写自定义处理器,该处理器可以在处理器停止位置的flowFile中发送属性。

有什么建议么?

我在下面尝试:

ProcessSession session;

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        flowFile = session.create();
    }
    flowFile = session.putAttribute(flowFile, "ATTRIBUTE_SIGNAL", "start");
    session.transfer(flowFile, success);
}

@OnStopped
public void sendStop() {
    FlowFile flowFile = session.get();
    flowFile = session.create();
    flowFile = session.putAttribute(flowFile, "ATTRIBUTE_SIGNAL", "stop");
    session.transfer(flowFile, success);
}


但是它失败了

2018-04-30 20:44:25,540 ERROR [StandardProcessScheduler Thread-3] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void com.kotak.nifi.processors.streaming.SignalGenerator.sendStop()' with arguments '[]'.
java.lang.reflect.InvocationTargetException: null.

最佳答案

像OnScheduled / OnStopped / etc之类的生命周期方法并不是真正要生成流文件,这就是为什么只有onTrigger才可以访问ProcessSession的原因。

通常认为处理器之间是松散耦合的,其中一个处理器并不真正了解/关心其他处理器,它只是将流文件从队列中取出并进行处理。

从技术上讲,您可以通过将对onTrigger中获得的ProcessSession的引用存储在处理器的成员变量中来实现所需的功能,以便稍后在OnStopped中使用它。

08-04 08:29