本文介绍了Cassandra 如何处理 datastax java 驱动程序中的阻塞执行语句的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

阻止从 com.datastax.driver.core.Session 执行方法

public ResultSet execute(Statement statement);

对此方法的评论:

此方法阻塞,直到至少收到一些结果数据库.但是,对于 SELECT 查询,它并不能保证结果已全部收到.但它确实保证了一些已收到来自数据库的响应,特别是保证如果请求无效,就会抛出异常用这种方法.

来自 com.datastax.driver.core.Session 的非阻塞执行方法

public ResultSetFuture executeAsync(Statement statement);

这个方法不会阻塞.它在查询完成后立即返回传递到底层网络堆栈.特别是从此方法不保证查询有效或什至已提交到活动节点.与失败有关的任何异常访问 {@link结果集未来}.

我有 02 个关于它们的问题,因此如果您能帮助我理解它们就太好了.

假设我有 100 万条记录,我希望所有记录都能到达数据库(没有任何丢失).

问题 1: 如果我有 n 个线程,所有线程都将具有相同数量的记录,它们需要发送到数据库.他们都使用阻塞执行调用继续向 cassandra 发送多个插入查询.如果我增加 n 的值,它是否也有助于加快我需要将所有记录插入到 cassandra 的时间?

这会导致 cassandra 的性能问题吗?Cassandra 是否必须确保对于每条插入记录,集群中的所有节点都应该立即知道新记录?为了保持数据的一致性.(我假设 cassandra 节点甚至不会考虑使用本地机器时间来控制记录插入时间).

问题 2: 使用非阻塞执行,我如何确保所有插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行情况.有什么更好的办法吗?非阻塞执行是否比阻塞执行更容易失败?

非常感谢您的帮助.

解决方案

在某种程度上.让我们把客户端的实现细节分开一点,从并发请求数"的角度来看事情,因为如果你使用 executeAsync,你不需要为每个正在进行的请求都有一个线程.在我的测试中,我发现虽然拥有大量并发请求有很多价值,但有一个阈值会导致收益递减或性能开始下降.我的一般经验法则是 (节点数 *native_transport_max_threads (default: 128)* 2),但您可能会发现更多或更少的最佳结果.

这里的想法是,将比 cassandra 一次处理的请求数量更多的请求加入队列并没有多大价值.在减少进行中的请求数量的同时,您可以限制驱动程序客户端和 cassandra 之间不必要的连接拥塞.

问题 2:使用非阻塞执行,如何确保所有插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行情况.有什么更好的办法吗?非阻塞执行是否比阻塞执行更容易失败?

通过 get 等待 ResultSetFuture 是一种方法,但如果您正在开发一个完全异步的应用程序,您希望尽可能避免阻塞.使用番石榴,你最好的两个武器是 Futures.addCallbackFutures.transform.

  • Futures.addCallback 允许您注册一个 FutureCallback 在驱动程序收到响应时执行.onSuccess 在成功案例中被执行,onFailure 否则.

  • Futures.transform 允许您有效地将返回的 ResultSetFuture 映射到其他内容.例如,如果您只想要 1 列的值,您可以使用它来将 ListenableFuture 转换为 ListenableFuture,而无需在ResultSetFuture 然后获取字符串值.

在编写数据加载程序的上下文中,您可以执行以下操作:

  1. 为了简单起见,请使用 Semaphore 或其他具有固定数量许可的结构(这将是您的最大飞行请求数).每当您使用 executeAsync 提交查询时,请获取许可.你真的应该只需要 1 个线程(但可能想要引入一个 # cpu cores 大小的池来执行此操作)从信号量获取许可并执行查询.它只会阻止获取,直到有可用的许可为止.
  2. Futures.addCallback 用于从 executeAsync 返回的未来.在 onSuccessonFailure 两种情况下,回调都应该调用 Sempahore.release().通过释放许可,这应该允许您在第 1 步中的线程继续并提交下一个请求.

为了进一步提高吞吐量,您可能需要考虑使用 BatchStatement 并批量提交请求.如果您保持较小的批次(50-250 是一个不错的数字)并且批次中的插入都共享相同的分区键,那么这是一个不错的选择.

Blocking execute fethod from com.datastax.driver.core.Session

public ResultSet execute(Statement statement);

Comment on this method:

Non-blocking execute fethod from com.datastax.driver.core.Session

public ResultSetFuture executeAsync(Statement statement);

I have 02 questions about them, thus it would be great if you can help me to understand them.

Let's say I have 1 million of records and I want all of them to be arrived in the database (without any lost).

Question 1: If I have n number of threads, all threads will have the same amount of records they need to send to the database. All of them continue sending multiple insert queries to cassandra using blocking execute call. If I increase the value of n, will it also helps to speed up the time that I need to insert all records to cassandra?

Will this cause performance problem for cassandra? Does Cassandra have to make sure that for every single insert record, all the nodes in the clusters should know about the new record immediately? In order to maintain the consistency in data. (I assume cassandra node won't even think about using the local machine time for controlling the record insertion time).

Question 2: With non-blocking execute, how can I assure that all of the insertions is successful? The only way I know is waiting for the ResultSetFuture to check the execution of the insert query. Is there any better way I can do ? Is there a higher chance that non-blocking execute is easier to fail then blocking execute?

Thank you very much for your helps.

解决方案

To some extent. Lets divorce the client implementation details a bit and look at things from the perspective of "Number of concurrent requests", as you don't need to have a thread for each ongoing request if you use executeAsync. In my testing I have found that while there is a lot of value in having a high number of concurrent requests, there is a threshold for which there are diminishing returns or performance starts to degrade. My general rule of thumb is (number of Nodes *native_transport_max_threads (default: 128)* 2), but you may find more optimal results with more or less.

The idea here is that there is not much value in enqueuing more requests than cassandra will handle at a time. While reducing the number of inflight requests, you limit unnecessary congestion on the connections between your driver client and cassandra.

Waiting on the ResultSetFuture via get is one route, but if you are developing a fully async application, you want to avoid blocking as much as possible. Using guava, your two best weapons are Futures.addCallback and Futures.transform.

  • Futures.addCallback allows you to register a FutureCallback that gets executed when the driver has received the response. onSuccess gets executed in the success case, onFailure otherwise.

  • Futures.transform allows you to effectively map the returned ResultSetFuture into something else. For example if you only want the value of 1 column you could use it to transform ListenableFuture<ResultSet> to a ListenableFuture<String> without having to block in your code on the ResultSetFuture and then getting the String value.

In the context of writing a dataloader program, you could do something like the following:

  1. To keep things simple use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of inflight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need 1 thread (but may want to introduce a pool of # cpu cores size that does this) that acquires the permits from the Semaphore and executes queries. It will just block on acquire until there is an available permit.
  2. Use Futures.addCallback for the future returned from executeAsync. The callback should call Sempahore.release() in both onSuccess and onFailure cases. By releasing a permit, this should allow your thread in step 1 to continue and submit the next request.

To further improve throughput, you might want to consider using BatchStatement and submitting requests in batches. This is a good option if you keep your batches small (50-250 is a good number) and if your inserts in a batch all share the same partition key.

这篇关于Cassandra 如何处理 datastax java 驱动程序中的阻塞执行语句的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-30 21:38