我有来自不同来源的三个不同的流(对象:Trade,MarketData, WeightAdj,它们唯一的共同点是“产品”。这是我的流。

贸易流:贸易编号,产品,执行

MarketData流:产品,marketData

计算流:产品,因子

我想使用Flink实现的目标我想加入所有三个流并产生Tuple3<Trade,MarketData,WeightAdj >的最新值。这意味着每次这些流中的任何一个发出事件时,我都应获取最新的Tuple3<Trade,MarketData,WeightAdj>

我尝试使用后跟keyBy的'connect'函数加入这些流,但是如果发出MarketData或WeightAdj事件,它不会产生Enriched对象。

public static void main(String[] args) throws Exception {
// some code
  tradeStream.connect(marketStream)
    .keyBy(
            new KeySelector<Trade, String>() {
                @Override
                public String getKey(Trade trd) throws Exception {
                    return trd.product;
                }
            }, new KeySelector<MarketData, String>() {
                @Override
                public String getKey(MarketData marketData)
                        throws Exception {
                    return marketData.product;
                }
            }

    )
    .flatMap(new JoinRichCoFlatMapFunction())
    .connect(weightStream)
    .keyBy(new KeySelector<Tuple2<Trade, MarketData>, String>() {
        @Override
        public String getKey(Tuple2<Trade, MarketData> trd) throws Exception {
            return trd.f0.product;
        }
    }, new KeySelector<WeightAdj, String>() {
        @Override
        public String getKey(WeightAdj wght) throws Exception {
            return wght.product;
        }
    })
    .flatMap(new TupleWeightJionRichCoFlatMapFunction())
    .print();
}

public static final class JoinRichCoFlatMapFunction extends RichCoFlatMapFunction<Trade, MarketData, Tuple2<Trade, MarketData>>{

    private ValueState<Trade> trades;
    private ValueState<MarketData> marketData;

    @Override
    public void open(Configuration config) {
        trades = getRuntimeContext().getState(new ValueStateDescriptor<>("Trades", Trade.class));
        marketData = getRuntimeContext().getState(new ValueStateDescriptor<>("MarketData", MarketData.class));
    }

    @Override
    public void flatMap1(Trade trd,Collector<Tuple2<Trade, MarketData>> out) throws Exception {

        MarketData mktData = marketData.value();
        if (mktData != null) {
            marketData.clear();
            out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
        } else {
            trades.update(trd);;
        }
    }

    @Override
    public void flatMap2(MarketData mktData,Collector<Tuple2<Trade, MarketData>> out) throws Exception {

        Trade trd = trades.value();
        if (trd != null) {
            trades.clear();
            out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
        } else {
            marketData.update(mktData);;
        }
    }
}

public static final class TupleWeightJionRichCoFlatMapFunction extends RichCoFlatMapFunction<Tuple2<Trade, MarketData>, WeightAdj, Tuple3<Trade, MarketData, WeightAdj>>{

    private ValueState<Tuple2<Trade, MarketData>> tradeMarketState;
    private ValueState<WeightAdj> weightState;

    @Override
    public void open(Configuration config) {

        TypeInformation<Tuple2<Trade, MarketData>> info = TypeInformation.of(new TypeHint<Tuple2<Trade, MarketData>>(){});
        tradeMarketState = getRuntimeContext().getState(new ValueStateDescriptor<>("Trades", info));
        weightState = getRuntimeContext().getState(new ValueStateDescriptor<>("Weights", WeightAdj.class));
    }

    @Override
    public void flatMap1(Tuple2<Trade, MarketData> trdWithMaktData, Collector<Tuple3<Trade, MarketData, WeightAdj>> out)
            throws Exception {

        WeightAdj weigt = weightState.value();
        if (weigt != null) {
            weightState.clear();
            out.collect(new Tuple3<Trade, MarketData, WeightAdj>(trdWithMaktData.f0, trdWithMaktData.f1, weigt));
        } else {
            tradeMarketState.update(trdWithMaktData);;
        }
    }

    @Override
    public void flatMap2(WeightAdj weightData,Collector<Tuple3<Trade, MarketData, WeightAdj>> out) throws Exception {

        Tuple2<Trade, MarketData> trdWithMktData = tradeMarketState.value();
        if (trdWithMktData != null) {
            tradeMarketState.clear();
            out.collect(new Tuple3<Trade, MarketData, WeightAdj>(trdWithMktData.f0, trdWithMktData.f1, weightData));
        } else {
            weightState.update(weightData);;
        }
    }
}


知道我在做什么错吗?

最佳答案

如果我正确地理解了您的目标,则需要以不同的方式处理以下几点:


不要在任何状态下调用clear(),因为您需要继续记住从三个流中的每一个看到的最后一个值。
始终致电out.collect()。如果正在调用flatmap1flatmap2,则意味着已对某些内容进行了更新,因此需要报告一些新内容。


(看起来您在模仿Flink培训中RidesAndFares exercise中使用的逻辑。在此练习中,要求是不同的:在这种情况下,需要将一对乘车和票价事件组合在一起,找到给定rideId的乘车/票价对后,将对该rideId进行加入。)

现在有几点警告:


如果您从不调用clear()并且产品空间不受限制,那么您将无限期保持数量不断增加的状态。如果这是一个问题,则可以使用state TTL安排清除陈旧状态。
请记住,如果将Tuple序列化程序与RocksDB一起使用,则不能处理null。我很想按照以下方式重写您的每个flatmap方法:


public void flatMap1(Trade trd, Collector<Tuple2<Trade, MarketData>> out) throws Exception {

    trades.update(trd);;
    MarketData mktData = marketData.value();
    out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
}


但是当应用程序启动时,这可能会产生一个Tuple2,其中mktData为null。因此,最好避免这种情况。

正如Arvid所提到的,Table / SQL API使这些连接变得容易。

09-05 14:40