我通过Relacy运行了一堆算法来验证它们的正确性,然后偶然发现了我并不真正理解的东西。这是它的简化版本:
#include <thread>
#include <atomic>
#include <iostream>
#include <cassert>
struct RMW_Ordering
{
std::atomic<bool> flag {false};
std::atomic<unsigned> done {0}, counter {0};
unsigned race_cancel {0}, race_success {0}, sum {0};
void thread1() // fail
{
race_cancel = 1; // data produced
if (counter.fetch_add(1, std::memory_order_release) == 1 &&
!flag.exchange(true, std::memory_order_relaxed))
{
counter.store(0, std::memory_order_relaxed);
done.store(1, std::memory_order_relaxed);
}
}
void thread2() // success
{
race_success = 1; // data produced
if (counter.fetch_add(1, std::memory_order_release) == 1 &&
!flag.exchange(true, std::memory_order_relaxed))
{
done.store(2, std::memory_order_relaxed);
}
}
void thread3()
{
while (!done.load(std::memory_order_relaxed)); // livelock test
counter.exchange(0, std::memory_order_acquire);
sum = race_cancel + race_success;
}
};
int main()
{
for (unsigned i = 0; i < 1000; ++i)
{
RMW_Ordering test;
std::thread t1([&]() { test.thread1(); });
std::thread t2([&]() { test.thread2(); });
std::thread t3([&]() { test.thread3(); });
t1.join();
t2.join();
t3.join();
assert(test.counter == 0);
}
std::cout << "Done!" << std::endl;
}
两个线程争先进入一个 protected 区域,最后一个线程完成修改,从无限循环中释放第三个线程。该示例有些虚构,但是原始代码需要通过标志声明该区域以发出“完成”信号。
最初,fetch_add具有acq_rel排序,因为我担心交换可能会在此之前重新排序,从而可能导致一个线程声明该标志,首先尝试执行fetch_add检查,并阻止另一个线程(通过增量检查)成功修改日程安排。在使用Relacy进行测试时,我想知道如果我从acq_rel切换到Release,是否会发生我期望发生的 Activity 锁,而令我惊讶的是,没有发生。然后,我使用轻松进行了所有操作,再次,没有 Activity 锁。
我试图在C++标准中找到与此相关的任何规则,但仅设法将它们进行了挖掘:
我是否可以始终依赖于不对RMW操作进行重新排序-即使它们影响不同的内存位置-标准中是否有任何保证这种行为的方法?
编辑:
我想出了一个更简单的设置,应该可以更好地说明我的问题。这是CppMem脚本:
int main()
{
atomic_int x = 0; atomic_int y = 0;
{{{
{
if (cas_strong_explicit(&x, 0, 1, relaxed, relaxed))
{
cas_strong_explicit(&y, 0, 1, relaxed, relaxed);
}
}
|||
{
if (cas_strong_explicit(&x, 0, 2, relaxed, relaxed))
{
cas_strong_explicit(&y, 0, 2, relaxed, relaxed);
}
}
|||
{
// Is it possible for x and y to read 2 and 1, or 1 and 2?
x.load(relaxed).readsvalue(2);
y.load(relaxed).readsvalue(1);
}
}}}
return 0;
}
我认为该工具不够完善,无法评估这种情况,尽管它似乎表明有可能。这是几乎等效的Relacy设置:
#include "relacy/relacy_std.hpp"
struct rmw_experiment : rl::test_suite<rmw_experiment, 3>
{
rl::atomic<unsigned> x, y;
void before()
{
x($) = y($) = 0;
}
void thread(unsigned tid)
{
if (tid == 0)
{
unsigned exp1 = 0;
if (x($).compare_exchange_strong(exp1, 1, rl::mo_relaxed))
{
unsigned exp2 = 0;
y($).compare_exchange_strong(exp2, 1, rl::mo_relaxed);
}
}
else if (tid == 1)
{
unsigned exp1 = 0;
if (x($).compare_exchange_strong(exp1, 2, rl::mo_relaxed))
{
unsigned exp2 = 0;
y($).compare_exchange_strong(exp2, 2, rl::mo_relaxed);
}
}
else
{
while (!(x($).load(rl::mo_relaxed) && y($).load(rl::mo_relaxed)));
RL_ASSERT(x($) == y($));
}
}
};
int main()
{
rl::simulate<rmw_experiment>();
}
断言不会被违反,因此根据Relacy不可能1和2(或相反)。
最佳答案
我还没有完全理解您的代码,但是粗体的问题有一个简单的答案:
不,你不能。非常允许在同一线程中对两个宽松的RMW进行编译时重新排序。 (我认为在大多数CPU上实际上不可能对两个RMW进行运行时重新排序。为此,ISO C++不能区分编译时与运行时。)
但是请注意,原子RMW既包含负载又包含存储,并且这两个部分必须保持在一起。因此,任何一种RMW都不能更早地通过获取操作,或以后通过发布操作。
同样,当然,作为释放和/或获取操作的RMW本身可以停止在一个或另一个方向上的重新排序。
当然,C++内存模型并不是根据对缓存相关的共享内存的本地重新排序来正式定义的,仅是与另一个线程同步并创建事前/事后关系时才正式定义的。但是,如果您忽略了IRIW重新排序(2个读取器线程不同意两个写入器线程对不同变量进行独立存储的顺序),则几乎有2种不同的方法来对同一事物建模。