RPC(远程过程调用)

  解决的问题:之前的模式都是单向的——一端发送,另一端接收,接收方无法把处理结果再发回给发送方。

  使用多个队列(请求队列和回调队列)在多个消费者之间分配耗时的任务,并把处理结果返回给请求方。

  

 客户端

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3
 4 import pika
 5 import uuid
 6
 7
 8 class FibonacciRpcClient(object):
 9
10     def __init__(self):
11         self.credentials = pika.PlainCredentials('admin', 'admin123456')
12         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=self.credentials))
13         self.channel = self.connection.channel()
14
15         result = self.channel.queue_declare(queue='', exclusive=True)
16         self.callback_queue = result.method.queue
17
18         self.channel.basic_consume(
19             queue=self.callback_queue,
20             on_message_callback=self.on_response,
21             auto_ack=True)
22
23     def on_response(self, ch, method, props, body):
24         if self.corr_id == props.correlation_id:
25             self.response = body
26
27     def call(self, n):
28         self.response = None
29         self.corr_id = str(uuid.uuid4())   # 唯一标识 发的消息标个记号 然后服务端处理之后才能一一对应
30         self.channel.basic_publish(
31             exchange='',
32             routing_key='rpc_queue',  # 服务端生成的rpc_queue
33             properties=pika.BasicProperties(
34                 reply_to=self.callback_queue,  # 客服端申明回调queue
35                 correlation_id=self.corr_id,   # 唯一值  服务端收到请求,在响应的时候,不清楚属于哪个请求的
36             ),
37             body=str(n))
38         count = 0
39         while self.response is None:
40             # 一直循环检测 并且在这里不会阻塞
41             self.connection.process_data_events()
42             count += 1
43             print(".......select.....", count)
44         return int(self.response)
45
46
# Demo driver: issue one RPC request and print the result.
fibonacci_rpc = FibonacciRpcClient()
try:
    print(" [x] Requesting fib(5)")
    response = fibonacci_rpc.call(5)
    print(" [.] Got %r" % response)
finally:
    # Always release the broker connection, even if the call fails.
    fibonacci_rpc.connection.close()

服务端

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3
 4 import pika
 5
 6 credentials = pika.PlainCredentials('admin', 'admin123456')
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.6', credentials=credentials))
 8 channel = connection.channel()
 9
10 channel.queue_declare(queue='rpc_queue')
11
12
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively, so the cost grows linearly with ``n`` instead of
    exponentially as with the naive double recursion.

    Args:
        n: Non-negative integer index.

    Returns:
        The n-th Fibonacci number as an int.

    Raises:
        ValueError: if ``n`` is negative (the recursive definition would
            never reach a base case).
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % (n,))
    previous, current = 0, 1
    for _ in range(n):
        previous, current = current, previous + current
    return previous
20
21
def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib(n) and publish the reply.

    Args:
        ch: Channel the request was delivered on; used to reply and ack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message to acknowledge or reject.
        props: Message properties; ``reply_to`` names the queue to answer on
            and ``correlation_id`` is echoed back in the reply.
        body: Request payload, expected to be the decimal text of a
            non-negative integer.

    A request that cannot be served (non-numeric body, negative number, or no
    ``reply_to``) is rejected without requeueing, so one bad message can
    neither crash the consumer nor be redelivered forever. No reply is sent
    in that case.
    """
    try:
        n = int(body)
    except (TypeError, ValueError):
        n = None

    if n is None or n < 0 or not props.reply_to:
        print(" [!] rejecting malformed request: %r" % (body,))
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    # Acknowledge only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)
34
35
# prefetch_count=1: take on only one unacknowledged request at a time, so a
# busy worker is not handed more tasks than it can process.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C: leave the consume loop cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Release the broker connection however the loop ended.
    connection.close()

运行结果:

 1 D:\python\python.exe F:/abc/messagequeue/rpc_client.py
 2  [x] Requesting fib(5)
 3 .......select..... 1
 4 .......select..... 2
 5 .......select..... 3
 6 .......select..... 4
 7 .......select..... 5
 8 .......select..... 6
 9 .......select..... 7
10 .......select..... 8
11 .......select..... 9
12 .......select..... 10
13 .......select..... 11
14 .......select..... 12
15 .......select..... 13
16 .......select..... 14
17 .......select..... 15
18 .......select..... 16
19 .......select..... 17
20 .......select..... 18
21 .......select..... 19
22 .......select..... 20
23 .......select..... 21
24 .......select..... 22
25 .......select..... 23
26 .......select..... 24
27 .......select..... 25
28 .......select..... 26
29 .......select..... 27
30 .......select..... 28
31  [.] Got 5
01-16 12:00