让A成为一个5到2千万的核一维数组。
我想确定,对于每个i
,A[i-1000000], A[i-999999], ..., A[i-2], A[i-1]
中有多少项小于A[i]
。
。
我测试了各种方法,在Rolling comparison between a value and a past window, with percentile/quantile中给出了一些答案:
import numpy as np
A = np.random.random(5*1000*1000)
n = 1000*1000
B = (np.lib.stride_tricks.as_strided(A, shape=(n,A.size-n), strides=(A.itemsize,A.itemsize)) <= A[n:]).sum(0)
#or similar version with "view_as_windows(A, n)"
最后最快的解决方案是一些简单的代码+numba:
from numba import jit, prange
@jit(parallel=True)
def doit(A, n):
Q = np.zeros(len(A))
for i in prange(n, len(Q)):
Q[i] = np.sum(A[i-n:i] <= A[i])
return(Q)
C = doit(A, n)
但即使有了这段代码,对于我来说它也太慢了,长度为500万,n=100万:大约30分钟就可以完成这个计算了!
有没有一个更聪明的想法可以使用,避免对输出的每个元素重新比较100万个条目?
注:具有近似10的(3)精度的比例,比如“1-百万个项目中的34.3%个小于一个[i]”就足够了。
最佳答案
。。
。我不确定能不能使它更容易阅读。
基本思想是将数组划分成类似二叉树的东西(抱歉,没有正式的scicomp训练)。
。
。
。原则上,在每次迭代中,排序的成本应该保持线性,因为在理论上(而不是在numpy中),我们可以充分利用较小块的排序性。
。。。
Anyway, here is a sample run
size 5_000_000, lookback 1_000_000 -- took 14.593 seconds
seems correct -- 10_000 samples checked
and the code:
更新:代码简化了一点,运行速度也更快了
UPDATE 2: added a version that does ""
import numpy as np
from numpy.lib.stride_tricks import as_strided
def add_along_axis(a, indices, values, axis):
if axis<0:
axis += a.ndim
I = np.ogrid[(*map(slice, a.shape),)]
I = *I[:axis], indices, *I[axis+1:]
a[I] += values
aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index
def inv_perm(p):
i = np.empty_like(p)
paa(i, p, np.arange(p.shape[-1]), -1)
return i
def rolling_count_smaller(data, n):
N = len(data)
b = n.bit_length()
NN = (((N-1)>>b)+2)<<b
d0 = np.empty(NN, data.dtype)
d0[NN-N:] = data[::-1]
d0[:NN-N] = data.max() + 1
dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
ch, ch2 = 1, 2
for i in range(b-1):
d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
sh = dt.shape
(il, ir), (jl, jr), (k, _) = f2m(m2f(np.add(sh, (-1, -2, -1)), sh) - (n, n-ch), sh)
I = min(il, ir) + 1
bab = np.empty((I, ch2), dt.dtype)
bab[:, ch:] = dt[sh[0]-I:, 0]
IL, IR = np.s_[il-I+1:il+1, ir-I+1:ir+1]
bab[:, k:ch] = d0[IL, jl, k:]
bab[:, :k] = d0[IR, jr, :k]
o = bab.argsort(1, kind='stable')
ns, io = (o>=ch).cumsum(1), inv_perm(o)
r0[IL, jl, k:] += taa(ns, io[:, k:ch], 1)
r0[IR, jr, :k] += taa(ns, io[:, :k], 1)
it[:, 1, :] += ch
dt.shape = it.shape = r0.shape = -1, ch2
o = dt.argsort(1, kind='stable')
ns, io = (o>=ch).cumsum(1), inv_perm(o)
aaa(r0, it[:, :ch], taa(ns, io[:, :ch], 1), 1)
dt, it = taa(dt, o, 1), taa(it, o, 1)
ch, ch2 = ch2, ch2<<1
si, sj = dt.shape
o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
ns, io = (o>=ch).cumsum(1), inv_perm(o)
r0[:-1, ch2-n-1:] += taa(ns, taa(io, inv_perm(it)[:-1, ch2-n-1:], 1), 1)
return r0.ravel()[:NN-N-1:-1]
l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')
"
import numpy as np
from numpy.lib.stride_tricks import as_strided
def add_along_axis(a, indices, values, axis):
if axis<0:
axis += a.ndim
I = np.ogrid[(*map(slice, a.shape),)]
I = *I[:axis], indices, *I[axis+1:]
a[I] += values
aaa, taa, paa = add_along_axis, np.take_along_axis, np.put_along_axis
m2f, f2m = np.ravel_multi_index, np.unravel_index
def inv_perm(p):
i = np.empty_like(p)
paa(i, p, np.arange(p.shape[-1]), -1)
return i
def rolling_count_smaller(data, n):
N = len(data)
b = n.bit_length()
NN = (((N-1)>>b)+2)<<b
d0 = np.empty(NN, data.dtype)
d0[:N] = data
d0[N:] = data.max() + 1
dt, it, r0 = d0.copy(), np.zeros(NN, int), np.zeros(NN, int)
ch, ch2 = 1, 2
for i in range(b-1):
d0.shape = dt.shape = it.shape = r0.shape = -1, 2, ch
sh = dt.shape
(il, ir), (jl, jr), (k, _) = f2m(m2f((0, 1, 0), sh) + (n-ch+1, n+1), sh)
I = sh[0] - max(il, ir)
bab = np.empty((I, ch2), dt.dtype)
bab[:, :ch] = dt[:I, 1]
IL, IR = np.s_[il:il+I, ir:ir+I]
bab[:, ch+k:] = d0[IL, jl, k:]
bab[:, ch:ch+k] = d0[IR, jr, :k]
o = bab.argsort(1, kind='stable')
ns, io = (o<ch).cumsum(1), inv_perm(o)
r0[IL, jl, k:] += taa(ns, io[:, ch+k:], 1)
r0[IR, jr, :k] += taa(ns, io[:, ch:ch+k], 1)
it[:, 1, :] += ch
dt.shape = it.shape = r0.shape = -1, ch2
o = dt.argsort(1, kind='stable')
ns, io = (o<ch).cumsum(1), inv_perm(o)
aaa(r0, it[:, ch:], taa(ns, io[:, ch:], 1), 1)
dt, it = taa(dt, o, 1), taa(it, o, 1)
ch, ch2 = ch2, ch2<<1
si, sj = dt.shape
o = as_strided(dt, (si-1, sj<<1), dt.strides).argsort(1, kind='stable')
ns, io = (o<ch).cumsum(1), inv_perm(o)
r0[1:, :n+1-ch] += taa(ns, taa(io, ch+inv_perm(it)[1:, :n+1-ch], 1), 1)
return r0.ravel()[:N]
l = 1000
data = np.random.randint(-99, 100, (5*l,))
from time import perf_counter as pc
t = pc()
x = rolling_count_smaller(data, l)
t = pc() - t
print(f'size {data.size:_d}, lookback {l:_d} -- took {t:.3f} seconds')
check = 1000
sample = np.random.randint(0, len(x), check)
y = np.array([np.count_nonzero(data[max(0, i-l):i]<=data[i]) for i in sample])
assert np.all(y==x[sample])
print(f'seems correct -- {check:_d} samples checked')
关于python - 在A [i]之前的100万个项目中,有多少个比A [i]小?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53673775/