假设有一个像std::vector这样的数据结构和一个初始化为零的全局变量int syncToken。
还给出了作为读取器/写入器的两个线程,为什么以下(伪)代码有效(无效)?
void reader_thread(){
while(1){
if(syncToken!=0){
while(the_vector.length()>0){
// ... process the std::vector
}
syncToken = 0; // let the writer do it's work
}
sleep(1);
}
}
void writer_thread(){
while(1){
std::string data = waitAndReadDataFromSomeResource(the_resource);
if(syncToken==0){
the_vector.push(data);
syncToken = 1; // would syncToken++; be a difference here?
}
// drop data in case we couldn't write to the vector
}
}
尽管这段代码不是(时间)高效的,但据我所知,代码是有效的,因为两个线程仅以不会导致未定义行为的方式在全局变量值上进行同步。唯一的问题可能是在同时使用 vector 时发生的,但不应发生这种情况,因为仅在零和一之间作为同步值进行了切换,对吗?
更新
由于我犯了只问是/否问题的错误,因此我将问题更新为为什么希望得到一个非常具体的案例作为答案。
似乎问题本身也根据答案得出了错误的图画,因此我将详细说明上述代码对我的问题/问题的影响。
在此之前,我想指出的是,我要一个特定的用例/示例/证明/详细说明,以确切说明不同步的地方。即使是让示例计数器表现为非单调递增的C示例代码,也只会回答是/否的问题,而不是为什么!
我对为什么感兴趣。因此,如果您提供一个示例来说明它有问题,我会对原因感兴趣。
根据(我)的定义,上述代码应在且仅当if语句中的代码(不包括if块底部的syncToken分配)只能在给定时间由这两个给定线程之一准确执行时,命名为同步。
基于这种想法,我正在寻找一个可能基于汇编程序的示例,其中两个线程同时执行if块-这意味着它们不同步或不同步。
作为引用,让我们看一下gcc生成的汇编代码的相关部分:
; just the declaration of an integer global variable on a 64bit cpu initialized to zero
syncToken:
.zero 4
.text
.globl main
.type main, @function
; writer (Cpu/Thread B): if syncToken == 0, jump not equal to label .L1
movl syncToken(%rip), %eax
testl %eax, %eax
jne .L1
; reader (Cpu/Thread A): if syncToken != 0, jump to Label L2
movl syncToken(%rip), %eax
testl %eax, %eax
je .L2
; set syncToken to be zero
movl $0, syncToken(%rip)
现在我的问题是,我看不出为什么这些指令可能不同步。
假设两个线程都在自己的CPU内核上运行,例如线程A在内核A上运行,线程B在内核B上运行。初始化是全局的,并且在两个线程开始执行之前完成,因此我们可以忽略初始化,并假设两个线程都以syncToken =开始0;
例:
老实说,我已经构建了一个效果很好的示例,但是它说明了我没有看到为什么变量不同步以使两个线程同时执行if块的方法。
我的观点是:尽管上下文切换将导致%eax与RAM中的syncToken的实际值不一致,但是代码应该做正确的事情,并且如果不是允许运行的唯一线程,则不要执行if块它。
更新2
可以假设syncToken将仅在所示的代码中使用。不允许任何其他函数(如waitAndReadDataFromSomeResource)以任何方式使用它
更新3
让我们更进一步,提出一个稍微不同的问题:是否可以使用int syncToken同步两个线程,一个读取器和一个写入器,从而通过并发执行if块而不会一直保持线程不同步?如果是的话-这很有趣^^
如果没有-为什么?
最佳答案
简短答案:否,此示例未正确同步,因此(始终)无法正常工作。
对于软件,通常可以理解,有时但并非总是正常工作与坏事是一样的。现在,您可能会问类似“在优化级别为-O0的带有XYZ编译器的ACME品牌32位微 Controller 上使中断 Controller 与前台任务同步的这项工作,”答案肯定是肯定的。但是在一般情况下,答案是否定的。实际上,在任何实际情况下执行此操作的可能性都很低,因为“使用STL”和“足够简单的硬件和编译器才能正常工作”的交集可能为空。
正如其他评论/回答所述,从技术上讲,它也是未定义行为(UB)。真正的实现是免费的,也可以使UB正常工作。因此,仅由于它不是“标准”,它仍然可以工作,但它并不严格符合标准或不可移植。它是否有效取决于具体情况,很大程度上取决于处理器和编译器,也许还取决于操作系统。
有效的方法
就像您的(代码)注释所暗示的那样,很可能会删除数据,因此可以认为这是故意的。此示例的性能很差,因为仅在添加,删除或测试数据时才需要“锁定” vector 。但是reader_thread()拥有 vector ,直到完成测试,删除和处理所有项目为止。这比期望的时间更长,因此丢弃数据的可能性比其他情况要高。
但是,只要变量访问是同步的并且语句以“天真”的程序顺序发生,逻辑就似乎是正确的。直到它“拥有”它(syncToken == 0),writer_thread()才访问该 vector 。同样,reader_thread()直到拥有它为止才访问该 vector (syncToken == 1)。即使没有原子写入/读取操作(例如,这是一台16位计算机,syncToken是32位),这仍然会
“工作”。
注1:模式if(flag){... flag = x}是非原子测试和设置。通常这是一个竞赛条件。但是在这种非常特殊的情况下,那场比赛是回避的。通常(例如,一个以上的读者或作家)这也是一个问题。
注意2:与之相比,syncToken++不太可能是原子的,而不是syncToken =1。通常,这是不当行为的另一个领头羊,因为它涉及读-修改-写。在这种特定情况下,应该没有区别。
出了什么问题
正如其他评论/答案提到的那样,缓存的可能性可能是个问题。但是对于普通系统内存中的大多数体系结构而言,事实并非如此。硬件将通过诸如MESI之类的缓存一致性协议(protocol)来确保所有处理器上的所有缓存都保持一致性。如果将syncToken写入处理器P1上的L1高速缓存,则当P2尝试访问同一位置时,硬件将确保在P2加载之前清空来自P1的脏高速缓存行。因此,对于普通的高速缓存一致性系统内存,这可能是“确定”。
但是,如果写入到设备或IO内存中的缓存和缓冲区未自动同步的情况下,这种情况并非完全难以实现。例如,需要PowerPC EIEIO instruction来同步外部总线存储器,并且PCI发布的写操作可能会被网桥缓冲,并且必须以编程方式进行刷新。如果 vector 或syncToken都没有存储在普通的高速缓存一致性系统内存中,这也可能导致同步问题。
the_vector.push(data)
和syncToken = 1
没有依赖关系,因此可以自由地先移动syncToken = 1
。显然,这通过允许reader_thread()与writer_thread()同时与vector混淆来破坏事情。仅仅将syncToken声明为volatile也是不够的。仅保证对 volatile 访问与其他 volatile 访问进行排序,但不能保证在 volatile 和非 volatile 访问之间进行排序。因此,除非 vector 也易失,否则这仍然是一个问题。由于vector可能是STL类,因此声明它为volatile甚至还不可行。
the_vector.push(data)
编译成的最后一条指令与syncToken = 1
之间没有依赖关系,因此处理器可以在movl $0x1, syncToken(%rip)
的其他指令完成之前决定执行the_vector.push(data)
,例如,保存新的length字段。这与汇编语言操作码的顺序无关。通常,CPU知道指令#3取决于指令#1的结果,因此它知道必须在#1之后执行#3。也许#2指令与这两个指令都不相关,并且可能在它们之前或之后。此调度根据当前可用的CPU资源在运行时动态发生。
出问题的是,访问the_vector的指令和访问syncToken的指令之间没有明确的依赖关系。但是程序仍然隐式要求对它们进行排序以进行正确的操作。 CPU无法知道这一点。
防止重新排序的唯一方法是使用内存隔离栅,屏障或其他特定于特定CPU的同步指令。例如,可以在触摸the_vector和syncToken之间插入英特尔
mfence
指令或PPC sync
。对于CPU型号和情况,只是哪一条指令或一系列指令以及需要将它们放置在何处。 最终,使用“适当的”同步原语会容易得多。同步库调用还可以处理将编译器和CPU障碍放置在正确的位置。此外,如果您执行以下操作,它的性能会更好,并且不需要删除数据(尽管sleep(1)仍然是躲闪的-使用条件变量或信号量更好):
void reader_thread(){
while(1){
MUTEX_LOCK()
if(the_vector.length()>0){
std::string data = the_vector.pop();
MUTEX_UNLOCK();
// ... process the data
} else {
MUTEX_UNLOCK();
}
sleep(1);
}
}
void writer_thread(){
while(1){
std::string data = waitAndReadDataFromSomeResource(the_resource);
MUTEX_LOCK();
the_vector.push(data);
MUTEX_UNLOCK();
}
}