我需要一种环形缓冲区(或一些更合适的数据结构,以防)和一种算法/模式,用于在以下情况下处理环形缓冲区。
连续产生实时数据的写入程序必须始终能够写入第一个空闲的“插槽”(不在读取过程中的插槽),或者等待一个插槽空闲时写入。每当写入程序完成将数据写入一个插槽时,它就可以为读卡器“提交”该插槽。
在给定的时间内,可能有n个并发读卡器。每个读卡器在发出读取请求时,应始终从环形缓冲区中最后提交的插槽中获取最新的写入数据,但不能多次读取同一数据。如果自上次读取以来,没有新的数据被写入和提交到一个槽中,那么读取器应该在等待(考虑一个快速读取器)。
注意,一个读卡器不能为另一个读卡器“使用”数据。换句话说,两个不同的读卡器可以读取相同的数据同样,一个读卡器可以从同一个插槽读取数据两次或更多次,但前提是该读卡器在两个读取请求之间写入该插槽。
请注意,干扰器可能不足以满足我的情况(或我只是未能使它工作,因为我想)。干扰器的问题是,写入程序可能前进得太快(与速度较慢的读卡器相比),以至于在读卡过程中可能会覆盖某些插槽。在这种情况下,编写器应该能够跳过这些“忙”时隙,直到找到第一个空闲时隙(一旦编写,它也必须只发布该时隙),但是disruptor模式似乎没有考虑到这种情况。序列本身还有一个问题,在我正在使用的disruptor实现中,它是一个原子整数,因此它可能溢出,导致一些未定义的行为。
你知道吗?如果你知道的话,我会欣赏现代C++中的解决方案。
最佳答案
我不知道这个问题的现成解决方案(注意:这并不意味着没有现成的解决方案!)但如果我尝试使用c++11工具来实现这一点,我会看到如下内容:
保存数据的数组
一个std::堆栈,用于保存已写入元素的索引
一个std::队列,用于保存已读取元素的索引
控制堆栈访问的互斥锁。因此,读写器在弹出/推送到堆栈之前获取互斥锁。
用于控制对队列的访问的互斥锁。因此,读写器在排队/出列到队列之前获取互斥锁。
当堆栈为空时,供读取器阻塞的读取器条件变量
队列为空时写入程序要阻止的写入程序条件变量。
开始将所有索引(环形缓冲区的大小)排队到队列中。
writer获取索引并写入数组,然后将索引推送到堆栈上并通知读卡器。
读取器将索引从堆栈中弹出,并在完成后将它们排队到队列中并通知编写器。
空堆栈使读卡器等待读卡器,空队列使读卡器等待读卡器。
A nice article on the C++11 concurrency features.
根据进一步的思考编辑这个建议试图做的是将数组中的插槽视为一个资源池。读取器获取资源、插槽的索引,并在完成处理后将其返回。
更新的解决方案:因为时隙的数量限制在8以上,所以我建议您尽可能地保持这个解决方案。使每个插槽对自己的状态负责:
#include <atomic>
template <typename T>
class Slot {
atomic_uint readers ;
int iteration ;
T data ;
}
然后创建一个std:vector的插槽,并使读写器在该向量上迭代。
写入程序需要找到一个插槽,其中
readers == 0
(可能最小值为iteration
),当它找到时,需要减少读卡器计数,然后检查读卡器计数是否为-1,以确保读卡器不会在if和减量之间开始读取。如果是,则重新增加readers
并重新开始。读者在数组中迭代寻找最大的
iteration
值,它们尚未读取,并且在哪里readers >= 0
。然后,他们重新检查readers
的值,如果编写器没有将其设置为-1,他们将其递增并开始读取。如果写入程序已开始写入该插槽,则读取器将重新开始。假设作者已将某个插槽标识为空闲,并且读者已决定需要读取该插槽,则有两个可供选择的顺序,并且在读写器中都返回并重试。
Execution Writer Reader
1 readers == 0
2 readers >= 0
3 readers--
4 readers++
5 readers == -1 is false
6 readers > 0 is false!
7 readers++
8 readers--
Execution Writer Reader
1 readers >= 0
2 readers == 0
3 readers++
4 readers--
5
6 readers == -1 is false
7 readers > 0 is false
8 readers++
9 readers--
如果a Reader和Writer确实以这种方式发生冲突,则两者都无法访问,都需要重试。
如果您对这种方法不满意,那么可以使用互斥锁和lock/try_锁,类似于Slot类中的内容(注意:目前无法通过编译器运行,因此可能有语法错误)
typedef enum LOCK_TYPE {READ, WRITE} ;
std::mutex mtx ;
bool lockSlot(LOCK_TYPE lockType)
{
bool result = false ;
if (mtx.try_lock()) {
if ((lockType == READ) && (readers >= 0))
readers++ ;
if ((lockType == WRITE) && (readers == 0))
readers-- ;
result = true ;
}
return result ;
}
void unlockSlot (LOCK_TYPE lockType)
{
if (mtx.lock()) { // Wait for a lock this time
if (lockType == READ)
readers-- ;
if (lockType == WRITE)
readers++ ;
}
}
当/如果读卡器用完工作时,它可以等待一个条件变量,当有新数据可用时,编写器可以使用该变量来通知所有读卡器。