


I got a bit stuck with CoFlatMapFunction. It seems to work fine if I place it on the DataStream before the window, but fails if placed after the window's "apply" function.


I was testing two streams: the main "Features" stream on flatMap1, constantly ingesting data, and the control stream "Model" on flatMap2, changing the model on request.


I am able to set b0/b1 and see them properly set in flatMap2, but flatMap1 always sees b0 and b1 as 0, the value they were given at initialization.


Am I missing something obvious here?

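// Requires: org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
//           org.apache.flink.util.Collector
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    // Model coefficients; every parallel instance holds its own copy.
    Double b0;
    Double b1;

    public applyModel() {
        // Start at 0 until a Model record arrives on flatMap2.
        b0 = 0.0;
        b1 = 0.0;
    }

    // Main stream: feature records.
    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    // Control stream: replaces the model coefficients.
    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString() {
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}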



Here is the answer from the mailing list...

Is the CoFlatMapFunction meant to run in parallel? If yes, you need some way to deterministically assign which record goes to which parallel instance. In a sense, the CoFlatMapFunction does a parallel (partitioned) join between the model and the result of the session windows, so you need some form of key that selects which partition each element goes to. Does that make sense?


If not, try to set it to parallelism 1 explicitly.
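To see why flatMap1 can keep printing the initial values, it may help to model what parallelism does. The following is a plain-Java sketch, not Flink code: `ParallelInstancesDemo`, `Instance`, and `route` are hypothetical names, and the hash-based routing only stands in for what a key-based partitioning does conceptually. Each parallel instance holds its own copy of b0/b1, so a model update that lands on one instance is invisible to the others unless features and model are routed by the same key.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of parallel operator instances (hypothetical, not the Flink API).
class ParallelInstancesDemo {

    // One parallel copy of the function; each copy has its own fields.
    static class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        void onModel(double newB0, double newB1) {
            b0 = newB0;
            b1 = newB1;
        }

        double onFeature(double x) {
            return b0 + b1 * x;
        }
    }

    // Deterministic routing: the same key always maps to the same instance.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }

        // No shared key: the model reaches instance 0, the feature reaches instance 1.
        instances.get(0).onModel(1.0, 2.0);
        System.out.println("unkeyed: " + instances.get(1).onFeature(3.0)); // unkeyed: 0.0

        // Shared key: model and feature are routed to the same instance.
        String key = "sensor-a";
        instances.get(route(key, parallelism)).onModel(1.0, 2.0);
        System.out.println("keyed: " + instances.get(route(key, parallelism)).onFeature(3.0)); // keyed: 7.0
    }
}
```

With a parallelism of 1 there is only one `Instance`, which is why setting it explicitly also makes the symptom disappear.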


Global state that every instance can read, but not update, is doable via broadcast().
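As a rough illustration of the broadcast idea, here is a plain-Java sketch rather than Flink code; `BroadcastDemo`, `Instance`, and `broadcastModel` are hypothetical names. In a Flink job the corresponding step would presumably be to call `broadcast()` on the model stream before connecting it, for example `features.connect(model.broadcast())`. The sketch only shows the effect: every parallel instance receives the same model record, so any instance that later handles a feature sees the current coefficients.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of broadcasting a control record (hypothetical, not the Flink API).
class BroadcastDemo {

    // One parallel copy of the function with its own coefficients.
    static class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        void onModel(double newB0, double newB1) {
            b0 = newB0;
            b1 = newB1;
        }

        double onFeature(double x) {
            return b0 + b1 * x;
        }
    }

    static List<Instance> createInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    // Broadcast: deliver the same model record to every parallel instance.
    static void broadcastModel(List<Instance> instances, double b0, double b1) {
        for (Instance instance : instances) {
            instance.onModel(b0, b1);
        }
    }

    public static void main(String[] args) {
        List<Instance> instances = createInstances(3);
        broadcastModel(instances, 1.0, 2.0);

        // Whichever instance a feature lands on, it uses the same model.
        for (Instance instance : instances) {
            System.out.println(instance.onFeature(3.0)); // 7.0 for each instance
        }
    }
}
```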

A global state that all instances can both read and update is currently not available. Consistent operations on such state would be quite costly and would require some form of distributed communication/consensus.


Instead, I would encourage you to go with the following:

1) If you can partition the state, use keyBy().mapWithState(). That localizes state operations and makes them very fast.

2) If your state is not organized by key, it is probably very small, and you may be able to use a non-parallel operation.

3) If some operation updates the state and another one accesses it, you can often implement that with iterations and a CoFlatMapFunction (one side is the original input, the other the feedback input).

All approaches in the end localize state access and modifications, which is a good pattern to follow, if possible.
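The first option, state partitioned by key, can be pictured with the sketch below. It is plain Java loosely modeled on the shape of a keyed map-with-state operation and is not the Flink API: `KeyedStateDemo`, `KeyedMapper`, `Step`, and `runningSum` are hypothetical names. The point is only that each key has its own state entry, so reads and updates never need to coordinate across keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Plain-Java model of per-key state (hypothetical, not the Flink API).
class KeyedStateDemo {

    // Result of one stateful step: the emitted output and the state to keep.
    static final class Step<O, S> {
        final O output;
        final S newState;

        Step(O output, S newState) {
            this.output = output;
            this.newState = newState;
        }
    }

    // Applies fn to each value together with the state stored for that value's key.
    static class KeyedMapper<K, I, O, S> {
        private final Map<K, S> stateByKey = new HashMap<>();
        private final BiFunction<I, Optional<S>, Step<O, S>> fn;

        KeyedMapper(BiFunction<I, Optional<S>, Step<O, S>> fn) {
            this.fn = fn;
        }

        O process(K key, I value) {
            Step<O, S> step = fn.apply(value, Optional.ofNullable(stateByKey.get(key)));
            stateByKey.put(key, step.newState);
            return step.output;
        }
    }

    // Example stateful function: a running sum per key.
    static KeyedMapper<String, Integer, Integer, Integer> runningSum() {
        return new KeyedMapper<String, Integer, Integer, Integer>((value, state) -> {
            int sum = state.orElse(0) + value;
            return new Step<Integer, Integer>(sum, sum);
        });
    }

    public static void main(String[] args) {
        KeyedMapper<String, Integer, Integer, Integer> sums = runningSum();
        System.out.println(sums.process("a", 1));  // 1
        System.out.println(sums.process("a", 2));  // 3
        System.out.println(sums.process("b", 10)); // 10, key "b" has its own state
    }
}
```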


