我有一个ROUTER,其目的是累积来自多个DEALER客户端的图像数据并在完整图像上执行OCR。我发现处理OCR的最有效方法是利用Python的多处理库。累积的图像字节被put放入Queue,以便在单独的Process中进行适当的处​​理。但是,我需要确保当客户端遇到超时时,Process将被正确终止并且不会无谓地浪费资源。

在当前解决方案中,我将每个新连接的客户端插入到dict中,其中value是我的ClientHandler类,该类拥有所有图像数据并生成一个Thread,该boolean将名为“ timeout”的变量设置为True 5秒钟后。如果在该5秒帧内收到新消息,则调用bump并将计时器重置为0,否则我将在线程终止之前进行清理,并在主循环中从dict删除引用:

import threading
import time
import zmq

class ClientHandler(threading.Thread):
    def __init__(self, socket):
        self.elapsed = time.time()
        self.timeout = False

        self.socket = socket

        super(ClientHandler, self).__init__()

    def run(self):
        while time.time() - self.elapsed < 5.0:
            pass

        self.timeout = True

        # CLIENT TIMED OUT
        # HANDLE TERMINATION AND CLEAN UP HERE

    def bump(self):
        self.elapsed = time.time()

    def handle(self, id, header, data):
        # HANDLE CLIENT DATA HERE
        # ACCUMULATE IMAGE BYTES, ETC

        self.socket.send_multipart([id, str(0)])

def server_task():
    clients = dict()

    context = zmq.Context.instance()
    server = context.socket(zmq.ROUTER)

    server.setsockopt(zmq.RCVTIMEO, 0)

    server.bind("tcp://127.0.0.1:7777")

    while True:
        try:
            id, header, data = server.recv_multipart()

            client = clients.get(id)

            if client == None:
                client = clients[id] = ClientHandler(server)

                client.start()

            client.bump()
            client.handle(id, header, data)
        except zmq.Again:
            for id in clients.keys():
                if clients[id].timeout:
                    del clients[id]

    context.term()

if __name__ == "__main__":
    server_task()


但是,整个方法感觉都不对。我会不适当地处理吗?如果是这样,如果有人能指出我正确的方向,我将不胜感激。

最佳答案

我自己想通了,希望对别人有帮助。

相反,我在分配的端口上具有ROUTER,该ROUTER将唯一的端口分配给每个客户端,此客户端随后连接到所述唯一的端口上新绑定的套接字。当客户端断开连接时,将回收端口以进行重新分配。

import sys
import zmq
from multiprocessing import Process, Queue, Value

def server_task():
    context = zmq.Context.instance()

    server = context.socket(zmq.ROUTER)

    server.bind("tcp://127.0.0.1:7777")

    timeout_queue = Queue()
    port_list = [ 1 ]

    proc_list = [ ]

    while True:
        try:
            id = server.recv_multipart()[0]

            # Get an unused port from the list
            # Ports from clients that have timed out are recycled here

            while not timeout_queue.empty():
                port_list.append(timeout_queue.get())

            port = port_list.pop()

            if len(port_list) == 0:
                port_list.append(port + 1)

            # Spawn a new worker task, binding the port to a socket

            proc_running = Value("b", True)

            proc_list.append(proc_running)

            Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()

            # Send the new port to the client

            server.send_multipart([id, str(7777 + port)])

        except KeyboardInterrupt:
            break

    # Safely allow our worker processes to terminate
    for proc_running in proc_list:
        proc_running.value = False

    context.term()

def worker_task(proc_running, port, timeout_queue):
    context = zmq.Context.instance()

    worker = context.socket(zmq.ROUTER)

    worker.setsockopt(zmq.RCVTIMEO, 5000)
    worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))

    while proc_running.value:
        try:
            id, data = worker.recv_multipart()

            worker.send_multipart([id, data])
        except zmq.Again:
            timeout_queue.put(port)

            context.term()

            break

    print("Client on port %d disconnected" % (7777 + port, ))

10-06 10:34
查看更多