我有一个现有的Python代码,它在我的机器的核心上并行运行。它完成的工作基本上是打开一个输入文件,读取内容,执行一些相当繁重的数学运算,将结果写入一个输出文件,在for循环中获取下一个文件并再次执行为了在多个核心上实现这种并行,我使用了Pool库中的multiprocessing函数。举个简单的例子:

import multiprocessing
import time

data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(8)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

这个例子只是用来说明我是如何在8个核上实现multiprocessing.Pool函数的。本质上,我的代码中的mp_worker函数要复杂得多,但您可以理解我的意思。
我逐渐意识到,我工作的网络中有好几台机器在99%的时间内都处于空闲状态因此,我想知道是否有办法在这段代码中使用它们的核心以及我的本地核心。
在伪代码中,代码可能会变成:
def mp_handler():
    p = multiprocessing.Pool(servers=['local host', 192.168.0.1, 192.168.0.2], ncores=[8,8,4])
    p.map(mp_worker, data)

在这里,我现在可以指定我的本地机器和其他IP地址作为服务器,以及我想在每台机器上使用的核心数。
因为我网络上的其他机器都归我所有,而且都没有连接到Internet,所以出于安全目的使用ssh并不麻烦。
在google上搜索我注意到pathosscoop库可以帮助我解决这个问题。看起来pathos的命令与multiprocessing库非常相似,这对我很有吸引力。但是,在这两种情况下,我都找不到一个简单的例子来说明如何将本地并行作业转换为分布式并行作业。我希望尽可能靠近multiprocessing库的pool/map功能。
任何帮助或例子将不胜感激!

最佳答案

pathos中的示例非常类似于伪代码。

from pathos.parallel import stats
from pathos.parallel import ParallelPool as Pool
pool = Pool()

def host(id):
    import socket
    import time
    time.sleep(1.0)
    return "Rank: %d -- %s" % (id, socket.gethostname())


print "Evaluate 10 items on 2 cpus"
pool.ncpus = 2
pool.servers = ('localhost:5653',)
res5 = pool.map(host, range(10))
print pool
print '\n'.join(res5)
print stats()
print ''

上面,您可以在初始化ncpus实例时将serversPool设置为关键字。
结果如下:
Evaluate 10 items on 2 cpus
<pool ParallelPool(ncpus=2, servers=('localhost:5653',))>
Rank: 0 -- hilbert.local
Rank: 1 -- hilbert.local
Rank: 2 -- hilbert.local
Rank: 3 -- hilbert.local
Rank: 4 -- hilbert.local
Rank: 5 -- hilbert.local
Rank: 6 -- hilbert.local
Rank: 7 -- hilbert.local
Rank: 8 -- hilbert.local
Rank: 9 -- hilbert.local
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
        10 |        100.00 |      10.0459 |     1.004588 | local
Time elapsed since server creation 5.0402431488
0 active tasks, 2 cores

如果您有多个服务器(可能有远程服务器),只需向servers元组添加更多条目。所以这并不是一个完美的例子,因为它并没有显示如何让服务器在另一台机器上运行。然而,这是一个很好的例子,如果您确实计划使用sshtunnel,您应该知道您没有指向远程机器,而是指向带有隧道端口的pathos,并且连接到远程机器。
由于localhost使用pathos(这是ppft的分支),您可以查看pp中有关如何设置远程服务器的示例基本上,您可以使用shell脚本执行以下操作:
for i in $nodes
do
    ssh -f $i /home/username/bin/ppserver.py -p $portnum -w 2 -t 30 &
done

在这里,循环在接收的节点(节点)上对于每个节点,使用pp命令启动具有指定端口(-p)、两个工作线程(-w)和空闲30秒后超时(-t)的ssh -f。请参阅ppserver文档(http://www.parallelpython.com/content/view/15/30)。使用pp,您只需要启动一个pathos,并指定使其工作的端口然后,将主机名和端口添加到第一个代码块中的ppserver元组。
但是,如果您不喜欢手动设置,那么server确实提供了设置pathostunnel的脚本使用脚本比手工操作有点不灵活,当事情出错时更难诊断……但是……参见这里的脚本:https://github.com/uqfoundation/pathos/tree/master/scripts

关于python - Python中的分布式多处理池,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36167335/

10-11 03:56