参考:https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/multisocket/zmqpoller.html
实现zmqpolling.py.py

点击(此处)折叠或打开

  1. import zmq
  2. import time
  3. import sys
  4. import random
  5. from multiprocessing import Process

  6. def server_push(port="5556"):
  7.     context = zmq.Context()
  8.     socket = context.socket(zmq.PUSH)
  9.     socket.bind("tcp://*:%s" % port)
  10.     print("Running server on port: ", port)
  11.     # serves only 5 request and dies
  12.     for reqnum in range(10):
  13.         if reqnum < 6:
  14.             socket.send("Continue".encode("ascii") )
  15.         else:
  16.             socket.send("Exit".encode("ascii") )
  17.             break
  18.         time.sleep (1)

  19. def server_pub(port="5558"):
  20.     context = zmq.Context()
  21.     socket = context.socket(zmq.PUB)
  22.     socket.bind("tcp://*:%s" % port)
  23.     publisher_id = random.randrange(0,9999)
  24.     print ("Running server on port: ", port)
  25.     # serves only 5 request and dies
  26.     for reqnum in range(10):
  27.         # Wait for next request from client
  28.         topic = random.randrange(8,10)
  29.         messagedata = "server#%s" % publisher_id
  30.         print("%s %s" % (topic, messagedata))
  31.         socket.send( ("%d %s" % (topic, messagedata)).encode("ascii") )
  32.         time.sleep(1)

  33. def client(port_push, port_sub):
  34.     context = zmq.Context()
  35.     socket_pull = context.socket(zmq.PULL)
  36.     socket_pull.connect ("tcp://localhost:%s" % port_push)
  37.     print ("Connected to server with port %s" % port_push)
  38.     socket_sub = context.socket(zmq.SUB)
  39.     socket_sub.connect ("tcp://localhost:%s" % port_sub)
  40.     socket_sub.setsockopt(zmq.SUBSCRIBE, "9".encode("ascii") )
  41.     print ("Connected to publisher with port %s" % port_sub)
  42.     # Initialize poll set
  43.     poller = zmq.Poller()
  44.     poller.register(socket_pull, zmq.POLLIN)
  45.     poller.register(socket_sub, zmq.POLLIN)
  46.     # Work on requests from both server and publisher
  47.     should_continue = True
  48.     while should_continue:
  49.         socks = dict(poller.poll())
  50.         if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
  51.             message = socket_pull.recv()
  52.             print ("Recieved control command: %s" % message)
  53.         if message == "Exit":
  54.             print ("Recieved exit command, client will stop recieving messages")
  55.             should_continue = False

  56.         if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
  57.             string = socket_sub.recv()
  58.             topic, messagedata = string.split()
  59.             print("Processing ... ", topic, messagedata)



  60. if __name__ == "__main__":
  61.     # Now we can run a few servers
  62.     server_push_port = "5556"
  63.     server_pub_port = "5558"
  64.     Process(target=server_push, args=(server_push_port,)).start()
  65.     Process(target=server_pub, args=(server_pub_port,)).start()
  66.     Process(target=client, args=(server_push_port,server_pub_port,)).start()


运行:
#python3 zmqpolling.py.py
10-04 21:26