本文介绍了共享内存IPC同步(无锁)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 29岁程序员,3月因学历无情被辞! 请考虑以下情况: 要求: Intel x64服​​务器(多个CPU插槽=> NUMA) Ubuntu 12,GCC 4.6 两个进程共享大量数据 内存安排在循环缓冲区(含M个元素) 程序顺序(伪代码): (生产者): int bufferPos = 0; while(true) { if(isBufferEmpty(bufferPos)) { writeData(bufferPos); setBufferFull(bufferPos); bufferPos =(bufferPos + 1)%M; } } Process B(Consumer): int bufferPos = 0; while(true) { if(isBufferFull(bufferPos)) { readData(bufferPos); setBufferEmpty(bufferPos); bufferPos =(bufferPos + 1)%M; } } 现在老问题:如何同步 使用互斥锁保护每个读/写访问 引入宽限期 ,以允许写入完成:当缓冲区(N + 3)已标记为已满(危险,但似乎工作...)时,读取缓冲区N中的数据 ? 理想情况下,我想要一个沿着内存屏障的线,确保所有以前的读/写在所有CPU ,沿着以下行: writeData(i); MemoryBarrier(); //所有写入和可见的数据,设置标志 setBufferFull(i); 这样,我只需要监视缓冲区标志,然后可以安全地读取大数据块。 一般来说,我正在寻找像Preshing这里所描述的获取/释放栅栏的东西: http://preshing.com/20130922/acquire-and-release-fences/ (如果我理解它正确的C ++ 11原子只适用于单个进程的线程,而不是多个进程。) 但是,GCC自己的内存屏障(__sync_synchronize与编译器屏障asm volatile(:::memory)结合使用)似乎不能按预期工作,因为写入变得可见 任何帮助都会感激... BTW:在windows下,这只是工作正常使用易变量(微软的具体行为)... 解决方案 Boost Interprocess支持共享内存。 Boost Lockfree拥有单生产者单用户队列类型( spsc_queue )。这是基本上你称为循环缓冲区。 这是一个演示,传递IPC消息(在这种情况下,类型 string )使用此队列,以无锁方式。 定义类型 首先,我们定义类型: 命名空间bip = boost :: interprocess; namespace shm { template< typename T> using alloc = bip :: allocator< T,bip :: managed_shared_memory :: segment_manager> ;; 使用char_alloc = alloc< char> ;; using shared_string = bip :: basic_string< char,std :: char_traits< char>,char_alloc> ;; using string_alloc = alloc< shared_string> ;; 使用ring_buffer = boost :: lockfree :: spsc_queue< shared_string, boost :: lockfree :: capacity< 200> //或者,pass // boost :: lockfree :: allocator< string_alloc> > ;;为了简单起见,我选择演示运行时大小 } $ b shared_string typedef定义一个字符串,它将从共享内存段中透明地分配,因此它们也与其他进程神奇地共享。 这是最简单的,因此: int main { //创建段和相应的分配器 bip :: managed_shared_memory段(bip :: open_or_create,MySharedMemory,65536); shm :: string_alloc char_alloc(segment.get_segment_manager()); shm :: ring_buffer * queue = segment.find_or_construct< shm :: ring_buffer>(queue)(); 这将打开共享内存区域,如果共享队列存在, 注意。应该在现实生活中同步。 现在进行实际演示: while(true) { std :: this_thread :: sleep_for(std :: chrono :: milliseconds(10)); shm :: shared_string v(char_alloc); if(queue-> pop(v)) std :: cout<< Processed:'< v<< '\\\; } 消费者无限地监视队列中的待处理作业,并处理一个〜10ms > p> int main() { bip :: managed_shared_memory segment(bip :: open_or_create,MySharedMemory 65536); shm :: char_alloc char_alloc(segment.get_segment_manager()); shm :: ring_buffer * queue = segment.find_or_construct< shm :: ring_buffer>(queue)(); 再次,向初始化阶段添加适当的同步。此外,你可能使生产者负责在适当的时间释放共享内存段。在这个演示中,我只是让它挂。 for(const char * s:{hello world,答案是42,你的毛巾在哪里}) { std :: this_thread :: sleep_for (std :: chrono :: milliseconds(250)); queue-> push({s,char_alloc}); } } 右键,制作者产生 请注意,因此,如果我们这样做(假设一个带有作业控制的POSIX shell): ./ producer& ./producer& ./producer& wait ./consumer& 将立即打印3x3消息,做 ./ producer& ./producer& ./producer&在此之后,再次显示涓流的消息涓流(在3的突发在〜〜),然后再次显示消息涓流。( 请参阅本提示中的完整代码在线:https://gist.github.com/sehe/9376856 Consider the following scenario:Requirements:Intel x64 Server (multiple CPU-sockets => NUMA)Ubuntu 12, GCC 4.6Two processes sharing large amounts of data over (named) shared-memoryClassical producer-consumer scenarioMemory is arranged in a circular buffer (with M elements)Program sequence (pseudo code):Process A (Producer):int bufferPos = 0;while( true ){ if( isBufferEmpty( bufferPos ) ) { writeData( bufferPos ); setBufferFull( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; }}Process B (Consumer):int bufferPos = 0;while( true ){ if( isBufferFull( bufferPos ) ) { readData( bufferPos ); setBufferEmpty( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; }}Now the age-old question: How to synchronize them effectively!?Protect every read/write access with mutexesIntroduce a "grace period", to allow writes to complete: Read data in buffer N, when buffer(N+3) has been marked as full (dangerous, but seems to work...)?!?Ideally I would like something along the lines of a memory-barrier, that guarantees that all previous reads/writes are visible across all CPUs, along the lines of:writeData( i );MemoryBarrier();//All data written and visible, set flagsetBufferFull( i );This way, I would only have to monitor the buffer flags and then could read the large data chunks safely.Generally I'm looking for something along the lines of acquire/release fences as described by Preshing here:http://preshing.com/20130922/acquire-and-release-fences/(if I understand it correctly the C++11 atomics only work for threads of a single process and not along multiple processes.)However the GCC-own memory barriers (__sync_synchronize in combination with the compiler barrier asm volatile( "" ::: "memory" ) to be sure) don't seem to work as expected, as writes become visible after the barrier, when I expected them to be completed.Any help would be appreciated...BTW: Under windows this just works fine using volatile variables (a Microsoft specific behaviour)... 解决方案 Boost Interprocess has support for Shared Memory.Boost Lockfree has a Single-Producer Single-Consumer queue type (spsc_queue). This is basically what you refer to as a circular buffer.Here's a demonstration that passes IPC messages (in this case, of type string) using this queue, in a lock-free fashion.Defining the typesFirst, let's define our types:namespace bip = boost::interprocess;namespace shm{ template <typename T> using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>; using char_alloc = alloc<char>; using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc >; using string_alloc = alloc<shared_string>; using ring_buffer = boost::lockfree::spsc_queue< shared_string, boost::lockfree::capacity<200> // alternatively, pass // boost::lockfree::allocator<string_alloc> >;}For simplicity I chose to demo the runtime-size spsc_queue implementation, randomly requesting a capacity of 200 elements.The shared_string typedef defines a string that will transparently allocate from the shared memory segment, so they are also "magically" shared with the other process.The consumer sideThis is the simplest, so:int main(){ // create segment and corresponding allocator bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::string_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();This opens the shared memory area, locates the shared queue if it exists. NOTE This should be synchronized in real life.Now for the actual demonstration:while (true){ std::this_thread::sleep_for(std::chrono::milliseconds(10)); shm::shared_string v(char_alloc); if (queue->pop(v)) std::cout << "Processed: '" << v << "'\n";}The consumer just infinitely monitors the queue for pending jobs and processes one each ~10ms.The Producer sideThe producer side is very similar:int main(){ bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::char_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();Again, add proper synchronization to the initialization phase. Also, you would probably make the producer in charge of freeing the shared memory segment in due time. In this demonstration, I just "let it hang". This is nice for testing, see below.So, what does the producer do? for (const char* s : { "hello world", "the answer is 42", "where is your towel" }) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); queue->push({s, char_alloc}); }}Right, the producer produces precisely 3 messages in ~750ms and then exits.Note that consequently if we do (assume a POSIX shell with job control):./producer& ./producer& ./producer&wait./consumer&Will print 3x3 messages "immediately", while leaving the consumer running. Doing./producer& ./producer& ./producer&again after this, will show the messages "trickle in" in realtime (in burst of 3 at ~250ms intervals) because the consumer is still running in the backgroundSee the full code online in this gist: https://gist.github.com/sehe/9376856 这篇关于共享内存IPC同步(无锁)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!
08-04 04:56
查看更多