所以我得到了n个异步的,带时间戳的数据流。每个流都有一个固定的ish速率。我想处理所有数据,但关键是我必须处理数据,以便尽可能接近数据到达的时间(这是一个实时流应用程序)。
到目前为止,我的实现是创建一个由k条消息组成的固定窗口,我使用优先级队列按时间戳进行排序。然后,我按顺序处理整个队列,然后再转到下一个窗口。这是可以的,但它不太理想,因为它会创建与缓冲区大小成比例的延迟,并且如果消息在缓冲区结束后到达,有时还会导致丢弃消息看起来像这样:
// Priority queue keeping track of the data in timestamp order.
ThreadSafeProrityQueue<Data> q;
// Fixed buffer size
int K = 10;
// The last successfully processed data timestamp
time_t lastTimestamp = -1;
// Called for each of the N data streams asyncronously
void receiveAsyncData(const Data& dat) {
q.push(dat.timestamp, dat);
if (q.size() > K) {
processQueue();
}
}
// Process all the data in the queue.
void processQueue() {
while (!q.empty()) {
const auto& data = q.top();
// If the data is too old, drop it.
if (data.timestamp < lastTimestamp) {
LOG("Dropping message. Too old.");
q.pop();
continue;
}
// Otherwise, process it.
processData(data);
lastTimestamp = data.timestamp;
q.pop();
}
}
关于数据的信息:它们保证在自己的流中被排序它们的频率在5到30赫兹之间它们由图像和其他数据组成。
一些例子说明了为什么这比看上去更难假设我有两个流,a和b都以1hz的频率运行,然后按以下顺序获取数据:
(stream, time)
(A, 2)
(B, 1.5)
(A, 3)
(B, 2.5)
(A, 4)
(B, 3.5)
(A, 5)
看看如果我按收到数据的先后顺序处理数据,B总是会被丢弃吗这是我想要避免的,现在在我的算法中,b每10帧就会被删除,我会以10帧的延迟处理数据到过去。
最佳答案
我建议采用生产者/消费者结构。让每个流将数据放入队列,并有一个单独的线程读取队列即:
// your asynchronous update:
void receiveAsyncData(const Data& dat) {
q.push(dat.timestamp, dat);
}
// separate thread that processes the queue
void processQueue()
{
while (!stopRequested)
{
data = q.pop();
if (data.timestamp >= lastTimestamp)
{
processData(data);
lastTimestamp = data.timestamp;
}
}
}
这可以防止处理批处理时在当前实现中看到的“延迟”。
processQueue
函数在一个单独的持久线程中运行。stopRequested
是程序要关闭时设置的一个标志——强制线程退出。有些人会为此使用volatile
标志。我更喜欢使用手动重置事件之类的东西。要使此工作正常,您需要一个允许并发更新的优先级队列实现,或者需要用同步锁包装队列特别是,要确保当队列为空时
q.pop()
等待下一个项目。或者当队列为空时从不调用q.pop()
。我不知道你的文章的具体内容,所以我不能确切地说你是怎么写的。时间戳检查仍然是必要的,因为后面的项可能在前面的项之前处理。例如:
从数据流1接收到的事件,但在将线程添加到队列之前将其交换出去。
从数据流2接收并添加到队列中的事件。
数据流2中的事件由
ThreadSafePriorityQueue
函数从队列中移除。上面步骤1中的线程获取另一个时间片,并将项添加到队列中。
这并不罕见,只是不常见。时间差通常是微秒量级的。
如果你经常得到不正常的更新,那么你可以引入一个人为的延迟。例如,在更新的问题中,您将显示500毫秒前出现无序的消息假设500毫秒是您想要支持的最大公差。也就是说,如果一条消息迟到超过500毫秒,那么它将被丢弃。
你要做的是当你把事情添加到优先级队列时,在时间戳上加上500毫秒。即:
q.push(AddMs(dat.timestamp, 500), dat);
在处理事情的循环中,在时间戳之前不要对事情进行出列。类似于:
while (true)
{
if (q.peek().timestamp <= currentTime)
{
data = q.pop();
if (data.timestamp >= lastTimestamp)
{
processData(data);
lastTimestamp = data.timestamp;
}
}
}
这会在处理所有项目时引入500毫秒的延迟,但会防止丢弃在500毫秒阈值内的“延迟”更新。你必须平衡你对“实时”更新的渴望和防止删除更新的渴望。