一、rabbitmq实现rpc调用的原理
·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中。原理图如下:
二、代码实现
下面我们将模拟实现一个rpc客户端和rpc服务端。客户端给服务端发送message,服务端收到后处理message,再将处理后的消息返给客户端
rpc客户端
/**
* rpc客户端
*/
public class RpcClient {
//发送消息的队列名称
private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
//创建回调队列
String callbackQueue = channel.queueDeclare().getQueue();
//创建回调队列,消费者从回调队列中接收服务端传送的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(callbackQueue,true,consumer); //创建消息带有correlationId的消息属性
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build();
String message = "hello rabbitmq";
channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes());
System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId); //接收回调消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String receivedCorrelationId = delivery.getProperties().getCorrelationId();
if(correlationId.equals(receivedCorrelationId)){
System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId);
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
} }
}
rpc服务端
/**
* rpc服务器
*/
public class RpcServer {
private static final String RPC_QUEUE_NAME = "rpc_queue"; private static String format(String message){
return "......" + message + "......";
} public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = null;
try {
connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
QueueingConsumer consumer = new QueueingConsumer(channel);
//声明消费者预取的消息数量
channel.basicQos(1);
channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手动回复消息
System.out.println("RpcServer waitting for receive message"); while (true){
//接收并处理消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("RpcServer receive message " + message);
String response = format(message);
//确认收到消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); //取出消息的correlationId
AMQP.BasicProperties properties = delivery.getProperties();
String correlationId = properties.getCorrelationId(); //创建具有与接收消息相同的correlationId的消息属性
AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
} } catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
先运行服务端,再运行客户端,结果如下:
RpcClient
RpcServer