本文介绍了并行遍历迭代的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的代码遇到性能问题.步骤# IIII耗时数小时.我曾经实现之前的itertools.prodct,但是由于用户的缘故,我不再做pro_data = product(array_b,array_a)了.这帮助我解决了内存问题,但是仍然很耗时.我想用多线程或multiprocesisng来形容它,无论您能提出什么建议,我都感激不尽.

I am having performance issues with my code.step # IIII consumes hours of time. I used to materialize thethe itertools.prodct before, but thanks to a user I dont do pro_data = product(array_b,array_a) anymore. This helped me with memory issues, but the still is heavily time consuming.I would like to paralellize it with multithreading or multiprocesisng, whatever you can suggest, I am grateful.

说明.我有两个包含粒子的x和y值的数组.对于每个粒子(由两个坐标定义),我想计算另一个函数.对于组合,我使用itertools.product方法并遍历每个粒子.我总共运行了超过50000个粒子,所以我要计算N * N/2个组合.

Explanation. I have two arrays that contain x and y values of particles. For each particle (defined by two coordinates) I want to calculate a function with another. For combinations I use the itertools.product method and loop over every particle. I run over 50000 particels in total, so I have N*N/2 combinations to calculate.

预先感谢

import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement

def func(ar1,ar2,ar3,ar4): #example func that takes four arguments
  return (ar1*ar2**22+np.sin(ar3)+ar4)

def newdist(a):
  return func(a[0][0],a[0][1],a[1][0],a[1][1])

x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram

x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]

hist_data=np.zeros([len(x_edges)-1])

array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)

array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a,array_b):
  (result,bins) = np.histogram(newdist(i),bins=x_edges)
  hist_data+=result

hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()

-----编辑-----我现在使用此代码:

-----EDIT-----I used this code now:

def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
  def worker(array_ab, out_q):
      """ push result in queue """
      outdict = {}
      outdict = vec_chunk(array_ab, bins)
      out_q.put(outdict)
  out_q = mp.Queue()
  a = np.swapaxes(array_a, 0 ,1)
  b = np.swapaxes(array_b, 0 ,1)
  array_size_a=len(array_a)-(len(array_a)%d)
  array_size_b=len(array_b)-(len(array_b)%d)
  a_chunk = array_size_a / d
  b_chunk = array_size_b / d
  procs = []
  #prepare arrays for mp
  array_ab = np.empty((4, a_chunk, b_chunk))
  for j in xrange(d):
    for k in xrange(d):
      array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
      array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
      p = mp.Process(target=worker, args=(array_ab, out_q))
      procs.append(p)
      p.start()
  resultarray = np.empty(len(bins)-1)
  for i in range(d):
      resultarray+=out_q.get()
  # Wait for all worker processes to finish
  for pro in procs:
      pro.join()
  print resultarray
  return resultarray

这里的问题是我无法控制进程数.如何使用mp.Pool()代替?比

Problem here is that I cannot control the numbers of processes. How Can I use a mp.Pool() instead?than

推荐答案

使用矢量化numpy操作.通过使用newdist()调用替换product()上的for循环. ="nofollow noreferrer"> meshgrid() .

Use vectorized numpy operations. Replace the for-loop over product() with a single newdist() call by creating arguments using meshgrid().

要使问题并行化,请在与meshgrid()的子块相对应的array_aarray_b的切片上计算newdist(). 这是一个使用切片和多处理的示例.

To parallize the problem compute newdist() on slices of array_a, array_b that correspond to subblocks of meshgrid(). Here's an example using slices and multiprocessing.

这是另一个演示步骤的示例:python循环->向量化的numpy版本->并行:

Here's another example to demonstrate the steps: python loop -> vectorized numpy version -> parallel:

#!/usr/bin/env python
from __future__ import division
import math
import multiprocessing as mp
import numpy as np

try:
    from itertools import izip as zip
except ImportError:
    zip = zip # Python 3

def pi_loop(x, y, npoints):
    """Compute pi using Monte-Carlo method."""
    #  note: the method converges to pi very slowly.
    return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1) / npoints

def pi_vectorized(x, y, npoints):
    return 4 * ((x**2 + y**2) < 1).sum() / npoints # or just .mean()

def mp_init(x_shared, y_shared):
    global mp_x, mp_y
    mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy

def mp_pi(args):
    # perform computations on slices of mp_x, mp_y
    start, end = args
    x = mp_x[start:end] # no copy
    y = mp_y[start:end]
    return ((x**2 + y**2) < 1).sum()

def pi_parallel(x, y, npoints):
    # compute pi using multiple processes
    pool = mp.Pool(initializer=mp_init, initargs=[x, y])
    step = 100000
    slices = ((start, start + step) for start in range(0, npoints, step))
    return 4 * sum(pool.imap_unordered(mp_pi, slices)) / npoints

def main():
    npoints = 1000000

    # create shared arrays
    x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)]

    # initialize arrays
    x, y = map(np.frombuffer, [x_sh, y_sh])
    x[:] = np.random.uniform(size=npoints)
    y[:] = np.random.uniform(size=npoints)

    for f, a, b in [(pi_loop, x, y),
                    (pi_vectorized, x, y),
                    (pi_parallel, x_sh, y_sh)]:
        pi = f(a, b, npoints)
        precision = int(math.floor(math.log10(npoints)) / 2 - 1 + 0.5)
        print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi)))

if __name__=="__main__":
    main()

npoints = 10_000_000的时间性能:

pi_loop pi_vectorized pi_parallel
   32.6         0.159       0.069 # seconds

它显示出主要的性能优势是将python循环转换为其向量化的numpy模拟量.

It shows that the main performance benefit is from converting the python loop to its vectorized numpy analog.

这篇关于并行遍历迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-23 03:24