正如doc所说,Flink 1.4.0中引入了TwoPhaseCommitSinkFunction以启用端到端的精确一次语义。关于此抽象类TwoPhaseCommitSinkFunction及其子类FlinkKafkaProducer011(源代码为herehere),我有两个问题。

TwoPhaseCommitSinkFunction具有abort方法来中止事务。那么,调用abort方法的情况是什么?具体来说,将在提交过程开始后调用它,这意味着放弃当前事务和一次精确的语义?我之所以这样问是因为,正如评论所言,这是所有打算实现一次语义的SinkFunction的推荐基类。

Flink documentation说:



检查点的开始表示我们两阶段提交协议的“预提交”阶段。

这使我相信preCommit的实现应处理所有运算符的检查点,但FlinkKafkaProducer011则实现preCommit像这样:

@Override
    protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
        switch (semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                flush(transaction);
                break;
            case NONE:
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        checkErroneous();
    }

flush像这样:
private void flush(KafkaTransactionState transaction) throws FlinkKafka011Exception {
        if (transaction.producer != null) {
            transaction.producer.flush();
        }
        long pendingRecordsCount = pendingRecords.get();
        if (pendingRecordsCount != 0) {
            throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
        }

        // if the flushed requests has errors, we should propagate it also and fail the checkpoint
        checkErroneous();
    }

似乎在preCommit中没有检查点被处理。那么,我在某处弄错了吗?

最佳答案

例如,在关闭接收器功能时,在当前正在进行的事务中调用abort,而该事务尚未预先提交。示例场景是集群中另一个节点上出现故障,作业正在故障转移,因此我们中止了当前事务(以便从先前成功的检查点恢复)
Kafka中的flush在事务性KafkaProducer上使用时,基本上具有预提交的语义。这就是preCommit(...)方法同时将其用于EXACTLY_ONCEAT_LEAST_ONCE的原因。

10-06 13:06