Problem Description
I'm doing a frequency word count in Python; here is the single-process version:
#coding=utf-8
import string
import time
from collections import Counter

starttime = time.clock()
origin = open("document.txt", 'r').read().lower()
for_split = [',', '\n', '\t', '\'', '.', '\"', '!', '?', '-', '~']
# the words below will be ignored when counting
ignored = ['the', 'and', 'i', 'to', 'of', 'a', 'in', 'was', 'that', 'had',
           'he', 'you', 'his', 'my', 'it', 'as', 'with', 'her', 'for', 'on']

i = 0
# replace punctuation and whitespace characters with spaces, then split into words
for ch in for_split:
    origin = string.replace(origin, ch, ' ')
words = string.split(origin)
result = Counter(words).most_common(40)
for word, frequency in result:
    if not word in ignored and i < 10:
        print "%s : %d" % (word, frequency)
        i = i + 1
print time.clock() - starttime
Then the multiprocessing version looks like this:
#coding=utf-8
import time
import multiprocessing
from collections import Counter

for_split = [',', '\n', '\t', '\'', '.', '\"', '!', '?', '-', '~']
ignored = ['the', 'and', 'i', 'to', 'of', 'a', 'in', 'was', 'that', 'had',
           'he', 'you', 'his', 'my', 'it', 'as', 'with', 'her', 'for', 'on']
result_list = []

def worker(substr):
    # count the words in one slice of the word list
    result = Counter(substr)
    return result

def log_result(result):
    # callback runs in the parent process and collects the partial counters
    result_list.append(result)

def main():
    pool = multiprocessing.Pool(processes=5)
    origin = open("document.txt", 'r').read().lower()
    for ch in for_split:
        origin = origin.replace(ch, ' ')
    words = origin.split()
    # split the word list into roughly equal slices, one task per slice
    step = len(words) / 4
    substrs = [words[pos:pos + step] for pos in range(0, len(words), step)]
    result = Counter()
    for substr in substrs:
        pool.apply_async(worker, args=(substr,), callback=log_result)
    pool.close()
    pool.join()
    result = Counter()
    for item in result_list:
        result = result + item
    result = result.most_common(40)
    i = 0
    for word, frequency in result:
        if not word in ignored and i < 10:
            print "%s : %d" % (word, frequency)
            i = i + 1

if __name__ == "__main__":
    starttime = time.clock()
    main()
    print time.clock() - starttime
"document.txt"大约为2200万,我的笔记本电脑具有内核,2G内存,第一个版本的结果是3.27s,第二个版本的结果是8.15s,我更改了进程数( pool = multiprocessing.Pool(processes = 5)),从2到10,结果几乎保持不变,为什么呢,我该如何使该程序比单进程版本运行得更快呢?
the "document.txt" is about 22M, my laptop has to cores, 2G memory, the result of first version is 3.27s, and the second one is 8.15s, I've changed num of processes(pool = multiprocessing.Pool(processes=5)), from 2 to 10, the results remain almost the same, why is that, how can I make this program run faser than the single process version?
Recommended Answer
I think it's the overhead associated with distributing the individual strings to the workers and receiving the results. If I run your parallel code as given above on an example document (Dostojevski's "Crime and Punishment"), it takes about 0.32 s, whereas the single-process version takes just 0.09 s. If I modify the worker function to just process the string "test" instead of the real document (still passing the real string in as an argument), the runtime goes down to 0.22 s. However, if I pass "test" as the argument to map_async, the runtime decreases to 0.06 s. Hence I'd say that in your case the runtime of the program is limited by the inter-process communication overhead.
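If you want to see that overhead in isolation, here is a minimal sketch (Python 2, reusing the question's document.txt; the noop and timed_call helpers are illustrative, not part of the original code) that times one pool call shipping the full word list against one shipping only a tiny string:

#coding=utf-8
import time
import multiprocessing

def noop(data):
    # does no real work, so the measured time is dominated by
    # pickling/unpickling the argument and the result
    return len(data)

def timed_call(pool, payload):
    start = time.time()
    pool.apply_async(noop, args=(payload,)).get()
    return time.time() - start

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    words = open("document.txt", 'r').read().lower().split()
    print "full word list : %.3f s" % timed_call(pool, words)
    print "tiny payload   : %.3f s" % timed_call(pool, "test")
    pool.close()
    pool.join()

The first call has to serialize the whole word list to the child process and the result back; the second ships only a few bytes, so the gap between the two numbers is roughly the per-task communication cost your parallel version pays.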
With the following code I get the runtime of the parallel version down to 0.08 s. First, I partition the file into a number of chunks of (almost) equal length, making sure that the boundary between chunks coincides with a newline. Then I simply pass the offset and length of each chunk to a worker process, which opens the file, reads its chunk, processes it, and returns the results. This causes significantly less overhead than distributing the strings directly through map_async. For larger files you should see an even bigger improvement in runtime with this code. Also, if you can tolerate small count errors, you can omit the step that determines correct chunk boundaries and just split the file into equally sized chunks; in my example this brings the runtime down to 0.04 s, making the multiprocessing code faster than the single-process one (see the sketch after the listing below).
#coding=utf-8
import time
import multiprocessing
import string
from collections import Counter
import os

for_split = [',', '\n', '\t', '\'', '.', '\"', '!', '?', '-', '~']
ignored = ['the', 'and', 'i', 'to', 'of', 'a', 'in', 'was', 'that', 'had',
           'he', 'you', 'his', 'my', 'it', 'as', 'with', 'her', 'for', 'on']
result_list = []

def worker(offset, length, filename):
    # each worker opens the file itself and reads only its own chunk,
    # so only the (offset, length) pair has to cross the process boundary
    origin = open(filename, 'r')
    origin.seek(offset)
    content = origin.read(length).lower()
    for ch in for_split:
        content = content.replace(ch, ' ')
    words = string.split(content)
    result = Counter(words)
    origin.close()
    return result

def log_result(result):
    result_list.append(result)

def main():
    processes = 5
    pool = multiprocessing.Pool(processes=processes)
    filename = "document.txt"
    file_size = os.stat(filename).st_size
    chunks = []
    origin = open(filename, 'r')
    while True:
        # read roughly file_size/processes bytes, rounded up to whole lines,
        # so every chunk boundary falls on a newline
        lines = origin.readlines(file_size / processes)
        if not lines:
            break
        # readlines() keeps the trailing newlines, so join with '' to keep
        # the chunk lengths consistent with the byte offsets in the file
        chunks.append("".join(lines))
    origin.close()
    lengths = [len(chunk) for chunk in chunks]
    offset = 0
    for length in lengths:
        pool.apply_async(worker, args=(offset, length, filename,), callback=log_result)
        offset += length
    pool.close()
    pool.join()
    result = Counter()
    for item in result_list:
        result = result + item
    result = result.most_common(40)
    i = 0
    for word, frequency in result:
        if not word in ignored and i < 10:
            print "%s : %d" % (word, frequency)
            i = i + 1

if __name__ == "__main__":
    starttime = time.clock()
    main()
    print time.clock() - starttime
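For the equal-chunk variant mentioned above (the one that tolerates small count errors), the readlines-based boundary detection can be dropped entirely and the file split into fixed byte ranges. The following sketch is not the answerer's exact code; it only shows the lines that would replace the readlines()/chunks loop inside main(), reusing worker, log_result, pool, filename, file_size and processes from the listing above:

# replaces the readlines()/chunks loop in main(): split the file into
# equally sized byte ranges; a word straddling a boundary may be cut in
# two, which slightly perturbs the counts
step = file_size / processes + 1          # Python 2 integer division
for offset in range(0, file_size, step):
    length = min(step, file_size - offset)
    pool.apply_async(worker, args=(offset, length, filename,), callback=log_result)

Because no chunk strings are built in the parent at all, this variant also skips reading the file in the parent process, which is where the extra 0.04 s saving comes from.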