问题描述
我已经创建了一个与具有 20 个核心的指定线程池的连接.
I have create a connection with a specified thread pool with 20 cores.
ConnectionFactory factory = new ConnectionFactory();
....
//specified es
ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory);
con = factory.newConnection(consumerExecutor, addresses);
然后从此连接创建一个通道:
then create a channel from this connection:
final Channel channel = connection.createChannel();
并使用它来创建一个 DefaultConsumer.
and use this to create a DefaultConsumer.
虽然我发现虽然线程可以用来消费消息,但始终只有一个线程在消费消息,即使消息在服务器中大量积累.
While I find that though the threads can be used to consume messages, always, only one thread is consuming messages even though messages are massive accumulated in servers.
我查看源代码并找到:
private final class WorkPoolRunnable implements Runnable {
@Override
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
}
/* Basic work selector and state transition step */
private K readyToInProgress() {
K key = this.ready.poll();
if (key != null) {
this.inProgress.add(key);
}
return key;
}
/**
* Return the next <i>ready</i> client,
* and transfer a collection of that client's items to process.
* Mark client <i>in progress</i>.
* If there is no <i>ready</i> client, return <code><b>null</b></code>.
* @param to collection object in which to transfer items
* @param size max number of items to transfer
* @return key of client to whom items belong, or <code><b>null</b></code> if there is none.
*/
public K nextWorkBlock(Collection<W> to, int size) {
synchronized (this) {
K nextKey = readyToInProgress();
if (nextKey != null) {
VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
drainTo(queue, to, size);
}
return nextKey;
}
}
诀窍应该在ConsumerWorkService.this.workPool.nextWorkBlock
,它从就绪队列中轮询通道,并在运行回调后添加到完成块中的读取队列run()代码>.如果我错了,请纠正我.
The trick should be in ConsumerWorkService.this.workPool.nextWorkBlock
, it poll the channel from the ready queue, and add to the read queue in the finish block after running the callback run()
. Please correct me if I am wrong.
这很令人困惑,因为消费者绑定到一个通道,并且在最后一个任务块完成之前,该通道不会释放到队列中,这意味着线程池始终只为该消费者提供一个线程.
This is confusing since a consumer is bound to one channel, and the channel is not released to the queue until last task block is finished, which means the thread pool is always offering only one thread for that consumer.
问题:
- 为什么 RabbitMQ 设计这个模型
- 我们如何优化这个问题
- 在
handleDelivery
中将任务提交到一个独立的线程池来消费消息和ack(确保只有在任务完成后才确认消息)
- Why RabbitMQ designs this model
- How do we optimize this issue
- Is it good to submit the task to a standalone thread pool in
handleDelivery
to consume messages as well as ack(to ensure message ack only after task finishes)
推荐答案
>一、为什么RabbitMQ要设计这个模型
> 1. Why RabbitMQ designs this model
我想知道自己的原因.但这一事实清楚地反映在他们的文档中:
I would like to know the reason myself. But this fact is clearly reflected in their documentation:
每个 Channel 都有自己的调度线程.对于最常见的用例每个渠道一个消费者,这意味着消费者不会阻止其他消费者消费者.如果每个频道有多个消费者,请注意长时间运行的消费者可能会阻止向其他人发送回调该频道上的消费者.
>2.我们如何优化这个问题
> 2. How do we optimize this issue
您可以拥有多个通道,也可以通过将实际工作提交到另一个线程池来将消息消耗与处理分离.您可以在这篇文章中找到更多详细信息.
You can either have multiple channels or decouple message consumption from processing by submitting the actual work to another thread pool. You can find more details in this article.
>3. 将任务提交到handleDelivery中的独立线程池来消费消息和ack好不好(保证任务完成后才确认消息)
> 3. Is it good to submit the task to a standalone thread pool in handleDelivery to consume messages as well as ack(to ensure message ack only after task finishes)
引用自docs:
使用手动确认时,重要的是要考虑什么线程进行确认.如果它不同于接收交付的线程(例如 Consumer#handleDelivery将交付处理委托给不同的线程),确认将多个参数设置为 true 是不安全的,将导致双重确认,因此是通道级协议关闭通道的异常.确认单个消息时间可以是安全的.
这篇关于执行器服务RabbitMQ中只有一个线程同时运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!