
Problem description

Consider the following code.

#include <pthread.h>
#include <cstdlib>
#include <iostream>
#include <vector>
#include <map>

using namespace std;

map<pthread_t,vector<int>> map_vec;
vector<pair<pthread_t ,int>> how_much_and_where;

pthread_cond_t CV = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* writer(void* args)
{
    while(*some condition*)
    {
        int howMuchPush = (rand() % 5) + 1;
        for (int i = 0; i < howMuchPush; ++i)
        {
            // WRITE
            map_vec[pthread_self()].push_back(rand() % 10);
        }

        how_much_and_where.push_back(make_pair(pthread_self(), howMuchPush));
        // Wake up the reader - there's something to read.
        pthread_cond_signal(&CV);
    }

    cout << "writer thread: " <<  pthread_self()  << endl;
    return nullptr;
}

void* reader(void* args) {

    pair<pthread_t, int> to_do;

    pthread_cond_wait(&CV, &mutex);
    while(*what condition??*)
    {
        to_do = how_much_and_where.front();
        how_much_and_where.erase(how_much_and_where.begin());

        // READ
        cout << to_do.first << " wrote " << endl;
        for (int i = 0; i < to_do.second; i++)
        {
            cout << map_vec[to_do.first][i] << endl;
        }

        // Done reading. Go to sleep.
        pthread_cond_wait(&CV, &mutex);
    }

    return nullptr;
}

//----------------------------------------------------------------------------//


int main()
{
    pthread_t threads[4];

    // Writers
    pthread_create(&threads[0], nullptr, writer, nullptr);
    pthread_create(&threads[1], nullptr, writer, nullptr);
    pthread_create(&threads[2], nullptr, writer, nullptr);
    // reader
    pthread_create(&threads[3], nullptr, reader, nullptr);


    pthread_join(threads[0], nullptr);
    pthread_join(threads[1], nullptr);
    pthread_join(threads[2], nullptr);
    pthread_join(threads[3], nullptr);

    return 0;
}

Background

Every writer has its own container to which it writes data. Suppose there is also a reader that knows when a writer has finished writing a chunk of data, and how big that chunk is (the reader has a container into which the writers push these (writer, size) pairs).

Questions

  • Obviously I should put locks on the shared resources, map_vec and how_much_and_where. But I don't understand what, in this case, is the efficient way to position locks on these resources (for example, should I lock map_vec before every push_back in the for loop, or lock it once before the for loop? But isn't pushing to a queue a wasteful and long operation that may make the reader wait too long?), or the safe way to position locks in order to prevent deadlocks.
  • I don't understand what the right condition for the while loop is. I thought it could be "as long as how_much_and_where is not empty", but obviously the reader may empty how_much_and_where right before a writer adds a pair.
  • Suppose a writer sends a signal while the reader is busy reading some data. As far as I understand, this signal will be ignored, and the pair that the writer pushed may never be dealt with (# of signals received and dealt with < # of pairs/tasks for the reader). How can I prevent such a scenario?

Recommended answer

To simplify things, we should decouple the implementation of the general-purpose/reusable producer-consumer queue (or simply "blocking queue", as I usually call it) from the implementation of the actual producers and consumer (which aren't general-purpose/reusable; they are specific to your program). This will make the code much clearer and more manageable from a design perspective.

First you should implement a "blocking queue" that can handle multiple producers and a single consumer. This blocking queue will contain the code that handles multithreading/synchronization, and a consumer thread can use it to receive items from several producer threads. Such a blocking queue can be implemented in a lot of different ways (not only with a mutex+cond combo), depending on whether you have 1 or more consumers and 1 or more producers (sometimes it is possible to introduce different kinds of [platform-specific] optimizations when you have only 1 consumer or 1 producer). The simplest queue implementation, with a mutex+cond pair, automatically handles multiple producers and multiple consumers if needed.

The queue has only an internal container (it can be a non-thread-safe std::queue, vector or list) that holds the items, and an associated mutex+cond pair that protects this container from concurrent access by multiple threads. The queue has to provide two operations:


  • produce(item): puts one item into the queue and returns immediately. The pseudo code looks like this:

  1. lock mutex
  2. add the new item to the internal container
  3. signal via cond
  4. unlock mutex
  5. return

  • wait_and_get(): if there is at least one item in the queue, it removes the oldest one and returns immediately; otherwise it waits until someone puts an item into the queue with the produce(item) operation.

    1. lock mutex
    2. if container is empty:

        1. wait on cond (pthread_cond_wait)

    3. remove oldest item
    4. unlock mutex
    5. return the removed item
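The two operations above can be sketched as a small C++ class template on top of pthreads. This is a minimal sketch, assuming C++11 and a POSIX system; the class name BlockingQueue, the std::deque container and the member names are illustrative choices, not something the answer prescribes. It already uses while instead of if around pthread_cond_wait, which is the fix recommended in the additional notes on spurious wakeups.

```cpp
#include <pthread.h>
#include <cassert>
#include <deque>
#include <utility>

// Hypothetical blocking queue: a non-thread-safe container guarded by a
// pthread mutex + condition variable pair.
template <typename T>
class BlockingQueue {
public:
    BlockingQueue() {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ~BlockingQueue() {
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mutex_);
    }
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;

    // produce(item): lock, append, signal, unlock, return.
    void produce(T item) {
        pthread_mutex_lock(&mutex_);
        items_.push_back(std::move(item));
        pthread_cond_signal(&cond_);
        pthread_mutex_unlock(&mutex_);
    }

    // wait_and_get(): block while the container is empty, then remove
    // and return the oldest item.
    T wait_and_get() {
        pthread_mutex_lock(&mutex_);
        while (items_.empty())                 // 'while', not 'if': re-check after every wakeup
            pthread_cond_wait(&cond_, &mutex_);
        assert(!items_.empty());               // guaranteed by the loop above
        T item = std::move(items_.front());
        items_.pop_front();
        pthread_mutex_unlock(&mutex_);
        return item;
    }

private:
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
    std::deque<T> items_;
};
```

Two properties of this shape relate directly to the question. produce() holds the mutex only for a single push_back, so a writer never keeps the reader waiting for long. And no signal is "lost": the consumer checks the container under the mutex before it waits, so an item pushed while the consumer was busy is simply found on its next wait_and_get() call.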

    Now that you have a reusable blocking queue to build on, we can implement the producers and the consumer along with the main thread that controls things.

    Producers

    They just throw a bunch of items into the queue (by calling produce(item) on the blocking queue) and then exit. If producing items isn't computation-heavy and doesn't require waiting for a lot of IO operations, this will finish very quickly in your example program. To simulate real-world scenarios where the threads do heavy work, you could do the following: on each producer thread, put only X (let's say 5) items into the queue, but between items wait a random number of seconds, let's say between 1 and 3. Note that after some time your producer threads quit by themselves once they have finished their job.
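A producer following that description might look like the sketch below. It rests on stated assumptions: items are plain ints, ProducerArgs and producer are hypothetical names, and the pause is expressed in delay units (delay_unit_ms) so it can be scaled from the 1 to 3 seconds in the text down to milliseconds for a quick run. Each thread uses its own std::mt19937 because rand() is not guaranteed to be thread-safe. A compact copy of the mutex+cond queue is included so the block compiles on its own.

```cpp
#include <pthread.h>
#include <time.h>
#include <cassert>
#include <deque>
#include <random>
#include <utility>

// Compact mutex+cond blocking queue (the produce / wait_and_get steps from
// the answer), repeated here so this block compiles on its own.
template <typename T>
class BlockingQueue {
public:
    BlockingQueue() { pthread_mutex_init(&m_, nullptr); pthread_cond_init(&c_, nullptr); }
    ~BlockingQueue() { pthread_cond_destroy(&c_); pthread_mutex_destroy(&m_); }
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;
    void produce(T item) {
        pthread_mutex_lock(&m_);
        items_.push_back(std::move(item));
        pthread_cond_signal(&c_);
        pthread_mutex_unlock(&m_);
    }
    T wait_and_get() {
        pthread_mutex_lock(&m_);
        while (items_.empty()) pthread_cond_wait(&c_, &m_);  // re-check after every wakeup
        T item = std::move(items_.front());
        items_.pop_front();
        pthread_mutex_unlock(&m_);
        return item;
    }
private:
    pthread_mutex_t m_;
    pthread_cond_t c_;
    std::deque<T> items_;
};

// Hypothetical per-producer parameters (names are illustrative only).
struct ProducerArgs {
    BlockingQueue<int>* queue;
    int id;              // producer id, also used as the RNG seed
    int item_count;      // "X (let's say 5)" items
    long delay_unit_ms;  // 1000 gives the 1..3 second pauses from the text
};

// Puts item_count items into the queue with a random pause of 1..3 delay
// units before each one, then returns, which ends the thread.
void* producer(void* arg) {
    ProducerArgs* a = static_cast<ProducerArgs*>(arg);
    assert(a->item_count >= 0 && a->delay_unit_ms >= 0);
    std::mt19937 rng(static_cast<unsigned>(a->id));  // per-thread RNG
    std::uniform_int_distribution<int> units(1, 3);
    for (int i = 0; i < a->item_count; ++i) {
        long ms = units(rng) * a->delay_unit_ms;
        timespec pause;
        pause.tv_sec = ms / 1000;
        pause.tv_nsec = (ms % 1000) * 1000000L;
        nanosleep(&pause, nullptr);          // simulate "heavy work"
        a->queue->produce(a->id * 100 + i);  // encodes producer id and sequence number
    }
    return nullptr;
}
```

No thread is told to stop: each producer ends simply by returning from producer() once its items are queued.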

    Consumer

    The consumer has an infinite loop in which it always gets the next item from the queue with wait_and_get() and processes it somehow. If it is a special item that signals the end of processing, it breaks out of the infinite loop instead of processing the item. Pseudo code:

    1. Infinite loop:

        1. get the next item from the queue (wait_and_get())
        2. if this is the special item indicating the end of processing, break out of the loop...
        3. otherwise process the item
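The consumer loop above can be sketched as follows, again with a compact copy of the queue so it compiles on its own. The sentinel END_OF_PROCESSING = -1 and the ConsumerArgs struct are assumptions for illustration (a -1 sentinel only works if real items are never negative), and "processing" is reduced to summing the items.

```cpp
#include <pthread.h>
#include <cassert>
#include <deque>
#include <utility>

// Compact mutex+cond blocking queue (the produce / wait_and_get steps from
// the answer), repeated here so this block compiles on its own.
template <typename T>
class BlockingQueue {
public:
    BlockingQueue() { pthread_mutex_init(&m_, nullptr); pthread_cond_init(&c_, nullptr); }
    ~BlockingQueue() { pthread_cond_destroy(&c_); pthread_mutex_destroy(&m_); }
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;
    void produce(T item) {
        pthread_mutex_lock(&m_);
        items_.push_back(std::move(item));
        pthread_cond_signal(&c_);
        pthread_mutex_unlock(&m_);
    }
    T wait_and_get() {
        pthread_mutex_lock(&m_);
        while (items_.empty()) pthread_cond_wait(&c_, &m_);  // re-check after every wakeup
        T item = std::move(items_.front());
        items_.pop_front();
        pthread_mutex_unlock(&m_);
        return item;
    }
private:
    pthread_mutex_t m_;
    pthread_cond_t c_;
    std::deque<T> items_;
};

// Hypothetical sentinel: assumes real items are never negative.
const int END_OF_PROCESSING = -1;

// Hypothetical consumer state; "processing" an item here just sums it.
struct ConsumerArgs {
    BlockingQueue<int>* queue;
    long long sum;
    int processed;
};

void* consumer(void* arg) {
    ConsumerArgs* a = static_cast<ConsumerArgs*>(arg);
    assert(a != nullptr && a->queue != nullptr);
    for (;;) {                                 // 1. infinite loop
        int item = a->queue->wait_and_get();   //    get next item (blocks while empty)
        if (item == END_OF_PROCESSING) break;  //    special item: leave the loop
        a->sum += item;                        //    otherwise process the item
        ++a->processed;
    }
    return nullptr;
}
```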

    Main thread

    1. Start all threads, the producers and the consumer, in any order.
    2. Wait for all producer threads to finish (pthread_join() them).

    Remember that producers finish and quit by themselves after some time without external stimuli. When you have finished joining all producers, every producer has quit, so no one will call the produce(item) operation of the queue again. However, the queue may still contain unprocessed items, and the consumer may still be working through them.

    3. Put the last special "end of processing" item into the queue for the consumer.

    When the consumer finishes processing the last item produced by the producers, it will still ask the queue for the next item with wait_and_get(). This may result in a deadlock, because the consumer waits for a next item that never arrives. To handle this, the main thread puts a last special item into the queue that signals the end of processing to the consumer. Remember that our consumer implementation checks for this special item to find out when to finish processing. It is important that the main thread puts this special item into the queue only after the producers have finished (after joining them)! Otherwise, items produced after the marker would never be processed.

    If you have multiple consumers, it's easier to put multiple special "end of processing" items into the queue (1 for each consumer) than to make the queue smart enough to handle multiple consumers with only 1 "end of processing" item. Since the main thread orchestrates the whole thing (thread creation, thread joining, etc.), it knows exactly how many consumers there are, so it's easy to put the same number of "end of processing" items into the queue.

    4. Wait for the consumer thread to terminate by joining it.

    After putting the end-of-processing special item into the queue, we wait for the consumer thread to process the remaining items (produced by the producers) along with our last special item (produced by the main "coordinator" thread) that asks the consumer to finish. We do this waiting on the main thread by pthread_join()-ing the consumer thread.
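Applied to the program in the question, the four main-thread steps might fit together as sketched below. This is an illustration under assumptions, not the answer author's code: the Chunk item type (a writer id plus the values it wrote, or an end marker), WriterArgs, ReaderArgs and run_pipeline() are hypothetical names, and run_pipeline() stands in for main() so the steps can be called and checked. Each writer fills its chunk in a local vector, so no lock is needed while generating the values, and the mutex is taken only for the single produce() call.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstdio>
#include <deque>
#include <random>
#include <utility>
#include <vector>

// Compact mutex+cond blocking queue (the produce / wait_and_get steps from
// the answer), repeated here so this block compiles on its own.
template <typename T>
class BlockingQueue {
public:
    BlockingQueue() { pthread_mutex_init(&m_, nullptr); pthread_cond_init(&c_, nullptr); }
    ~BlockingQueue() { pthread_cond_destroy(&c_); pthread_mutex_destroy(&m_); }
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;
    void produce(T item) {
        pthread_mutex_lock(&m_);
        items_.push_back(std::move(item));
        pthread_cond_signal(&c_);
        pthread_mutex_unlock(&m_);
    }
    T wait_and_get() {
        pthread_mutex_lock(&m_);
        while (items_.empty()) pthread_cond_wait(&c_, &m_);  // re-check after every wakeup
        T item = std::move(items_.front());
        items_.pop_front();
        pthread_mutex_unlock(&m_);
        return item;
    }
private:
    pthread_mutex_t m_;
    pthread_cond_t c_;
    std::deque<T> items_;
};

// Hypothetical item: one chunk written by one writer, or the end marker.
struct Chunk {
    int writer = -1;
    std::vector<int> values;
    bool end = false;  // true only for the end-of-processing item
};
struct WriterArgs { BlockingQueue<Chunk>* queue; int id; int chunks; };
struct ReaderArgs { BlockingQueue<Chunk>* queue; long total_values; };

void* writer(void* arg) {
    WriterArgs* a = static_cast<WriterArgs*>(arg);
    std::mt19937 rng(static_cast<unsigned>(a->id));  // per-thread RNG instead of rand()
    std::uniform_int_distribution<int> chunk_size(1, 5), digit(0, 9);
    for (int c = 0; c < a->chunks; ++c) {
        Chunk chunk;
        chunk.writer = a->id;
        int n = chunk_size(rng);
        for (int i = 0; i < n; ++i) chunk.values.push_back(digit(rng));  // local data, no lock
        a->queue->produce(std::move(chunk));  // mutex held only for this push
    }
    return nullptr;
}

void* reader(void* arg) {
    ReaderArgs* a = static_cast<ReaderArgs*>(arg);
    for (;;) {
        Chunk chunk = a->queue->wait_and_get();
        if (chunk.end) break;  // end-of-processing marker from the main thread
        std::printf("writer %d wrote %zu values\n", chunk.writer, chunk.values.size());
        a->total_values += static_cast<long>(chunk.values.size());
    }
    return nullptr;
}

// Main-thread steps 1-4; in a real program this body would live in main().
long run_pipeline(int num_writers, int chunks_per_writer) {
    assert(num_writers > 0 && chunks_per_writer >= 0);
    BlockingQueue<Chunk> queue;
    std::vector<pthread_t> writers(num_writers);
    std::vector<WriterArgs> wargs(num_writers);
    ReaderArgs rargs{&queue, 0};
    pthread_t reader_thread;
    pthread_create(&reader_thread, nullptr, reader, &rargs);  // 1. start all threads
    for (int i = 0; i < num_writers; ++i) {
        wargs[i] = WriterArgs{&queue, i, chunks_per_writer};
        pthread_create(&writers[i], nullptr, writer, &wargs[i]);
    }
    for (pthread_t& t : writers) pthread_join(t, nullptr);    // 2. join the producers
    Chunk end_marker;
    end_marker.end = true;
    queue.produce(std::move(end_marker));                     // 3. only now add the end marker
    pthread_join(reader_thread, nullptr);                     // 4. join the consumer
    return rargs.total_values;
}
```

In this sketch the data travels inside the queue items, so the shared map_vec and how_much_and_where containers from the question are no longer needed, and the reader's loop condition becomes simply "until the end marker arrives".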

    Additional notes:

      • In my threaded system implementations, the items of the blocking queue are usually pointers: pointers to "job" objects that have to be executed/processed. (You can implement the blocking queue as a template class; in that case the user of the blocking queue can specify the type of the item.) In my case it is easy to put a special "end of processing" item into the queue for the consumer: I usually use a simple NULL job pointer for this purpose. In your case you will have to find out what kind of special value you can use in the queue to signal the end of processing to the consumer.
      • The producers may have their own queues and a whole bunch of other data structures with which they play around to "produce items" but the consumer doesn't care about those data structures. The consumer cares only about individual items received through its own blocking queue. If a producer wants something from the consumer then it has to send an item (a "job") to the consumer through the queue. The blocking queue instance belongs to the consumer thread - it provides a one-way communication channel between an arbitrary thread and the consumer thread. Even the consumer thread itself can put an item into its own queue (in some cases this is useful).
      • The pthread_cond_wait documentation says that this function can wake up without actually being signaled (although I've never seen a single bug caused by a spurious wakeup of this function in my life). To guard against this, the "if container is empty: pthread_cond_wait" part of wait_and_get() should be replaced with "while container is empty: pthread_cond_wait". But again, this spurious wakeup thing is probably a Loch Ness monster that is present only on some architectures with specific Linux implementations of threading primitives, so your code would probably work on desktop machines without caring about this problem. With multiple consumers, however, the while loop is needed anyway, because another consumer may take the item between the signal and the wakeup.
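The pointer-based variant from the first note, combined with the one-marker-per-consumer approach described for multiple consumers, might look like the sketch below. The Job and AddJob types, run_jobs() and the ownership rule (the consumer deletes each job after running it) are assumptions made for illustration; nullptr plays the role of the NULL end-of-processing job pointer, and the queue's wait uses the while loop from the last note.

```cpp
#include <pthread.h>
#include <atomic>
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

// Compact mutex+cond blocking queue (the produce / wait_and_get steps from
// the answer), repeated here so this block compiles on its own.
template <typename T>
class BlockingQueue {
public:
    BlockingQueue() { pthread_mutex_init(&m_, nullptr); pthread_cond_init(&c_, nullptr); }
    ~BlockingQueue() { pthread_cond_destroy(&c_); pthread_mutex_destroy(&m_); }
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;
    void produce(T item) {
        pthread_mutex_lock(&m_);
        items_.push_back(std::move(item));
        pthread_cond_signal(&c_);
        pthread_mutex_unlock(&m_);
    }
    T wait_and_get() {
        pthread_mutex_lock(&m_);
        while (items_.empty()) pthread_cond_wait(&c_, &m_);  // re-check after every wakeup
        T item = std::move(items_.front());
        items_.pop_front();
        pthread_mutex_unlock(&m_);
        return item;
    }
private:
    pthread_mutex_t m_;
    pthread_cond_t c_;
    std::deque<T> items_;
};

// Hypothetical job interface: queue items are Job pointers, nullptr ends processing.
struct Job {
    virtual ~Job() = default;
    virtual void run() = 0;
};

// Hypothetical example job: adds a number to a shared atomic counter.
struct AddJob : Job {
    AddJob(std::atomic<long>* c, long n) : counter(c), amount(n) {}
    void run() override { counter->fetch_add(amount); }
    std::atomic<long>* counter;
    long amount;
};

void* consumer(void* arg) {
    auto* queue = static_cast<BlockingQueue<Job*>*>(arg);
    for (;;) {
        Job* job = queue->wait_and_get();
        if (job == nullptr) break;  // this consumer's end-of-processing marker
        job->run();
        delete job;                 // sketch assumption: the consumer owns each job
    }
    return nullptr;
}

// Runs num_jobs AddJobs (amounts 1..num_jobs) on num_consumers threads.
long run_jobs(int num_consumers, int num_jobs) {
    assert(num_consumers > 0 && num_jobs >= 0);
    BlockingQueue<Job*> queue;
    std::atomic<long> counter(0);
    std::vector<pthread_t> consumers(num_consumers);
    for (pthread_t& t : consumers) pthread_create(&t, nullptr, consumer, &queue);
    // The main thread acts as the only producer here.
    for (int i = 1; i <= num_jobs; ++i) queue.produce(new AddJob(&counter, i));
    // Production is finished: one end-of-processing marker per consumer.
    for (int i = 0; i < num_consumers; ++i) queue.produce(nullptr);
    for (pthread_t& t : consumers) pthread_join(t, nullptr);
    return counter.load();
}
```

Because the queue is FIFO and every marker is added after the last job, all jobs are dequeued before any consumer can see its nullptr.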