正如doc所说,Flink 1.4.0中引入了TwoPhaseCommitSinkFunction
以启用端到端的精确一次语义。关于此抽象类TwoPhaseCommitSinkFunction
及其子类FlinkKafkaProducer011
(源代码为here和here),我有两个问题。TwoPhaseCommitSinkFunction
具有abort
方法来中止事务。那么,调用abort
方法的情况是什么?具体来说,将在提交过程开始后调用它,这意味着放弃当前事务和一次精确的语义?我之所以这样问是因为,正如评论所言,这是所有打算实现一次语义的SinkFunction的推荐基类。
Flink documentation说:
检查点的开始表示我们两阶段提交协议的“预提交”阶段。
这使我相信preCommit
的实现应处理所有运算符的检查点,但FlinkKafkaProducer011
则实现preCommit
像这样:
@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
flush(transaction);
break;
case NONE:
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
checkErroneous();
}
flush
像这样:private void flush(KafkaTransactionState transaction) throws FlinkKafka011Exception {
if (transaction.producer != null) {
transaction.producer.flush();
}
long pendingRecordsCount = pendingRecords.get();
if (pendingRecordsCount != 0) {
throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
}
// if the flushed requests has errors, we should propagate it also and fail the checkpoint
checkErroneous();
}
似乎在
preCommit
中没有检查点被处理。那么,我在某处弄错了吗? 最佳答案
例如,在关闭接收器功能时,在当前正在进行的事务中调用abort
,而该事务尚未预先提交。示例场景是集群中另一个节点上出现故障,作业正在故障转移,因此我们中止了当前事务(以便从先前成功的检查点恢复)
Kafka中的flush
在事务性KafkaProducer
上使用时,基本上具有预提交的语义。这就是preCommit(...)
方法同时将其用于EXACTLY_ONCE
和AT_LEAST_ONCE
的原因。