我在纯python中有以下函数:
import numpy as np
def subtractPython(a, b):
xAxisCount = a.shape[0]
yAxisCount = a.shape[1]
shape = (xAxisCount, yAxisCount, xAxisCount)
results = np.zeros(shape)
for index in range(len(b)):
subtracted = (a - b[index])
results[:, :, index] = subtracted
return results
我试着用这种方式把它cythonize化:
import numpy as np
cimport numpy as np
DTYPE = np.int
ctypedef np.int_t DTYPE_t
def subtractPython(np.ndarray[DTYPE_t, ndim=2] a, np.ndarray[DTYPE_t, ndim=2] b):
cdef int xAxisCount = a.shape[0]
cdef int yAxisCount = a.shape[1]
cdef np.ndarray[DTYPE_t, ndim=3] results = np.zeros([xAxisCount, yAxisCount, xAxisCount], dtype=DTYPE)
cdef int lenB = len(b)
cdef np.ndarray[DTYPE_t, ndim=2] subtracted
for index in range(lenB):
subtracted = (a - b[index])
results[:, :, index] = subtracted
return results
不过,我没有看到任何加速。是不是有什么东西我遗漏了或者这个过程不能加快?
编辑->我已经意识到我实际上并没有在上面的代码中使用减法算法。我已经成功地对它进行了Cython处理,但是它与-B[[,NON] ]具有完全相同的运行时,所以我想这是这个操作的最大速度。
这基本上是a-b[:,None]>具有相同的运行时
%%cython
import numpy as np
cimport numpy as np
DTYPE = np.int
ctypedef np.int_t DTYPE_t
cimport cython
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False) # turn off negative index wrapping for entire function
def subtract(np.ndarray[DTYPE_t, ndim=2] a, np.ndarray[DTYPE_t, ndim=2] b):
cdef np.ndarray[DTYPE_t, ndim=3] result = np.zeros([b.shape[0], a.shape[0], a.shape[1]], dtype=DTYPE)
cdef int lenB = b.shape[0]
cdef int lenA = a.shape[0]
cdef int lenColB = b.shape[1]
cdef int rowA, rowB, column
for rowB in range(lenB):
for rowA in range(lenA):
for column in range(lenColB):
result[rowB, rowA, column] = a[rowA, column] - b[rowB, column]
return result
最佳答案
当试图优化一个函数时,人们总是应该知道这个函数的瓶颈是什么——如果没有你会花很多时间朝错误的方向运行。
让我们使用您的python函数作为基线(实际上我使用result=np.zeros(shape,dtype=a.dtype)
否则您的方法返回floats
,这可能是一个错误):
>>> import numpy as np
>>> a=np.random.randint(1,1000,(300,300), dtype=np.int)
>>> b=np.random.randint(1,1000,(300,300), dtype=np.int)
>>> %timeit subtractPython(a,b)
274 ms ± 3.61 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
我们应该问自己的第一个问题是:这个任务是内存还是CPU受限?显然,这是一个内存受限的任务—与所需的内存读写访问相比,减法根本算不上什么。
这意味着,我们必须优化内存布局,以减少缓存未命中。根据经验,我们的内存访问应该一个接一个地访问连续的内存地址。
是这样吗?不,数组
result
按C顺序排列,即行主顺序,因此访问results[:, :, index] = subtracted
不是连续的。另一方面,
results[index, :, :] = subtracted
将是一个连续的访问。让我们改变信息存储在
result
中的方式:def subtract1(a, b):
xAxisCount = a.shape[0]
yAxisCount = a.shape[1]
shape = (xAxisCount, xAxisCount, yAxisCount) #<=== Change order
results = np.zeros(shape, dtype=a.dtype)
for index in range(len(b)):
subtracted = (a - b[index])
results[index, :, :] = subtracted #<===== consecutive access
return results
现在的时间安排是:
>>> %timeit subtract1(a,b)
>>> 35.8 ms ± 285 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
还有另外两个小的改进:我们不必用零初始化结果,我们可以节省一些python开销,但这只会给我们带来大约5%的开销:
def subtract2(a, b):
xAxisCount = a.shape[0]
yAxisCount = a.shape[1]
shape = (xAxisCount, xAxisCount, yAxisCount)
results = np.empty(shape, dtype=a.dtype) #<=== no need for zeros
for index in range(len(b)):
results[index, :, :] = (a-b[index]) #<===== less python overhead
return results
>>> %timeit subtract2(a,b)
34.5 ms ± 203 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
现在这比原来的版本快8倍。
你可以使用Cython来进一步加速这一过程,但是任务可能仍然是内存受限的,所以不要指望它能更快地完成,毕竟Cython不能让内存更快地工作。然而,如果没有适当的分析,很难说有多大的改进是可能的——如果有人能想出一个更快的版本,也不会感到惊讶。
关于python - numpy函数的cythonization,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48974747/