本文介绍了Python多线程ZeroMQ REQ-REP的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找使用Python和ZeroMQ使用多线程实现REQ-REP模式的方法.

I am looking to implement a REQ-REP pattern with Python and ZeroMQ using multithreading.

使用Python,当新客户端连接到服务器时,我可以创建一个新线程.该线程将处理与该特定客户端的所有通信,直到关闭套接字:

With Python, I can create a new thread when a new client connects to the server. This thread will handle all communications with that particular client, until the socket is closed:

# Thread that will handle client's requests
class ClientThread(threading.Thread):
    # Implementation...
    def __init__(self, socket):
        threading.Thread.__init__(self)
        self.socket = socket
    def run(self):
        while keep_alive:
            # Thread can receive from client
            data = self.socket.recv(1024)
            # Processing...
            # And send back a reply
            self.socket.send(reply)

while True:
    # The server accepts an incoming connection
    conn, addr = sock.accept()
    # And creates a new thread to handle the client's requests
    newthread = ClientThread(conn)
    # Starting the thread
    newthread.start()

是否可以使用ZeroMQ进行相同[*]?我已经看到了使用ZeroMQ和Python进行多线程处理的一些示例,但是在所有这些示例中,创建的线程池在开始时都具有固定数量的线程,并且似乎更着重于负载平衡.

Is it possible to do the same[*] using ZeroMQ? I have seen some examples of multithreading with ZeroMQ and Python, but in all of them a pool of threads is created with a fixed number of threads at the beginning and it seems to be more oriented to load balancing.

[*]注意我想要的是保持客户端与其线程之间的连接处于活动状态,因为该线程期望来自客户端的多个REQ消息,并且它将存储必须在消息之间保留的信息(即:一个变量)计数器,以在新的REQ消息上增加其值;因此,每个线程都有其自己的变量,并且任何其他客户端都不能访问该线程).新客户=新线程.

[*] Notice what I want is to keep the connection between a client and its thread alive, as the thread is expecting multiple REQ messages from the client and it will store information that must be kept between messages (i.e.: a variable counter that increments its value on a new REQ message; so each thread has its own variable and no other client should ever be able to access that thread). New client = new thread.

推荐答案

是的,ZeroMQ是功能强大的Can-Do工具箱

但是,最大的惊喜是,样本中使用的ZeroMQ< socket >-的结构远比普通的结构更复杂.

Yes, ZeroMQ is a powerfull Can-Do toolbox

However, the major surprise will be, that ZeroMQ <socket>-s are by far more structured than their plain counterparts, you use in the sample.

ZeroMQ在"单个" ZMQ- Context 的框架下构建了一个出色的,抽象丰富的框架,这是(并且将继续存在)唯一的用作共享"的东西.

ZeroMQ builds a remarkable, abstraction-rich framework, under a hood of a "singleton" ZMQ-Context, which is (and shall remain) the only thing used as "shared".

线程不应共享"任何其他派生"对象,无论它们的状态如何,因为出于简洁设计和高性能以及高性能的考虑,已经实现了强大的分布式责任框架体系结构.低延迟.

Threads shall not "share" any other "derived" objects, the less any their state, as there is a strong distributed-responsibility framework architecture implemented, both in the sake of clean-design and a high performance & low-latency.

对于所有ZMQ- Socket -来说,他们应该想象一个更加智能的分层子结构,其中的子结构将减轻对I/O活动的担忧(在ZMQ- Context 责任-因此,保持活动,计时问题和公平队列缓冲/选择轮询问题停止对您来说是可见的...),具有一种正式的沟通方式行为(由选定的ZMQ- Socket型原型赋予).

For all ZMQ-Socket-s one shall rather imagine a much smarter, layered sub-structure, where one receives off-loaded worries about I/O-activities ( managed inside ZMQ-Context responsibility -- thus keep-alive issues, timing issues and fair-queue buffering / select-polling issues simply cease to be visible for you ... ), with one sort of a formal communication pattern behaviour ( given by a chosen ZMQ-Socket-type archetype ).

ZeroMQ 和类似的 nanomsg 库都是类似LEGO的项目,它们可以使您成为建筑师和工程师.设计师比起初通常意识到的要多.

ZeroMQ and similarly nanomsg libraries are rather LEGO-alike projects, that empower you, as an architect & designer, more, than one typically realises at the very beginning.

因此,一个人可以专注于分布式系统的行为,而不是浪费时间和精力来解决另一个噩梦".

One thus can focus on distributed-system behaviour, as opposed to lose time and energy on solving just-another-socket-messaging-[nightmare].

(绝对是值得一看,它们来自ZeroMQ的联合父亲Pieter Hintjens.在这个很棒的主题上,您会发现很多Aha!时刻.)

( Definitely worth to have a look into both books from Pieter Hintjens, co-father of the ZeroMQ. There you find plenty Aha!-moments on this great subject. )

...以及作为蛋糕上的樱桃-无论是在 inproc:// 上传递消息还是在上传递消息,您都可以在不依赖传输的通用环境中获得所有这些ipc:// ,还可以在 tcp:// 层上并行收听/讲话.

... and as a cherry on a cake -- you get all of this as a Transport-agnostic, universal environment, whether passing some messages on inproc://, other over ipc:// and also in parallel listening / speaking over tcp:// layers.

EDIT#1 2014-08-19 17:00 [UTC+0000]

请仔细检查下面的评论,并进一步审查您的-基本及高级设计选项,以进行< 容易发生故障的>分拆处理,以实现< 负载均衡的>-REP-worker队列,用于可扩展< >的处理和带有-REP-worker二进制开始的阴影处理.

Kindly check comments below and further review your -- both elementary and advanced -- design-options for a <trivial-failure-prone>-spin-off processing, for a <load-balanced>-REP-worker queueing, for a <scale-able>-distributed processing and a <_faul-resilient_mode_>-REP-worker binary-start shaded processing.

这在设计分布式消息传递系统上成倍地有效.

This is exponentially valid in designing distributed messaging systems.

很抱歉这么说.

受伤,但是是真的.

"""REQ/REP modified with QUEUE/ROUTER/DEALER add-on ---------------------------

   Multithreaded Hello World server

   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>

"""
import time
import threading
import zmq

print "ZeroMQ version sanity-check: ", zmq.__version__

def aWorker_asRoutine( aWorker_URL, aContext = None ):
    """Worker routine"""
    #Context to get inherited or create a new one trick------------------------------
    aContext = aContext or zmq.Context.instance()

    # Socket to talk to dispatcher --------------------------------------------------
    socket = aContext.socket( zmq.REP )

    socket.connect( aWorker_URL )

    while True:

        string  = socket.recv()

        print( "Received request: [ %s ]" % ( string ) )

        # do some 'work' -----------------------------------------------------------
        time.sleep(1)

        #send reply back to client, who asked --------------------------------------
        socket.send( b"World" )

def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets ------------------------------------------------
    aLocalhostCentralContext = zmq.Context.instance()

    # Socket to talk to clients ------------------------------------------------------
    clients = aLocalhostCentralContext.socket( zmq.ROUTER )
    clients.bind( url_client )

    # Socket to talk to workers ------------------------------------------------------
    workers = aLocalhostCentralContext.socket( zmq.DEALER )
    workers.bind( url_worker )

    # --------------------------------------------------------------------||||||||||||--
    # Launch pool of worker threads --------------< or spin-off by one in OnDemandMODE >
    for i in range(5):
        thread = threading.Thread( target = aWorker_asRoutine, args = ( url_worker, ) )
        thread.start()

    zmq.device( zmq.QUEUE, clients, workers )

    # ----------------------|||||||||||||||------------------------< a fair practice >--
    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    aLocalhostCentralContext.term()

if __name__ == "__main__":
    main()

这篇关于Python多线程ZeroMQ REQ-REP的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-21 02:52