关于

我有一个需要使用API​​凭证实例化的类DataRetriever。我有五组不同的API凭据,因此我想实例化DataRetriever的五个实例。 DataRetriever只有一个公共方法retrieve,顾名思义,该方法将基于传递给该方法的subprocess使用id检索某些数据。


给定的API凭证不能同时打开一个以上的流(具有任何ID)
一个DataRetriever最多只能有一个与API的连接,因此不能在仍在检索数据流的DataRetriever#retrieve(id)实例上调用DataRetriever
数据量各不相同,因此直到子流程退出的时间可以是几秒钟到几分钟之间的任何时间


目前的方法

如示例片段所示,我正在使用queue。我用所有需要检索的数据流的所有id填充队列。

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()




我总是可以使用观察者模式,但是我想知道是否有Python的方式可以做到这一点?


如何确保上面的代码片段中的worker在无缝使用DataRetriever的所有五个实例的同时将排队的工作负载分配给仅空闲的DataRetriever
在研究时,我发现有关ProcessPoolExecutor的示例无法适应我的情况。这可能是解决方案吗?

最佳答案

您可以执行以下操作:

def worker(q_request, q_response, api_cred):
    dr = DataRetriever(api_cred)
    while True:
        stream_id = q_request.get() # that's blocking unless q.get(False)
        if stream_id == "stop":
            sys.exit(0)
        dr.retrieve(stream_id) # that can take some time (assume blocking)
        q_response.put(stream_id) # signal job has ended to parent process

api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
    t.start()
    threads.append(t)

for item in source():
    q_request.put(item)
    print("Stream ID %s was successfully retrieved." %q_response.get())


这假定dr.retrieve(stream_id)正在阻塞,或者您有某种方式知道由dr.retrieve(stream_id)启动的子进程尚未完成,因此您的工作人员将阻塞直到完成为止(否则DataRetriever的实现必须更改)。

q.get()默认情况下处于阻止状态,因此您的worker进程将与其他进程并列等待某个对象接收它。 Queue()对象也是FIFO,因此您可以确保工作将在worker进程之间平均分配。

10-06 02:07