I have a file with 10,000 lines, each line representing the parameters of one download job. I have 5 custom downloaders. Each job can take anywhere from 5 seconds to 2 minutes. How would I create something that loops through the 10,000 lines, assigning each job to a downloader, but only if that downloader is not currently busy?

Edit:

The hard part for me is that each Downloader is an instance of a class, and the difference between the instances is the port_number I specify when instantiating each of the 5 Downloader objects. So I have a = Downloader(port_number=7751) ... e = Downloader(port_number=7755). Then, to use a Downloader, I call a.run(row).

How do I define the workers as these instances a, b, c, d, e instead of the downloader function?

Best answer

There are many ways to do this - the simplest is to use multiprocessing.Pool and let it organize the workers for you. 10k lines is not much; even if an average line were a full kilobyte, that is still only 10MB of memory, and memory is cheap.

So, just read the file into memory and map it over a multiprocessing.Pool to do your bidding:

from multiprocessing import Pool

def downloader(param):  # our downloader process
    # download code here
    # param will hold a line from your file (including newline at the end, strip before use)
    # e.g. res = requests.get(param.strip())
    return True  # let's provide some response back

if __name__ == "__main__":  # important protection for cross-platform use

    with open("your_file.dat", "r") as f:  # open your file
        download_jobs = f.readlines()  # store each line in a list

    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(downloader, download_jobs)  # map our data, line by line
    download_pool.close()  # let's exit cleanly
    download_pool.join()  # wait for the worker processes to finish
    # you can check the responses for each line in the `responses` list
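Since individual jobs here take anywhere from 5 seconds to 2 minutes, note that `map` only returns after the slowest job finishes. A small variation (my addition, not part of the original answer) is `Pool.imap_unordered`, which hands you each result as soon as its worker completes; a minimal sketch with a stand-in downloader and placeholder URLs:

```python
from multiprocessing import Pool

def downloader(param):
    # stand-in for the real download logic; just strips the line
    return param.strip()

if __name__ == "__main__":
    download_jobs = ["http://example.com/a\n", "http://example.com/b\n"]  # placeholder lines

    with Pool(processes=5) as pool:
        for response in pool.imap_unordered(downloader, download_jobs):
            # responses arrive in completion order, not file order
            print(response)
```

This lets you log progress or retry failures while slow jobs are still running, instead of waiting for the whole batch.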


You can also use threading instead of multiprocessing (or multiprocessing.pool.ThreadPool as a drop-in replacement) to do everything within a single process if you need shared memory. A single process with threads is more than enough for download purposes, unless you are doing additional heavy processing.
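To illustrate the drop-in nature of that swap, here is a minimal sketch (the URLs are placeholders) using multiprocessing.pool.ThreadPool: the map interface is identical, but the workers are threads in one process, so they can read and write ordinary shared objects directly:

```python
from multiprocessing.pool import ThreadPool  # drop-in for Pool, but thread-based

downloaded = []  # threads share process memory, so a plain list collects results

def downloader(param):
    # stand-in for the real download logic
    downloaded.append(param.strip())  # list.append is thread-safe in CPython
    return True

if __name__ == "__main__":
    download_jobs = ["http://example.com/1\n", "http://example.com/2\n"]  # placeholder lines
    pool = ThreadPool(processes=5)  # 5 threads in a single process
    responses = pool.map(downloader, download_jobs)
    pool.close()
    pool.join()
```

With processes you would need multiprocessing.Manager or shared memory primitives to achieve the same; with threads a regular list or dict works.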

Update

If you want your downloaders to run as class instances, you can turn the downloader function into a factory for Downloader instances, and then just pass along whatever is needed to instantiate those instances together with the URL. Here is a simple round-robin approach:

from itertools import cycle
from multiprocessing import Pool

class Downloader(object):

    def __init__(self, port_number=8080):
        self.port_number = port_number

    def run(self, url):
        print("Downloading {} on port {}".format(url, self.port_number))

def init_downloader(params):  # our downloader initializer
    downloader = Downloader(**params[0])  # instantiate a downloader for this job
    downloader.run(params[1])  # run it on the job's URL
    return True  # you can return a more meaningful response here

if __name__ == "__main__":  # important protection for cross-platform use

    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]

    downloader_cycle = cycle(downloader_params)  # use cycle for round-robin distribution
    with open("your_file.dat", "r") as f:  # open your file
        # read our file line by line and attach downloader params to it
        download_jobs = [[next(downloader_cycle), row.strip()] for row in f]

    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(init_downloader, download_jobs)  # map our data
    download_pool.close()  # let's exit cleanly
    download_pool.join()  # wait for the worker processes to finish
    # you can check the responses for each line in the `responses` list


Keep in mind that this is not the most balanced solution, as two Downloader instances with the same port may happen to run at the same time, but it will even out over a large enough amount of data.

If you want to make sure that no two Downloader instances are ever running on the same port, you will either need to build your own pool, or create a central process that issues ports to your Downloader instances when they need them.

10-06 01:57