我在纯python中有以下函数:

import numpy as np

def subtractPython(a, b):
    xAxisCount = a.shape[0]
    yAxisCount = a.shape[1]

    shape = (xAxisCount, yAxisCount, xAxisCount)
    results = np.zeros(shape)
    for index in range(len(b)):
        subtracted = (a - b[index])
        results[:, :, index] = subtracted
    return results

我试着用这种方式把它cythonize化:
import numpy as np
cimport numpy as np

DTYPE = np.int
ctypedef np.int_t DTYPE_t

def subtractPython(np.ndarray[DTYPE_t, ndim=2] a, np.ndarray[DTYPE_t, ndim=2] b):
    cdef int xAxisCount = a.shape[0]
    cdef int yAxisCount = a.shape[1]

    cdef np.ndarray[DTYPE_t, ndim=3] results = np.zeros([xAxisCount, yAxisCount, xAxisCount], dtype=DTYPE)

    cdef int lenB = len(b)

    cdef np.ndarray[DTYPE_t, ndim=2] subtracted
    for index in range(lenB):
        subtracted = (a - b[index])
        results[:, :, index] = subtracted
    return results

不过,我没有看到任何加速。是不是有什么东西我遗漏了或者这个过程不能加快?
编辑->我已经意识到我实际上并没有在上面的代码中使用减法算法。我已经成功地对它进行了Cython处理,但是它与-B[[,NON] ]具有完全相同的运行时,所以我想这是这个操作的最大速度。
这基本上是a-b[:,None]>具有相同的运行时
%%cython

import numpy as np
cimport numpy as np


DTYPE = np.int
ctypedef np.int_t DTYPE_t

cimport cython
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def subtract(np.ndarray[DTYPE_t, ndim=2] a, np.ndarray[DTYPE_t, ndim=2] b):
    cdef np.ndarray[DTYPE_t, ndim=3] result = np.zeros([b.shape[0], a.shape[0], a.shape[1]], dtype=DTYPE)

    cdef int lenB = b.shape[0]
    cdef int lenA = a.shape[0]
    cdef int lenColB = b.shape[1]

    cdef int rowA, rowB, column

    for rowB in range(lenB):
        for rowA in range(lenA):
            for column in range(lenColB):
                result[rowB, rowA, column] = a[rowA, column] - b[rowB, column]
    return result

最佳答案

当试图优化一个函数时,人们总是应该知道这个函数的瓶颈是什么——如果没有你会花很多时间朝错误的方向运行。
让我们使用您的python函数作为基线(实际上我使用result=np.zeros(shape,dtype=a.dtype)否则您的方法返回floats,这可能是一个错误):

>>> import numpy as np
>>> a=np.random.randint(1,1000,(300,300), dtype=np.int)
>>> b=np.random.randint(1,1000,(300,300), dtype=np.int)
>>> %timeit subtractPython(a,b)
274 ms ± 3.61 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们应该问自己的第一个问题是:这个任务是内存还是CPU受限?显然,这是一个内存受限的任务—与所需的内存读写访问相比,减法根本算不上什么。
这意味着,我们必须优化内存布局,以减少缓存未命中。根据经验,我们的内存访问应该一个接一个地访问连续的内存地址。
是这样吗?不,数组result按C顺序排列,即行主顺序,因此访问
results[:, :, index] = subtracted

不是连续的。另一方面,
results[index, :, :] = subtracted

将是一个连续的访问。让我们改变信息存储在result中的方式:
def subtract1(a, b):
    xAxisCount = a.shape[0]
    yAxisCount = a.shape[1]

    shape = (xAxisCount,  xAxisCount, yAxisCount) #<=== Change order
    results = np.zeros(shape, dtype=a.dtype)
    for index in range(len(b)):
        subtracted = (a - b[index])
        results[index, :, :] = subtracted   #<===== consecutive access
    return results

现在的时间安排是:
>>> %timeit subtract1(a,b)
>>> 35.8 ms ± 285 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

还有另外两个小的改进:我们不必用零初始化结果,我们可以节省一些python开销,但这只会给我们带来大约5%的开销:
def subtract2(a, b):
    xAxisCount = a.shape[0]
    yAxisCount = a.shape[1]

    shape = (xAxisCount,  xAxisCount, yAxisCount)
    results = np.empty(shape, dtype=a.dtype)        #<=== no need for zeros
    for index in range(len(b)):
        results[index, :, :] = (a-b[index])   #<===== less python overhead
    return results

>>> %timeit subtract2(a,b)
34.5 ms ± 203 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

现在这比原来的版本快8倍。
你可以使用Cython来进一步加速这一过程,但是任务可能仍然是内存受限的,所以不要指望它能更快地完成,毕竟Cython不能让内存更快地工作。然而,如果没有适当的分析,很难说有多大的改进是可能的——如果有人能想出一个更快的版本,也不会感到惊讶。

关于python - numpy函数的cythonization,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48974747/

10-13 08:50
查看更多