I am writing a test case for Flink's two-phase commit. Below is an overview.

`sink kafka` is an exactly-once Kafka producer. `sink step` is a MySQL sink that extends the two-phase commit sink function. `sink compare` is also a MySQL sink that extends the two-phase commit sink function, and it occasionally throws an exception to simulate a failed checkpoint.

When a checkpoint fails and the job restores, I find that the MySQL two-phase commit works fine. However, the Kafka consumer resumes reading from the offset of the last successful checkpoint, and the Kafka producer produces the messages again, even though it had already produced them before the checkpoint failed.


How can I avoid duplicate messages in this case?



  • Flink 1.9.1
  • Java 1.8
  • Kafka 2.11

Kafka producer code:

        // Note: the other constructor arguments (default topic, producer Properties, Semantic)
        // are not shown in the original post.
        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        // A fresh random UUID is attached to every serialized record.
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                }
        )).name("sink kafka");


        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

MySQL sink:

                // Passed to addSink(...); the opening of that call is not shown in the original post.
                new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
                        Connection, Void>
                        (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {

                    int count = 0;
                    Connection connection;

                    @Override
                    protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
                        // Throw once more than 10 records have been written, to simulate a failure.
                        if (count > 10) {
                            throw new Exception("compare test exception.");
                        }
                        PreparedStatement ps = transaction.prepareStatement(
                                " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                                        " values(?, ?, ?, ?, ?) " +
                                        " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
                        );
                        ps.setString(1, context.timestamp().toString());
                        ps.setString(2, value.f0);
                        ps.setString(3, value.f1);
                        ps.setString(4, value.f2); // key3 (the original passed value.f1 here)
                        ps.setLong(5, value.f3);
                        ps.execute(); // presumably elided in the original post
                        count += 1;
                    }

                    @Override
                    protected Connection beginTransaction() throws Exception {
                        LOGGER.error("compare in begin transaction");
                        try {
                            if (connection.isClosed()) {
                                throw new Exception("mysql connection closed");
                            }
                        } catch (Exception e) {
                            // Also reached on the first call, when connection is still null.
                            LOGGER.error("mysql connection is error: " + e.toString());
                            LOGGER.error("reconnect mysql connection");
                            String jdbcURI = "jdbc:mysql://";
                            Connection connection = DriverManager.getConnection(jdbcURI);
                            this.connection = connection;
                        }
                        return this.connection;
                    }

                    @Override
                    protected void preCommit(Connection transaction) throws Exception {
                        LOGGER.error("compare in pre Commit");
                    }

                    @Override
                    protected void commit(Connection transaction) {
                        LOGGER.error("compare in commit");
                        try {
                            transaction.commit(); // presumably elided in the original post
                        } catch (Exception e) {
                            LOGGER.error("compare Commit error: " + e.toString());
                        }
                    }

                    @Override
                    protected void abort(Connection transaction) {
                        LOGGER.error("compare in abort");
                        try {
                            transaction.rollback(); // presumably elided in the original post
                        } catch (Exception e) {
                            LOGGER.error("compare abort error." + e.toString());
                        }
                    }

                    @Override
                    protected void recoverAndCommit(Connection transaction) {
                        LOGGER.error("compare in recover And Commit");
                    }

                    @Override
                    protected void recoverAndAbort(Connection transaction) {
                        LOGGER.error("compare in recover And Abort");
                    }
                })
                .setParallelism(1).name("sink compare");



I'm not quite sure I understand the question correctly:

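"When a checkpoint fails and the job restores, I find that the MySQL two-phase commit works fine, but the Kafka producer will read the offset from the last success and produce messages even though it had already done so before this checkpoint failed."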

The Kafka producer does not read any data. So I'm assuming that your whole pipeline rereads old offsets and produces duplicates. If so, you need to understand how Flink ensures exactly-once.

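  1. Periodic checkpoints are created to keep a consistent state in case of failure.
  2. These checkpoints contain the offset of the last successfully read record at the time of the checkpoint.
  3. Upon recovery, Flink rereads all records starting from the offset stored in the last successful checkpoint. Thus, the records that were processed between the last checkpoint and the failure are replayed.
  4. The replayed records restore the state as it was before the failure.
  5. This produces duplicate output originating from the replayed input records.
  6. It is the responsibility of the sinks to ensure that no duplicates are effectively written to the target system.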
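
The steps above can be illustrated with a small toy simulation. This is only a sketch in plain Java, not Flink code: the class `ReplayDemo`, its `run` method, and the parameters `checkpointEvery` and `failAt` are hypothetical names made up for illustration. It shows how a sink that writes immediately ends up with a duplicate after the source rewinds to the checkpointed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplayDemo {

    /** Runs a toy pipeline and returns everything written to a non-transactional sink. */
    static List<String> run(List<String> input, int checkpointEvery, int failAt) {
        List<String> sink = new ArrayList<>();
        int checkpointedOffset = 0;   // offset stored in the last successful checkpoint
        boolean failedOnce = false;
        int offset = 0;
        while (offset < input.size()) {
            if (!failedOnce && offset == failAt) {
                // Simulated failure: recovery rewinds the source to the checkpointed offset.
                failedOnce = true;
                offset = checkpointedOffset;
                continue;
            }
            sink.add(input.get(offset));  // written immediately, never rolled back
            offset++;
            if (offset % checkpointEvery == 0) {
                checkpointedOffset = offset;  // a checkpoint completes here
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        // Checkpoint every 2 records, fail once just before reading offset 3.
        List<String> out = run(Arrays.asList("a", "b", "c", "d", "e"), 2, 3);
        System.out.println(out);  // prints [a, b, c, c, d, e]
    }
}
```

Record "c" was written after the checkpoint at offset 2 and before the failure, so it is replayed and shows up twice in the sink.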


For the last point, there are two options:

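  • Only output data when a checkpoint is written, so that no effective duplicates ever appear in the target. This naive approach is very universal (independent of the sink) but adds the checkpointing interval to the latency.
  • Let the sink deduplicate the output.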
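
A minimal sketch of the first option, again in plain Java rather than against the Flink API. The class `BufferingSink` and its methods `write`, `onCheckpointComplete`, and `onFailure` are hypothetical names chosen for illustration; in a real job these roles would be played by the sink's checkpoint hooks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferingSink {
    private final List<String> pending = new ArrayList<>();  // buffered, not yet visible
    private final List<String> target = new ArrayList<>();   // what the target system sees

    /** Buffers a record instead of writing it straight to the target. */
    void write(String record) {
        pending.add(record);
    }

    /** Publishes the buffered records once the checkpoint has completed. */
    void onCheckpointComplete() {
        target.addAll(pending);
        pending.clear();
    }

    /** On failure the buffer is dropped; the source will replay those records. */
    void onFailure() {
        pending.clear();
    }

    List<String> target() {
        return target;
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.write("a");
        sink.write("b");
        sink.onCheckpointComplete();
        sink.write("c");
        sink.onFailure();            // "c" was never published
        sink.write("c");             // replayed after recovery
        sink.write("d");
        sink.onCheckpointComplete();
        System.out.println(sink.target());  // prints [a, b, c, d]
    }
}
```

The target never sees "c" twice, but nothing becomes visible until the next checkpoint completes, which is the added latency mentioned above.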

The latter option is used for the Kafka sink. It uses Kafka transactions to deduplicate data. To avoid duplicates on the consumer side, you need to ensure that the consumer is not reading uncommitted data, as mentioned in the documentation. Also make sure that your transaction timeout is large enough that data is not discarded between failure and recovery.
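
As a rough sketch, the two settings mentioned here could be expressed as client properties. `isolation.level` and `transaction.timeout.ms` are standard Kafka client configuration keys; the class `KafkaExactlyOnceConfig`, its method names, and the 15-minute value are assumptions for illustration only. Other required settings (bootstrap servers, group id, and so on) are left out, and the broker's `transaction.max.timeout.ms` must be at least as large as the producer's timeout.

```java
import java.util.Properties;

public class KafkaExactlyOnceConfig {

    /** Properties for the downstream consumer that reads the output topic. */
    static Properties consumerProps() {
        Properties props = new Properties();
        // Only read messages from committed transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    /** Properties to pass to the transactional producer. */
    static Properties producerProps(long transactionTimeoutMs) {
        Properties props = new Properties();
        // Should comfortably exceed the time between a failure and the completed recovery.
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        long fifteenMinutesMs = 15L * 60L * 1000L;  // example value, an assumption
        System.out.println(consumerProps());
        System.out.println(producerProps(fifteenMinutesMs));
    }
}
```

The producer properties would be handed to the `FlinkKafkaProducer` constructor together with the exactly-once semantic, and the consumer properties to whichever client reads `flink_output`.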

08-24 04:39