我正在春季启动中使用kafka,并且尝试添加一项功能,该功能将使我们能够启动服务并将其重播到特定时间。
消费者是这样设置的
public interface ProductScenarioStream {
String SERVICE_REQUESTS_PRODUCT_PRICE = "serviceRequestsProductPrice";
String SERVICE_CONCLUDES_PRODUCT_SCENARIO = "serviceConcludesProductScenario";
@Output(SERVICE_REQUESTS_PRODUCT_PRICE)
MessageChannel serviceRequestsProductPrice();
@Input(SERVICE_CONCLUDES_PRODUCT_SCENARIO)
SubscribableChannel serviceConcludesProductScenario();
}
和
@Service
@EnableBinding(ProductScenarioStream.class)
@Profile("stream")
public class ProductStreamServiceImpl implements ProductStreamService
{
@Resource
private ProductScenarioStream productScenarioStream;
@Override
public void send(final ServiceRequestsProductPrice event) {
...
}
}
您是否知道在哪里可以找到设置以允许我在这种情况下快退流上的偏移量?
最佳答案
我假设您的意思是replay
而不是reply
-我已经编辑了您的问题。
Spring Cloud Stream当前没有公开寻找偏移量的机制。
您可以使用spring-kafka的@KafkaListener
代替;实现ConsumerSeekAware
,它为您提供了在启动过程中(或任何时间)进行查找的机制。