关于
我有一个需要使用API凭证实例化的类DataRetriever
。我有五组不同的API凭据,因此我想实例化DataRetriever
的五个实例。 DataRetriever
只有一个公共方法retrieve
,顾名思义,该方法将基于传递给该方法的subprocess
使用id
检索某些数据。
给定的API凭证不能同时打开一个以上的流(具有任何ID)
一个DataRetriever
最多只能有一个与API的连接,因此不能在仍在检索数据流的DataRetriever#retrieve(id)
实例上调用DataRetriever
数据量各不相同,因此直到子流程退出的时间可以是几秒钟到几分钟之间的任何时间
目前的方法
如示例片段所示,我正在使用queue
。我用所有需要检索的数据流的所有id
填充队列。
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
题
我总是可以使用观察者模式,但是我想知道是否有Python的方式可以做到这一点?
如何确保上面的代码片段中的
worker
在无缝使用DataRetriever
的所有五个实例的同时将排队的工作负载分配给仅空闲的DataRetriever
?在研究时,我发现有关
ProcessPoolExecutor
的示例无法适应我的情况。这可能是解决方案吗? 最佳答案
您可以执行以下操作:
def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())
这假定
dr.retrieve(stream_id)
正在阻塞,或者您有某种方式知道由dr.retrieve(stream_id)
启动的子进程尚未完成,因此您的工作人员将阻塞直到完成为止(否则DataRetriever
的实现必须更改)。q.get()
默认情况下处于阻止状态,因此您的worker
进程将与其他进程并列等待某个对象接收它。 Queue()
对象也是FIFO,因此您可以确保工作将在worker
进程之间平均分配。