我有一个concurrent.futures.ThreadPoolExecutor和一个列表。并使用以下代码将 future 添加到ThreadPoolExecutor:

for id in id_list:
    future = self._thread_pool.submit(self.myfunc, id)
    self._futures.append(future)

然后我等待列表:
concurrent.futures.wait(self._futures)

但是,self.myfunc会执行一些网络I/O,因此会出现一些网络异常。发生错误时,self.myfunc将具有相同self.myfunc的新id提交到相同的线程池,并将新的future添加到相同的列表中,就像上面的一样:
try:
    do_stuff(id)
except:
    future = self._thread_pool.submit(self.myfunc, id)
    self._futures.append(future)
    return None

问题来了:concurrent.futures.wait(self._futures)上出现错误:
File "/usr/lib/python3.4/concurrent/futures/_base.py", line 277, in wait
    f._waiters.remove(waiter)
ValueError: list.remove(x): x not in list

在等待时如何将新的 future 适本地添加到列表中?

最佳答案

查看wait()的实现,当然不希望concurrent.futures之外的任何内容都会使传递给它的列表发生变化。因此,我认为您永远不会“工作”。这不仅是因为它不希望列表发生突变,而且还需要对列表条目进行大量处理,并且实现方式无法知道您添加了更多条目。

未经测试,我建议改用这种方法:跳过所有步骤,仅保持运行中的运行线程数仍处于 Activity 状态。一种简单的方法是使用保护计数的Condition

初始化:

self._count_cond = threading.Condition()
self._thread_count = 0

输入my_func后(即新线程启动时):
with self._count_cond:
    self._thread_count += 1

完成my_func后(即线程结束时),无论出于何种原因(无论是否异常):
with self._count_cond:
    self._thread_count -= 1
    self._count_cond.notify() # wake up the waiting logic

最后是主要的等待逻辑:
with self._count_cond:
    while self._thread_count:
        self._count_cond.wait()

可能的比赛

在提交新线程的工作期间,但在其my_func调用开始运行之前(因此在_thread_count递增以解决新线程之前),线程计数似乎可能达到0。

所以:
with self._count_cond:
    self._thread_count += 1

实际上,应该在每次出现之前立即完成部分操作
self._thread_pool.submit(self.myfunc, id)

或编写一个新方法来封装该模式;例如,像这样:
def start_new_thread(self, id):
    with self._count_cond:
        self._thread_count += 1
    self._thread_pool.submit(self.myfunc, id)

不同的方法

暂时,我希望这也可以工作(但是,再次,它没有经过测试):除了更改您的等待方式之外,所有代码都保持相同:
while self._futures:
    self._futures.pop().result()

因此,这仅一次等待一个线程,直到没有线程可用为止。

请注意,列表中的.pop().append()在CPython中是原子的,因此不需要您自己的锁。并且由于您的my_func()代码在其运行的线程结束之前追加,因此列表不会在所有线程真正完成之前为空。

还有另一种方法

保留原始的等待代码,但对其余部分进行重新处理,以防万一发生异常时不创建新线程。就像重写my_func以返回True(如果由于异常而退出)一样,否则返回False,然后启动运行包装器的线程:
def my_func_wrapper(self, id):
    keep_going = True
    while keep_going:
        keep_going = self.my_func(id)

如果您有朝一日决定使用多个进程而不是多个线程,那么这可能会特别有吸引力(在某些平台上创建新进程的成本可能会高得多)。

和使用cf.wait()的方法

另一种方法是只更改等待的代码:
while self._futures:
    fs = self._futures[:]
    for f in fs:
        self._futures.remove(f)
    concurrent.futures.wait(fs)

清除?这会使列表的副本传递给.wait(),并且该副本永远不会发生突变。新线程将显示在原始列表中,并且重复整个过程,直到没有新线程显示为止。

在我看来,哪种方式最有意义,似乎主要取决于语用,但是您没有足够的信息让我对此做出猜测。

关于python - Python 3 : How to properly add new Futures to a list while already waiting upon it?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38258774/

10-10 01:11