我正在使用Flink v.1.4.0。
我已经实现了一个模块,作为我正在开发的软件包的一部分,该模块的作用是对流进行重复数据删除。该模块非常简单:
public class RemoveDuplicateFilter<T> extends RichFlatMapFunction<T, T> {
static final ValueStateDescriptor<Boolean> SEEN_DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class);
private ValueState<Boolean> seen;
@Override
public void open(Configuration configuration) {
RuntimeContext runtimeContext = this.getRuntimeContext();
seen = runtimeContext.getState(SEEN_DESCRIPTOR);
}
@Override
public void flatMap(T value, Collector<T> out) throws Exception {
Boolean hasBeenSeen = seen.value();
if(hasBeenSeen == null || !hasBeenSeen) {
out.collect(value);
seen.update(true);
}
}
问题是:如何在无需实例化实际
Flink
ValueState的情况下测试此代码?即使用Mockito?我已经尝试了许多方法,但是从本质上讲,当涉及到调用时:
RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
...
when(runtimeContext.getState(SEEN_DESCRIPTOR)).thenReturn(seen);
呼叫总是失败。我尝试用
SEEN_DESCRIPTOR
替换Matchers.any()
,但还是没有运气。有什么建议么?
最佳答案
您可以使用flinkspector进行功能的单元测试。