AsyncCassandraOperations

AsyncCassandraOperations

本文介绍了AsyncCassandraOperations示例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在阅读AsyncCassandraOperations以执行异步插入以提高性能,这是基于另一篇帖子。但是我无法在Google或Spring数据文档中找到很多帮助。

I am reading up on AsyncCassandraOperations to perform async inserts to improve performance based on another post here. But I am unable to find a lot of help on google or spring data documentation.

以前,我使用Cassandra存储库进行所有数据提取和插入/更新,但我发现这非常慢。根据建议,我现在仅将AsyncCassandraOperations用于插入操作,但它不会让我使用。我遇到需要类型为'org.springframework.data.cassandra.core.AsyncCassandraOperations'的bean。错误。

Previously I was using Cassandra Repository for all data extraction and insert/updates which I found to be super slow. As per recommendation I am now using AsyncCassandraOperations for the insert operation alone, but it wont let me. I encounter required a bean of type 'org.springframework.data.cassandra.core.AsyncCassandraOperations' error.

正确的方法是什么要使用AsyncCassandraOperations吗?

What would be the correct way to use AsyncCassandraOperations please?

@Autowired private MyRepository repository_name;
@Autowired private AsyncCassandraOperations acops;
public void persist(List<POJO> l_POJO)
{
        System.out.println("Enter Persist: "+new java.util.Date());

        List<l_POJO> l_POJO_stale = repository_name.findBycol1AndStale("sample",false);

        l_POJO_stale.forEach(s -> s.setStale(true));

        l_POJO_stale.forEach(s -> acops.update(s));

        try
        {
            acops.insert(l_POJO);
        }
        catch (Exception e)
        {
            System.out.println("Error in persisting new data");
        }
}


推荐答案

Don不知道是否使用了spring boot,如果这样,应该自动创建AsyncCassandraOperations(AsyncCassandraTemplate是实现类)。
如果错误显示您需要AsyncCassandraOperations bean,则直接方法如下所示。

Don't know whether spring boot is used, if so the AsyncCassandraOperations(AsyncCassandraTemplate is the implementation class) should be created automatically.If the error shows you need an AsyncCassandraOperations bean, the straight way is to create one as shown below.

@Bean
AsyncCassandraTemplate asyncCassandraTemplate(Session session) {
    return new AsyncCassandraTemplate(session);
}

由于您使用的是Spring数据存储库界面,因此还可以使用 ReactiveCrudRepository 接口将实体对象更新或插入到Cassandra中,如,作为使用 AsyncCassandraTemplate 类的另一种方法。

Since you are using Spring data Repository interface, you can alse use the ReactiveCrudRepository interface to update or insert entity objects to Cassandra, which is shown in this spring data example project , as an alternative way to using the AsyncCassandraTemplate class.

如果使用 ReactiveCrudRepository 并考虑要执行的操作,则代码需要进行以下更改。

In the case of using ReactiveCrudRepository and regarding what you want to do, your code needs the following changes.


  1. 更改 WRRepository.findByCol1AndCol2AndCol3(String,boolean,String)的返回类型$ c> List< WRpojo> 到 Flux< WRpojo> ,以便充分利用反应性功能。

  2. persist(List< WRpojo>)的返回类型从布尔值更改为 Mono< Void> ,也使结果也不会阻塞。

  3. persist(List< WRpojo>)更改为以下内容。

  1. change the return type of WRRepository.findByCol1AndCol2AndCol3(String, boolean, String) from List<WRpojo> to Flux<WRpojo> , in order to fully utilize the reactive functionality.
  2. change the return type of persist(List<WRpojo>) from boolean to Mono<Void> , making the result non-blocking too.
  3. change your persist(List<WRpojo>) to the following.



  public Mono<Void> persist(List<WRpojo> l_wr) {
    Flux<WRpojo> l_old_wr = objWRRepository.findByCol1AndCol2AndCol3("1", false, "2").doOnNext(s -> s.setStale(true));
    return objWRRepository.saveAll(l_old_wr).thenMany(objWRRepository.saveAll(l_wr)).then();
  }

在反应式编程中,基本上我们不会阻塞任何代码,这意味着返回的 Mono< Void> 应该在下游的某个地方订阅,如果您确实要阻止并等待所有操作完成,则可以调用 block( ) Mono< Void> 上,不建议这样做。

In reactive programming, basically we don't block any code, this means that somewhere the returned Mono<Void> should be subscribed somewhere downstream, if you do want to block and wait for all operations complete, you can call block() on Mono<Void> , which is not recommended.

这篇关于AsyncCassandraOperations示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-24 21:11