我正在考虑如何有效地从蒸汽中每n个数据中采样一个数据,n是动态的,它将随时间变化并等于Total data sent / capacity of the buffer

我有一个缓冲区,它可以包含10,000个点,并且数据源是一个流,它将每次都向该缓冲区发送一个点。如果已发送的原始数据总量为20,000,则缓冲区中的点应为2,4,6,8,10,... 20,000(index),只需将20,000点缩放为10,000个插槽。采样点的索引不必精确地为2,4,6,但是在这种情况下,两个索引之间的间隔平均应该为2。

因为数据点的总量在变化,所以如果我每次发送新数据点时都进行采样,则性能会很慢(每次从N个点中拾取10,000个点,并且N随时间增加)。我想知道是否有更好的算法可以减少计算量,但仍然保持较高的采样精度?

*我尝试使用概率来解决此问题,它可以做类似的事情,但结果不准确。所以我不知道如何实现我的目标。

from time import sleep

samplingCout = 10

reservoir1 = []
reservoir2 = []
reservoir3 = []
avg = []

count = 0

def sample(arr, data):
    global count
    count += 1
    if len(arr) < samplingCout:
        arr.append(data)
    else:
        if randint(0, int(count / samplingCout)) == int(count / samplingCout):
            index = randint(0, samplingCout - 1)
            sleep(0.001)
            del arr[0]
            arr.append(data)

for i in range(1, 1000):
    sample(reservoir1,i)
    sample(reservoir2,i)
    sample(reservoir3,i)

for i in range(0, samplingCout):
    avg.append(int((reservoir1[i] + reservoir2[i] + reservoir3[i])/3))

print(avg)


谢谢。

最佳答案

看起来您需要Reservoir Sampling-从未知事件流中公平采样固定大小的缓冲区

一些简单的代码

import numpy as np
import random

x = 0
N = 20000
EOS = -1

def next_event():
    global x
    global N
    global EOS
    q = x
    x += 1

    if x == N:
        return EOS
    return q

def sample(count):

    sampled = list()

    index = 0
    q = next_event()
    while q >= 0:
        if index < count:
            sampled.append(q)
        else:
            r = random.randint(0, index)
            if r < count:
                    sampled[r] = q

        index += 1
        q = next_event()

    sampled.sort()
    return sampled

if __name__ == "__main__":

    random.seed(32345)
    res = sample(10000)
    diffs = np.array(res[1:]) - np.array(res[:-1])
    print(np.mean(diffs))


备注:


代码适用于整数,如果您有复杂的记录,则存储(索引,记录)的元组
按索引排序,应该便宜
我计算出样本之间距离的平均值,结果为1.9997999799979997。

10-01 20:55
查看更多