我正在尝试将从大文件行中提取的信息发送到在某些服务器上运行的进程。

为了加快速度,我想并行处理一些线程。

使用concurrent.futures的Python 2.7反向端口,我尝试了以下操作:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()


但是,这是有问题的,因为所有期货都会立即提交,因此我的机器会用光内存,因为整个文件都已加载到内存中。

我的问题是,是否有一种简便的方法仅在有免费工作人员可用时才提交新的未来。

最佳答案

您可以使用以下命令遍历文件的大块

for chunk in zip(*[f]*chunksize):


(这是grouper recipe的应用程序,它将来自迭代器f的项目收集为大小为chunksize的组。注意:由于zip在Python3中返回了迭代器,因此不会一次使用整个文件。)



import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)




现在,您可以在注释中正确指出这不是最佳选择。
可能有一些工人要花很长时间,并且要承担很多工作。

通常,如果每次致电工人花费的时间大致相同,那么这没什么大不了的。但是,这是一种按需推进文件句柄的方法。它使用threading.Condition通知sprinkler推进文件句柄。

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle')
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
    t.start()
for t in threads:
    t.join()

关于python - 仅在有免费 worker 的情况下如何产生 future ,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/18770534/

10-11 23:12