如何使用Scala实现ConsumerRebalanceListener

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
}


订阅主题时,新制作的Scala重新平衡监听器的示例将是什么?

试图学习并全神贯注于在Scala中实现Java方法/接口。

谢谢。

最佳答案

您可以直接扩展接口

class MyListener extends ConsumerRebalanceListener {
     ...
 }


API文档中的示例如下所示:

  class SaveOffsetsOnRebalance(consumer: Consumer[_, _] ) extends ConsumerRebalanceListener {

   def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       // save the offsets in an external store using some custom code not described
   partitions.toScala.forEach(
         saveOffsetInExternalStore(consumer.position(partition))
   )
   }

   def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
       // read the offsets from an external store using some custom code not described here
       partitions.forEach(
          consumer.seek(partition, readOffsetFromExternalStore(partition)))
   }
 }


只需添加适当的导入

关于java - 如何在Scala中实现Java接口(interface)(用于Kafka)?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50072246/

10-11 22:39
查看更多