问题描述
我正在阅读 AsyncCassandraOperations 以执行异步插入以提高基于另一篇文章的性能 此处.但是我无法在 google 或 spring 数据文档上找到很多帮助.
I am reading up on AsyncCassandraOperations to perform async inserts to improve performance based on another post here. But I am unable to find a lot of help on google or spring data documentation.
以前我使用 Cassandra Repository 进行所有数据提取和插入/更新,我发现它非常慢.根据建议,我现在单独使用 AsyncCassandraOperations 进行插入操作,但它不会让我使用.我遇到需要一个类型为org.springframework.data.cassandra.core.AsyncCassandraOperations"的bean错误.
Previously I was using Cassandra Repository for all data extraction and insert/updates which I found to be super slow. As per recommendation I am now using AsyncCassandraOperations for the insert operation alone, but it wont let me. I encounter required a bean of type 'org.springframework.data.cassandra.core.AsyncCassandraOperations' error.
请问使用 AsyncCassandraOperations 的正确方法是什么?
What would be the correct way to use AsyncCassandraOperations please?
@Autowired private MyRepository repository_name;
@Autowired private AsyncCassandraOperations acops;
public void persist(List<POJO> l_POJO)
{
System.out.println("Enter Persist: "+new java.util.Date());
List<l_POJO> l_POJO_stale = repository_name.findBycol1AndStale("sample",false);
l_POJO_stale.forEach(s -> s.setStale(true));
l_POJO_stale.forEach(s -> acops.update(s));
try
{
acops.insert(l_POJO);
}
catch (Exception e)
{
System.out.println("Error in persisting new data");
}
}
推荐答案
不知道有没有用spring boot,如果用了AsyncCassandraOperations(AsyncCassandraTemplate是实现类)应该自动创建.如果错误显示您需要一个 AsyncCassandraOperations bean,直接的方法是创建一个,如下所示.
Don't know whether spring boot is used, if so the AsyncCassandraOperations(AsyncCassandraTemplate is the implementation class) should be created automatically.If the error shows you need an AsyncCassandraOperations bean, the straight way is to create one as shown below.
@Bean
AsyncCassandraTemplate asyncCassandraTemplate(Session session) {
return new AsyncCassandraTemplate(session);
}
由于您使用的是 Spring data Repository 接口,您也可以使用 ReactiveCrudRepository
接口来更新或插入实体对象到 Cassandra,如 这个 spring 数据示例项目 ,作为使用 AsyncCassandraTemplate的替代方法代码>类.
Since you are using Spring data Repository interface, you can alse use the ReactiveCrudRepository
interface to update or insert entity objects to Cassandra, which is shown in this spring data example project , as an alternative way to using the AsyncCassandraTemplate
class.
在使用 ReactiveCrudRepository
的情况下,关于您想要做什么,您的代码需要进行以下更改.
In the case of using ReactiveCrudRepository
and regarding what you want to do, your code needs the following changes.
- 将
WRRepository.findByCol1AndCol2AndCol3(String, boolean, String)
的返回类型从List
更改为Flux
,以充分利用反应式功能. - 将
persist(List)
的返回类型从 boolean 更改为Mono
,使结果也非阻塞. - 将您的
persist(List)
更改为以下内容.
- change the return type of
WRRepository.findByCol1AndCol2AndCol3(String, boolean, String)
fromList<WRpojo>
toFlux<WRpojo>
, in order to fully utilize the reactive functionality. - change the return type of
persist(List<WRpojo>)
from boolean toMono<Void>
, making the result non-blocking too. - change your
persist(List<WRpojo>)
to the following.
public Mono<Void> persist(List<WRpojo> l_wr) {
Flux<WRpojo> l_old_wr = objWRRepository.findByCol1AndCol2AndCol3("1", false, "2").doOnNext(s -> s.setStale(true));
return objWRRepository.saveAll(l_old_wr).thenMany(objWRRepository.saveAll(l_wr)).then();
}
在响应式编程中,基本上我们不阻塞任何代码,这意味着返回的 Mono
应该在下游某处订阅,如果您确实想阻塞并等待所有操作完成,你可以在 Mono
上调用 block()
,不推荐.
In reactive programming, basically we don't block any code, this means that somewhere the returned Mono<Void>
should be subscribed somewhere downstream, if you do want to block and wait for all operations complete, you can call block()
on Mono<Void>
, which is not recommended.
这篇关于AsyncCassandraOperations 示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!