我向JavaPairDStream发送了3次相同的对象。我正在更新状态,但已保存3次。打印JavaPairDStream可以确认这一点。

Function3<InputMessageKey, Optional<InputMessage>, State<InputMessage>, Tuple2<InputMessageKey, InputMessage>> mappingFunction = new Function3<InputMessageKey, Optional<InputMessage>, State<InputMessage>, Tuple2<InputMessageKey, InputMessage>>() {
        @Override
        public Tuple2<InputMessageKey, InputMessage> call(InputMessageKey key, Optional<InputMessage> value, State<InputMessage> state) {
            InputMessage inputMessage = value.get();
            Tuple2<InputMessageKey, InputMessage> output = new Tuple2<>(key, inputMessage);
            state.update(inputMessage);
            return output;
        }
    };


打印流:

(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)

最佳答案

它没有保存三次。您将返回在函数末尾创建的Tuple2对象,这就是正在打印的内容。如果要查看已保存的内部状态,请在图形中使用JavaMapWithStateDStream.stateSnapshots而不是迭代mapWithState的输出。

08-18 09:46