让A成为一个长度为500万到2000万的一维数值数组。
我想确定:对于每个i,A[i-1000000], A[i-999999], ..., A[i-2], A[i-1]中有多少项小于A[i]。

我测试了各种方法,在Rolling comparison between a value and a past window, with percentile/quantile中给出了一些答案:

# Brute-force vectorized baseline from the question: compare each target
# element A[n+j] against its entire lookback window in one shot.
import numpy as np
A = np.random.random(5*1000*1000)
n = 1000*1000
# as_strided builds an (n, A.size-n) aliasing view whose element [i, j]
# is A[i+j], i.e. column j is the window A[j:j+n].  Comparing against
# A[n:] (broadcast over rows) and summing along axis 0 yields, for each
# target, the number of window entries <= it.
# NOTE(review): the boolean intermediate has n*(A.size-n) elements, so
# at the stated sizes this is prohibitively slow and memory-hungry.
B = (np.lib.stride_tricks.as_strided(A, shape=(n,A.size-n), strides=(A.itemsize,A.itemsize)) <= A[n:]).sum(0)
#or similar version with "view_as_windows(A, n)"

最后最快的解决方案是一些简单的代码+numba:
from numba import jit, prange

# Straightforward numba kernel from the question.
# Fixed here: the original had `def doit` mis-indented relative to its
# decorator, which is a syntax error; `return(Q)` also tidied to `return Q`.
@jit(parallel=True)
def doit(A, n):
    """Return Q where Q[i] = #{j in [i-n, i) : A[j] <= A[i]} for i >= n,
    and Q[i] = 0 for i < n.  The outer loop is parallelized via prange.
    """
    Q = np.zeros(len(A))
    for i in prange(n, len(Q)):
        # Count lookback-window entries not exceeding the current value.
        Q[i] = np.sum(A[i-n:i] <= A[i])
    return Q

C = doit(A, n)

但即使有了这段代码,对于我来说它也太慢了:长度为500万、n=100万时,完成这个计算大约需要30分钟!
有没有一个更聪明的想法可以使用,避免对输出的每个元素重新比较100万个条目?
注:精度约为10^-3的近似比例就足够了,比如“前100万个项目中有34.3%小于A[i]”。

最佳答案

下面是我的方案。代码相当晦涩,我不确定能不能使它更容易阅读。
基本思想是将数组划分成类似二叉树的东西(抱歉,没有正式的scicomp训练)。


原则上,在每次迭代中,排序的成本应该保持线性,因为在理论上(虽然numpy做不到)我们可以充分利用较小块已经有序这一性质。
Anyway, here is a sample run

size 5_000_000, lookback 1_000_000 -- took 14.593 seconds
seems correct -- 10_000 samples checked

and the code:
更新:代码简化了一点,运行速度也更快了
UPDATE 2: added a version that does "&lt;=" instead of "&lt;"
import numpy as np
from numpy.lib.stride_tricks import as_strided

def add_along_axis(a, indices, values, axis):
    """In-place scatter-add: add `values` into `a` at `indices` along `axis`.

    Implemented as gather -> add -> scatter, so (exactly like the plain
    fancy-indexed ``a[I] += values``) duplicate indices within a row are
    credited once, with the last write winning.
    """
    gathered = np.take_along_axis(a, indices, axis)
    np.put_along_axis(a, indices, gathered + values, axis)

# Short aliases used throughout the algorithm below:
# aaa = scatter-add, taa = gather, paa = scatter-assign (all along an axis);
# m2f / f2m convert between multi-indices and flat indices.
aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index

def inv_perm(p):
    """Return the inverse of permutation(s) `p` taken along the last axis,
    i.e. out[..., p[..., j]] = j.
    """
    identity = np.arange(p.shape[-1])
    out = np.empty_like(p)
    np.put_along_axis(out, p, identity, -1)
    return out

def rolling_count_smaller(data, n):
    """For each position i of 1-D `data`, count the elements of the
    preceding window data[max(0, i-n):i] that are strictly smaller than
    data[i] (this is the contract the driver's brute-force self-check
    below asserts).

    Strategy: work on a reversed, front-padded copy of `data` and merge
    sorted chunks of doubling size (1, 2, 4, ..., 2**(b-1)),
    mergesort-style.  At each level, a stable argsort of concatenated
    chunk pairs gives -- via cumulative sums over which half each slot
    came from -- how many elements of one chunk sort before each element
    of the other; these partial counts are accumulated into r0 at the
    elements' original positions.
    """
    N = len(data)
    # Number of doubling levels; chunk sizes grow up to 2**(b-1).
    b = n.bit_length()
    # Padded length: a multiple of 2**b with at least one spare block.
    NN = (((N-1)>>b)+2)<<b
    d0 = np.empty(NN, data.dtype)
    # Reverse the data; pad the front with a sentinel strictly larger
    # than every real element so padding never counts as "smaller".
    d0[NN-N:] = data[::-1]
    d0[:NN-N] = data.max() + 1
    # dt: chunk-wise sorted working copy; it: original index of each dt
    # element; r0: per-element accumulated counts.
    dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
    # ch: current sorted-chunk length; ch2 = 2*ch.
    ch, ch2 = 1, 2
    for i in range(b-1):
        # View all arrays as (blocks, 2, ch): adjacent pairs of chunks.
        d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
        sh = dt.shape
        # Multi-index bookkeeping: locate the chunk positions n (resp.
        # n-ch) slots away from the array end, tying chunk pairs to the
        # lookback window.  (Index gymnastics; validated only through
        # the driver's brute-force self-check.)
        (il, ir), (jl, jr), (k, _) = f2m(m2f(np.add(sh, (-1, -2, -1)), sh) - (n, n-ch), sh)
        I = min(il, ir) + 1
        # bab: per row, right half holds a sorted chunk, left half the
        # window elements it is compared against.
        bab = np.empty((I, ch2), dt.dtype)
        bab[:, ch:] = dt[sh[0]-I:, 0]
        IL, IR = np.s_[il-I+1:il+1, ir-I+1:ir+1]
        bab[:, k:ch] = d0[IL, jl, k:]
        bab[:, :k] = d0[IR, jr, :k]
        # Stable merge via argsort; ns[r, j] = number of right-half
        # (sorted-chunk) elements at or before slot j; io inverts o.
        o = bab.argsort(1, kind='stable')
        ns, io = (o>=ch).cumsum(1), inv_perm(o)
        # Scatter the partial counts back onto the window elements.
        r0[IL, jl, k:] += taa(ns, io[:, k:ch], 1)
        r0[IR, jr, :k] += taa(ns, io[:, :k], 1)

        # Merge each chunk pair into one sorted chunk of length ch2,
        # carrying original indices along; ns again counts right-half
        # elements sorted before each slot, credited to the left-chunk
        # elements' original positions via the `it` index map.
        it[:, 1, :] += ch
        dt.shape = it.shape = r0.shape = -1, ch2
        o = dt.argsort(1, kind='stable')
        ns, io = (o>=ch).cumsum(1), inv_perm(o)
        aaa(r0, it[:, :ch], taa(ns, io[:, :ch], 1), 1)
        dt, it = taa(dt, o, 1), taa(it, o, 1)
        ch, ch2 = ch2, ch2<<1
    # Final level: overlapping pairs of top-level chunks through a
    # strided (aliasing) view; then undo the reversal and trim padding.
    si, sj = dt.shape
    o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
    ns, io = (o>=ch).cumsum(1), inv_perm(o)
    r0[:-1, ch2-n-1:] += taa(ns, taa(io, inv_perm(it)[:-1, ch2-n-1:], 1), 1)
    return r0.ravel()[:NN-N-1:-1]

# Driver: benchmark rolling_count_smaller on small random data, then
# validate it against a brute-force count at randomly sampled indices.
l = 1000
data = np.random.randint(-99, 100, (5*l,))

from time import perf_counter
start = perf_counter()
x = rolling_count_smaller(data, l)
t = perf_counter() - start
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')

# Spot-check: strict "<" count over each sampled position's window.
check = 1000
sample = np.random.randint(0, len(x), check)
def _brute(i):
    return np.count_nonzero(data[max(0, i-l):i] < data[i])
y = np.array([_brute(i) for i in sample])
assert np.all(y == x[sample])
print(f'seems correct -- {check:_d} samples checked')

"
import numpy as np
from numpy.lib.stride_tricks import as_strided

def add_along_axis(a, indices, values, axis):
    """Add `values` into `a` (in place) at positions `indices` taken
    along `axis` -- the along-axis analogue of fancy-indexed `+=`,
    including its last-write-wins treatment of duplicate indices.
    """
    updated = np.take_along_axis(a, indices, axis) + values
    np.put_along_axis(a, indices, updated, axis)

# Short aliases: aaa = scatter-add, taa = gather, paa = scatter-assign
# (all along an axis); m2f / f2m convert multi- <-> flat indices.
aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index

def inv_perm(p):
    """Invert permutation(s) along the last axis: res[..., p[..., j]] = j."""
    res = np.empty_like(p)
    np.put_along_axis(res, p, np.arange(p.shape[-1]), -1)
    return res

def rolling_count_smaller(data, n):
    """"<=" variant: for each position i of 1-D `data`, count the
    elements of the preceding window data[max(0, i-n):i] that are
    smaller than or equal to data[i] (this is what the driver's
    brute-force self-check below asserts).

    Same doubling merge scheme as the strict "<" version above, but it
    operates on the un-reversed data (padded at the tail with a
    sentinel) and counts left-half elements via (o < ch).
    """
    N = len(data)
    # Number of doubling levels; chunk sizes grow up to 2**(b-1).
    b = n.bit_length()
    # Padded length: a multiple of 2**b with at least one spare block.
    NN = (((N-1)>>b)+2)<<b
    d0 = np.empty(NN, data.dtype)
    d0[:N] = data
    # Tail sentinel strictly larger than every real element.
    d0[N:] = data.max() + 1
    # dt: chunk-wise sorted working copy; it: original index of each dt
    # element; r0: per-element accumulated counts.
    dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
    # ch: current sorted-chunk length; ch2 = 2*ch.
    ch, ch2 = 1, 2
    for i in range(b-1):
        # View all arrays as (blocks, 2, ch): adjacent pairs of chunks.
        d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
        sh = dt.shape
        # Multi-index bookkeeping: locate the chunks n-ch+1 / n+1 slots
        # ahead of the pair starts (window bookkeeping; validated only
        # through the driver's brute-force self-check).
        (il, ir), (jl, jr), (k, _) = f2m(m2f((0, 1, 0), sh) + (n-ch+1, n+1), sh)
        I = sh[0] - max(il, ir)
        # bab: per row, left half holds a sorted chunk, right half the
        # window elements it is compared against.
        bab = np.empty((I, ch2), dt.dtype)
        bab[:, :ch] = dt[:I, 1]
        IL, IR = np.s_[il:il+I, ir:ir+I]
        bab[:, ch+k:] = d0[IL, jl, k:]
        bab[:, ch:ch+k] = d0[IR, jr, :k]
        # Stable merge via argsort; ns[r, j] = number of left-half
        # (sorted-chunk) elements at or before slot j; io inverts o.
        o = bab.argsort(1, kind='stable')
        ns, io = (o<ch).cumsum(1), inv_perm(o)
        # Scatter the partial counts back onto the window elements.
        r0[IL, jl, k:] += taa(ns, io[:, ch+k:], 1)
        r0[IR, jr, :k] += taa(ns, io[:, ch:ch+k], 1)
        # Merge each chunk pair into one sorted chunk of length ch2,
        # carrying original indices along; ns again counts left-half
        # elements sorted before each slot, credited to the right-chunk
        # elements' original positions via the `it` index map.
        it[:, 1, :] += ch
        dt.shape = it.shape = r0.shape = -1, ch2
        o = dt.argsort(1, kind='stable')
        ns, io = (o<ch).cumsum(1), inv_perm(o)
        aaa(r0, it[:, ch:], taa(ns, io[:, ch:], 1), 1)
        dt, it = taa(dt, o, 1), taa(it, o, 1)
        ch, ch2 = ch2, ch2<<1
    # Final level: overlapping pairs of top-level chunks through a
    # strided (aliasing) view; then drop the tail padding.
    si, sj = dt.shape
    o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
    ns, io = (o<ch).cumsum(1), inv_perm(o)
    r0[1:, :n+1-ch] += taa(ns, taa(io, ch+inv_perm(it)[1:, :n+1-ch], 1), 1)
    return r0.ravel()[:N]

# Driver for the "<=" variant: time it on small random data, then verify
# sampled positions against a direct per-window count.
l = 1000
data = np.random.randint(-99, 100, (5*l,))

from time import perf_counter as _clock
t0 = _clock()
x = rolling_count_smaller(data, l)
t = _clock() - t0
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')

check = 1000
sample = np.random.randint(0, len(x), check)
expected = [np.count_nonzero(data[max(0, i-l):i] <= data[i]) for i in sample]
y = np.array(expected)
assert np.all(y == x[sample])
print(f'seems correct -- {check:_d} samples checked')

关于python - 在A [i]之前的100万个项目中,有多少个比A [i]小?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53673775/

10-10 08:05