Problem Description
I have a file with like 10,000 rows, each row represents parameters to a download job. I have like 5 custom downloaders. Each job can take anywhere from 5 seconds to 2 minutes. How would I create something that iterates through the 10,000 rows, assigning each job to a downloader if that downloader isn't currently working?
The difficult part for me is that each Downloader is an instance of a class, and the differences between the instances are the port_numbers I specify when I instantiate each of the 5 Downloader objects. So I have a = Downloader(port_number=7751) ... e = Downloader(port_number=7755). Then, if I were to use a Downloader, I would do a.run(row).
How do I define the workers as these a, b, c, d, e rather than a downloader function?
Recommended Answer
There are many ways to do it - the simplest would be to just use multiprocessing.Pool and let it organize the workers for you. 10k rows is not all that much; even if an average URL were a full kilobyte long, the list would still take only about 10 MB of memory, and memory is cheap.
So, just read the file into memory and map it to multiprocessing.Pool to do your bidding:
from multiprocessing import Pool

def downloader(param):  # our downloader process
    # download code here
    # param will hold a line from your file (including newline at the end, strip before use)
    # e.g. res = requests.get(param.strip())
    return True  # let's provide some response back

if __name__ == "__main__":  # important protection for cross-platform use
    with open("your_file.dat", "r") as f:  # open your file
        download_jobs = f.readlines()  # store each line in a list
    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(downloader, download_jobs)  # map our data, line by line
    download_pool.close()  # let's exit cleanly
    # you can check the responses for each line in the `responses` list
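Since each job can take anywhere from 5 seconds to 2 minutes, you may not want to wait for the whole batch before seeing any results. Here is a minimal variation (not part of the answer above, just a sketch) using Pool.imap_unordered, which yields each result as soon as its job finishes:

from multiprocessing import Pool

def downloader(param):  # stand-in for the downloader above
    return param.strip()

if __name__ == "__main__":
    with open("your_file.dat", "r") as f:
        download_jobs = f.readlines()
    with Pool(processes=5) as download_pool:
        # results arrive in completion order, not file order
        for response in download_pool.imap_unordered(downloader, download_jobs):
            print(response)  # handle each result as soon as it is ready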
You can also use threading instead of multiprocessing (or multiprocessing.pool.ThreadPool as a drop-in replacement) to do everything within a single process if you need shared memory. A single thread is more than enough for download purposes unless you're doing additional processing.
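For example, a minimal sketch of the thread-based variant, reusing the same downloader function and your_file.dat from the example above:

from multiprocessing.pool import ThreadPool  # thread-based drop-in for Pool

def downloader(param):  # same signature as the process-based version above
    # download code here, e.g. res = requests.get(param.strip())
    return True  # let's provide some response back

if __name__ == "__main__":
    with open("your_file.dat", "r") as f:
        download_jobs = f.readlines()
    download_pool = ThreadPool(processes=5)  # 5 threads sharing one process
    responses = download_pool.map(downloader, download_jobs)
    download_pool.close()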
If you want your downloaders to run as class instances, you can transform the downloader function into a factory for your Downloader instances, and then just pass what you need to instantiate those instances alongside your URLs. Here is a simple round-robin approach:
from itertools import cycle
from multiprocessing import Pool

class Downloader(object):
    def __init__(self, port_number=8080):
        self.port_number = port_number

    def run(self, url):
        print("Downloading {} on port {}".format(url, self.port_number))

def init_downloader(params):  # our downloader initializer
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return True  # let's provide some response back

if __name__ == "__main__":  # important protection for cross-platform use
    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]
    downloader_cycle = cycle(downloader_params)  # use cycle for round-robin distribution
    with open("your_file.dat", "r") as f:  # open your file
        # read our file line by line and attach downloader params to it
        download_jobs = [[next(downloader_cycle), row.strip()] for row in f]
    download_pool = Pool(processes=5)  # make our pool use 5 processes
    responses = download_pool.map(init_downloader, download_jobs)  # map our data
    download_pool.close()  # let's exit cleanly
    # you can check the responses for each line in the `responses` list
Keep in mind that this is not the most balanced solution, as two Downloader instances can happen to run on the same port at once, but it will average out over a large enough data set.
If you want to make sure that no two Downloader instances run off of the same port, you'll either need to build your own pool, or you'll need to create a central process that issues ports to your Downloader instances when they need them.
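A minimal sketch of the second idea, assuming a shared multiprocessing.Queue of ports and a Pool initializer so that each worker process claims exactly one port for its lifetime; the Downloader class and port numbers come from the question, while init_worker, download_job, and the _worker_downloader global are illustrative names, not part of the original answer:

from multiprocessing import Pool, Queue

class Downloader(object):
    def __init__(self, port_number=8080):
        self.port_number = port_number

    def run(self, url):
        print("Downloading {} on port {}".format(url, self.port_number))
        return True

_worker_downloader = None  # each worker process gets its own Downloader

def init_worker(port_queue):  # runs once in every worker process
    global _worker_downloader
    port = port_queue.get()  # claim a port no other worker holds
    _worker_downloader = Downloader(port_number=port)

def download_job(row):  # executed for every line of the file
    return _worker_downloader.run(row)

if __name__ == "__main__":
    port_queue = Queue()  # central issuer of ports
    for port in (7751, 7752, 7753, 7754, 7755):
        port_queue.put(port)
    with open("your_file.dat", "r") as f:
        download_jobs = [row.strip() for row in f]
    # one worker per port, each initialized with a unique port
    download_pool = Pool(processes=5, initializer=init_worker, initargs=(port_queue,))
    responses = download_pool.map(download_job, download_jobs)
    download_pool.close()

Because the pool has exactly as many processes as there are ports, every port is held by exactly one worker, so no two Downloader instances ever share a port.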