我已经为我的Apache Flink流实现了MapFunction。它正在解析传入的元素并将其转换为其他格式,但有时会出现错误(即传入的数据无效)。

我看到了两种可能的处理方式:

  • 忽略无效元素,但似乎我不能忽略错误,因为对于任何传入元素,我都必须提供传出元素。
  • 将传入元素拆分为有效和无效,但似乎我应该为此使用其他函数。

  • 因此,我有两个问题:
  • 如何正确处理我的MapFunction中的错误?
  • 如何正确实现此类转换功能?
  • 最佳答案

    您可以使用FlatMapFunction而不是MapFunction。这将使您仅在元素有效时才发出它。下面显示了一个示例实现:

    input.flatMap(new FlatMapFunction<String, Long>() {
        @Override
        public void flatMap(String input, Collector<Long> collector) throws Exception {
            try {
                Long value = Long.parseLong(input);
                collector.collect(value);
            } catch (NumberFormatException e) {
                // ignore invalid data
            }
        }
    });
    

    10-07 19:00
    查看更多