如何使用Scala实现ConsumerRebalanceListener
?
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
}
订阅主题时,新制作的Scala重新平衡监听器的示例将是什么?
试图学习并全神贯注于在Scala中实现Java方法/接口。
谢谢。
最佳答案
您可以直接扩展接口
class MyListener extends ConsumerRebalanceListener {
...
}
API文档中的示例如下所示:
class SaveOffsetsOnRebalance(consumer: Consumer[_, _] ) extends ConsumerRebalanceListener {
def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
// save the offsets in an external store using some custom code not described
partitions.toScala.forEach(
saveOffsetInExternalStore(consumer.position(partition))
)
}
def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
// read the offsets from an external store using some custom code not described here
partitions.forEach(
consumer.seek(partition, readOffsetFromExternalStore(partition)))
}
}
只需添加适当的导入
关于java - 如何在Scala中实现Java接口(interface)(用于Kafka)?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50072246/