我有一个ROUTER
,其目的是累积来自多个DEALER
客户端的图像数据并在完整图像上执行OCR。我发现处理OCR的最有效方法是利用Python的多处理库。累积的图像字节被put
放入Queue
,以便在单独的Process
中进行适当的处理。但是,我需要确保当客户端遇到超时时,Process
将被正确终止并且不会无谓地浪费资源。
在当前解决方案中,我将每个新连接的客户端插入到dict
中,其中value
是我的ClientHandler
类,该类拥有所有图像数据并生成一个Thread
,该boolean
将名为“ timeout”的变量设置为True
5秒钟后。如果在该5秒帧内收到新消息,则调用bump
并将计时器重置为0,否则我将在线程终止之前进行清理,并在主循环中从dict
删除引用:
import threading
import time
import zmq
class ClientHandler(threading.Thread):
def __init__(self, socket):
self.elapsed = time.time()
self.timeout = False
self.socket = socket
super(ClientHandler, self).__init__()
def run(self):
while time.time() - self.elapsed < 5.0:
pass
self.timeout = True
# CLIENT TIMED OUT
# HANDLE TERMINATION AND CLEAN UP HERE
def bump(self):
self.elapsed = time.time()
def handle(self, id, header, data):
# HANDLE CLIENT DATA HERE
# ACCUMULATE IMAGE BYTES, ETC
self.socket.send_multipart([id, str(0)])
def server_task():
clients = dict()
context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.setsockopt(zmq.RCVTIMEO, 0)
server.bind("tcp://127.0.0.1:7777")
while True:
try:
id, header, data = server.recv_multipart()
client = clients.get(id)
if client == None:
client = clients[id] = ClientHandler(server)
client.start()
client.bump()
client.handle(id, header, data)
except zmq.Again:
for id in clients.keys():
if clients[id].timeout:
del clients[id]
context.term()
if __name__ == "__main__":
server_task()
但是,整个方法感觉都不对。我会不适当地处理吗?如果是这样,如果有人能指出我正确的方向,我将不胜感激。
最佳答案
我自己想通了,希望对别人有帮助。
相反,我在分配的端口上具有ROUTER,该ROUTER将唯一的端口分配给每个客户端,此客户端随后连接到所述唯一的端口上新绑定的套接字。当客户端断开连接时,将回收端口以进行重新分配。
import sys
import zmq
from multiprocessing import Process, Queue, Value
def server_task():
context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.bind("tcp://127.0.0.1:7777")
timeout_queue = Queue()
port_list = [ 1 ]
proc_list = [ ]
while True:
try:
id = server.recv_multipart()[0]
# Get an unused port from the list
# Ports from clients that have timed out are recycled here
while not timeout_queue.empty():
port_list.append(timeout_queue.get())
port = port_list.pop()
if len(port_list) == 0:
port_list.append(port + 1)
# Spawn a new worker task, binding the port to a socket
proc_running = Value("b", True)
proc_list.append(proc_running)
Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()
# Send the new port to the client
server.send_multipart([id, str(7777 + port)])
except KeyboardInterrupt:
break
# Safely allow our worker processes to terminate
for proc_running in proc_list:
proc_running.value = False
context.term()
def worker_task(proc_running, port, timeout_queue):
context = zmq.Context.instance()
worker = context.socket(zmq.ROUTER)
worker.setsockopt(zmq.RCVTIMEO, 5000)
worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))
while proc_running.value:
try:
id, data = worker.recv_multipart()
worker.send_multipart([id, data])
except zmq.Again:
timeout_queue.put(port)
context.term()
break
print("Client on port %d disconnected" % (7777 + port, ))