我向JavaPairDStream发送了3次相同的对象。我正在更新状态,但已保存3次。打印JavaPairDStream可以确认这一点。
Function3<InputMessageKey, Optional<InputMessage>, State<InputMessage>, Tuple2<InputMessageKey, InputMessage>> mappingFunction = new Function3<InputMessageKey, Optional<InputMessage>, State<InputMessage>, Tuple2<InputMessageKey, InputMessage>>() {
@Override
public Tuple2<InputMessageKey, InputMessage> call(InputMessageKey key, Optional<InputMessage> value, State<InputMessage> state) {
InputMessage inputMessage = value.get();
Tuple2<InputMessageKey, InputMessage> output = new Tuple2<>(key, inputMessage);
state.update(inputMessage);
return output;
}
};
打印流:
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
最佳答案
它没有保存三次。您将返回在函数末尾创建的Tuple2
对象,这就是正在打印的内容。如果要查看已保存的内部状态,请在图形中使用JavaMapWithStateDStream.stateSnapshots
而不是迭代mapWithState
的输出。