我是套接字编程的新手,此刻我面临着一个我无法解决的问题。我从多个来源了解到C++标准模板(STL)容器是而不是线程安全的,因此,作为程序员,必须采用一种机制来确保多个线程不会同时修改容器的数据。

例如Thread safety std::vector push_back and reserve

我使用了std::mutex类来确保在对threads进行编程时,没有人同时在同一容器中写入数据。但是,当我使用sockets时,这对我不起作用。

假设我有4个客户端,每个客户端按以下顺序将数据(int)发送到服务器:

client_0: 4
client_1: 8
client_2: 5
client_4: 7

对于简单的服务器,请遵循以下代码:
#define PORT 60000

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <vector>
#include <string>
#include <iostream>
#include <mutex>

using namespace std;

vector<int> inputQueue; //<--------!
mutex mtx; //<---------------------!

void printVector(vector<int> input) {
    cout << "inputQueue: [";
    for (unsigned int i = 0; i < input.size(); i++ ) {
        if (i != input.size() - 1)
            cout << input[i] << ", ";
        else
            cout << input[i];
    }
    cout << "]." << endl;
}

int main(int argc, char const *argv[])
{
    int server_fd, client_fd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons( PORT );
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address))<0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    if (listen(server_fd, 10) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }
    while(1) {
        char buffer[4];
        if ((client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))<0) {
            perror("accept");
            exit(EXIT_FAILURE);
        }
        if (!fork()) {
            recv(client_fd, buffer, 4, MSG_WAITALL);
            int receivedInt = int(
                    (unsigned char)(buffer[0]) << 24 |
                    (unsigned char)(buffer[1]) << 16 |
                    (unsigned char)(buffer[2]) << 8 |
                    (unsigned char)(buffer[3])
            );
            mtx.lock(); //<-------------------------------------!
            inputQueue.push_back(receivedInt); //<--------------!
            cout << "Client context. Integer registered: " << receivedInt << ": inputQueue length is " << inputQueue.size() << endl;
            printVector(inputQueue); //<------------------------!
            mtx.unlock(); //<-----------------------------------!
            close(server_fd); close(client_fd);
        }
        cout << "Server context: inputQueue length is " << inputQueue.size() << endl;
        printVector(inputQueue);
    }
    return 0;
}

服务器必须接收数据以确保它们以相同的顺序进行接收,并使用std::vector<int> inputQueue方法将它们各自的数据注册在一个整数向量中,即push_back(),以便inputQueue = {4, 8, 5, 7}在接收到所有数据后通过客户。

我必须澄清inputQueue是一个全局变量,在开始执行服务器时,它不包含元素,但是会在客户端注册时添加它们。

问题是没有一个客户端在inputQueue中注册任何元素。请注意,在以下代码中,根据放置cout <<指令的位置,您可以看到inputQueue的大小不同。这表明在客户端的上下文中,每个客户端都覆盖inputQueue的第一个元素,但是在客户端之外,没有一个客户端能够在inputQueue中注册单个元素。

显然,每个套接字都有其自己的inputQueue副本,因此当销毁它时,修改后的inputQueue副本也将被销毁。

输出如下:
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 4: inputQueue length is 1
inputQueue: [4].
Server context: inputQueue length is 1
inputQueue: [4].
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 8: inputQueue length is 1
inputQueue: [8].
Server context: inputQueue length is 0
inputQueue: [].
Server context: inputQueue length is 1
inputQueue: [8].
Client context. Integer registered: 5: inputQueue length is 1
inputQueue: [5].
Server context: inputQueue length is 1
inputQueue: [5].
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 7: inputQueue length is 1
inputQueue: [7].
Server context: inputQueue length is 1
inputQueue: [7].

有谁知道为什么会这样以及如何解决呢?我希望你能帮助我。谢谢

最佳答案

if (!fork()) {
fork()使用其自己的虚拟内存地址空间创建一个全新的独立进程。显然,所示的代码期望子进程和原始进程都通过同一个对象(即由互斥锁锁定的向量)进行交互。

那不是事实。您现在有两个完全独立的过程。这与两次或同时连续运行您的程序没有什么不同。您是否希望程序的两个运行副本都以某种方式共享相同的向量和互斥锁?当然不是。

相反,您要做的是在同一进程中使用std::thread创建一个新的执行线程。您的C++书籍应包含更多有关如何使用std::thread创建新执行线程的信息。

此外,即使您用类似的执行线程替换fork():仍然不能解决这里的所有问题。您还需要在多个执行线程之间正确处理同步。具体来说:不能保证在另一个执行线程尝试对其内容进行printVector之前,新的执行线程会将某些内容插入向量中。在原始执行线程输入printVector之前,新的执行线程可以设法做到这一点。否则可能不会,并且printVector找到一个完全为空的向量,因为另一个执行线程还没有设法足够快地将某些内容插入其中。现在,您有两个完全独立的执行线程同时运行,并且您无法保证哪个线程首先执行操作。

每次运行所示程序的多线程版本时,您甚至可以获得不同的结果(而且您可能会这样做)。

当您准备开始解决这个新问题时,您的C++书将解释如何使用条件变量和互斥锁来正确实现多线程同步。不幸的是,这不是在stackoverflow.com上的简短回答中可以完全涵盖的主题,但是在C++书中应该有几个专门的章节,您可以在其中找到更多信息。

P.S.您的输出在输入队列中显示任何内容的唯一原因是,没有什么可以阻止子进程在退出if语句并最终调用printVector时继续执行该程序。它不是来自父进程。每个子进程最终都会打印自己插入其自己的向量中的值。

关于linux - 套接字传入连接不能将push_back元素并发到全局定义的std::vector,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56676515/

10-13 08:42