我正在开发一个应用程序,它会产生吞吐量达数十Gbit/s的实时数据流。我不需要任何响应,因此使用UDP,并将数据包发送到多播地址。我的每台系统(CentOS 7)都有2个10 Gbit/s的网络端口。

当我尝试同时通过两个端口发送数据时遇到了麻烦。我原本期望能得到略低于20 Gbit/s的吞吐量,但实际上只得到了11-12 Gbit/s。如果仅使用1个端口,则可以达到约9.5 Gbit/s的速度。

我使用select()和非阻塞套接字。这是可执行的演示:

#include <arpa/inet.h>
#include <malloc.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/fcntl.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
// #include <sys/resource.h>

#include <cerrno>
#include <cstring>
#include <ctime>
#include <string>



// Destination UDP port of the multicast group (converted with htons() before use).
#define GROUP_PORT 3490
// Destination IPv4 group address, in dotted-decimal form.
#define GROUP_ADDR "225.0.0.37"
// Number of local interfaces (one UDP socket is created per interface).
#define INTERFACES 2
// IPv4 addresses of the local interfaces the sockets send through
// (passed to the IP_MULTICAST_IF socket option).
#define LOCAL_INTERFACE_IP0 "192.168.2.3"
#define LOCAL_INTERFACE_IP1 "192.168.2.4"



// Returns the time elapsed from t1 to t2 in whole microseconds.
//
// The result is always rounded down (floored). The nanosecond borrow is
// resolved before dividing, so an interval that crosses a second boundary is
// rounded the same way as one that does not; this keeps sums of many short
// intervals unbiased.
//
// Both arguments are expected to be normalised (0 <= tv_nsec < 1e9), as
// returned by clock_gettime().
inline long duration_mcs(timespec t1, timespec t2) {
    long sec = t2.tv_sec - t1.tv_sec;
    long nsec = t2.tv_nsec - t1.tv_nsec;
    if (nsec < 0) {
        // Borrow one second so that the nanosecond part is non-negative.
        --sec;
        nsec += 1000000000L;
    }
    return sec * 1000000L + nsec / 1000;
}



int main(int argc, char* argv[])
{

    // if (setpriority(PRIO_PROCESS, 0, -15) == -1) {
    //     printf("PRIO failed: %s.\n", std::strerror(errno));
    //     return -1;
    // }

    //bind thread to specific core
    cpu_set_t set;
    CPU_ZERO(&set); //clear cpu set
    int cpuId = 5;
    CPU_SET(cpuId, &set); //dedicate cpu for current thread (add cpuId to set)
    //bind current thread (pId=0) to dedicated cpu
    if (sched_setaffinity(0, sizeof(set), &set) == -1) {
        printf("sched_setaffinity failed: %s.\n", std::strerror(errno));
        return -1;
    }


    // SETUP INTERFACES ADDRESSES ----------------------------------
    // -------------------------------------------------------------
    in_addr localInterface[INTERFACES];
    localInterface[0].s_addr = inet_addr(LOCAL_INTERFACE_IP0);
    localInterface[1].s_addr = inet_addr(LOCAL_INTERFACE_IP1);


    // SETUP SOCKETS -----------------------------------------------
    // -------------------------------------------------------------
    int fdmax = 0;
    int fds[INTERFACES];
    int flags;

    for (int i=0; i<INTERFACES; ++i) {
        fds[i] = socket(AF_INET, SOCK_DGRAM, 0);
        if (fds[i] == -1) {
            printf("Socket %d failed: %s.\n", i, std::strerror(errno));
            return -1;
        }

        //make sockets NONBLOCK
        if ((flags = fcntl(fds[i], F_GETFL, 0)) < 0)
        {
            printf("F_GETFL on socket %d failed: %s.\n", i, std::strerror(errno));
        }

        if (fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)
        {
            printf("O_NONBLOCK on socket %d failed: %s.\n", i, std::strerror(errno));
        }

        if (fds[i] > fdmax) fdmax = fds[i];
        printf("Socket %d success.\n", i);
    }



    // SETUP SOCKET OPTIONS ----------------------------------------
    // -------------------------------------------------------------
    // send packets through particular interface
    for (int i=0; i<INTERFACES; ++i) {
        if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_IF, (char*) &localInterface[i], sizeof(localInterface[i])) == -1) {
            printf("IP_MULTICAST_IF on interface %s failed: %s.\n", inet_ntoa(localInterface[i]),  std::strerror(errno));
            return -1;
        }
    }

    // disable multicast loop
    char loopch=0;
    for (int i=0; i<INTERFACES; ++i) {
        if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_LOOP, (char*) &loopch, sizeof(loopch)) == -1) {
            printf("IP_MULTICAST_LOOP on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
            return -1;
        }
    }


    // SETUP ADDRESS STRUCTURE FOR SENDING PACKETS TO --------------
    // -------------------------------------------------------------
    sockaddr_in address;
    address.sin_family = AF_INET;
    address.sin_port = htons(GROUP_PORT);
    address.sin_addr.s_addr = inet_addr(GROUP_ADDR);


    // SETUP DATA BUFFER -------------------------------------------
    // -------------------------------------------------------------
    size_t buf_size = 50000;
    char* buffer = (char*) memalign(256, buf_size);


    // SETUP SELECT() STRUCTURES -----------------------------------
    // -------------------------------------------------------------
    fd_set master, writefds;
    FD_ZERO(&master);
    FD_ZERO(&writefds);
    for (int i=0; i<INTERFACES; ++i) {
        FD_SET(fds[i], &master);
    }


    // SENDING PACKETS ---------------------------------------------
    // -------------------------------------------------------------
    size_t packets = 10000; //number of packets to send
    size_t nbytes = 0;
    int snt;

    bool pckt_flag = false; //flag for all packets are sent

    size_t cnt[INTERFACES]; //counter for sent packets per each interface
    for (int ifs=0; ifs<INTERFACES; ++ifs) cnt[ifs] = 0;

    timespec t1, t2;
    timespec t1_sel, t2_sel;
    timespec t1_proc, t2_proc;
    timespec t1_snd, t2_snd;
    long tsum_sel = 0, tsum_proc = 0, tsum_snd = 0;

    clock_gettime(CLOCK_MONOTONIC_RAW, &t1);

    while (!pckt_flag) {
        writefds = master;

        clock_gettime(CLOCK_MONOTONIC_RAW, &t1_sel);
        if (select(fdmax+1, NULL, &writefds, NULL, NULL) == -1) {
            printf("select() failed: %s.\n", std::strerror(errno));
            return -1;
        }
        clock_gettime(CLOCK_MONOTONIC_RAW, &t2_sel);
        tsum_sel += duration_mcs(t1_sel, t2_sel);

        clock_gettime(CLOCK_MONOTONIC_RAW, &t1_proc);
        for (int ifs=0; ifs<INTERFACES; ++ifs) {
            if (FD_ISSET(fds[ifs], &writefds)) {
                //check for how many packets were sent over the interface
                if (cnt[ifs] < packets) {

                    clock_gettime(CLOCK_MONOTONIC_RAW, &t1_snd);
                    snt = sendto(fds[ifs], buffer, buf_size, 0, (sockaddr*) &address, sizeof(address));
                    clock_gettime(CLOCK_MONOTONIC_RAW, &t2_snd);
                    tsum_snd += duration_mcs(t1_snd, t2_snd);

                    if (snt < buf_size) {
                        printf("Sending error: sent %d of %d bytes\n", snt, buf_size);
                    } else {
                        nbytes += snt;
                        ++cnt[ifs];
                    }
                }
            }
        }
        //renew flag
        pckt_flag = true;
        for (int ifs=0; ifs<INTERFACES; ++ifs) {
            pckt_flag = (pckt_flag && (cnt[ifs] == packets));
        }
        clock_gettime(CLOCK_MONOTONIC_RAW, &t2_proc);
        tsum_proc += duration_mcs(t1_proc, t2_proc);
    }
    clock_gettime(CLOCK_MONOTONIC_RAW, &t2);

    size_t traf_tot_bytes = nbytes;
    double duration_sec = (double) duration_mcs(t1, t2)/1000000;

    printf("Time %f s.\n", duration_sec);
    printf("Total bytes sent %d.\n", traf_tot_bytes);
    printf("Total throughput %f Gbit/s.\n", 8*(traf_tot_bytes/duration_sec)/1000000000);
    printf("Packets sent by interfaces %d/%d\n", cnt[0], cnt[1]);
    printf("tsum_sel = %d\n", tsum_sel);
    printf("tsum_proc = %d\n", tsum_proc);
    printf("tsum_snd = %d\n", tsum_snd);


    free(buffer);

    return 0;

}

在此演示中,我插入了计时器,分别累计等待 select() 的时间(tsum_sel)、数据包处理的时间(tsum_proc)以及 sendto() 本身所花费的时间(tsum_snd)。

在我的系统上,INTERFACES = 1 时的输出:
Socket 0 success.
Time 0.429122 s.
Total bytes sent 500000000.
Total throughput 9.321358 Gbit/s.
Packets sent by interfaces 10000/0
tsum_sel = 51086
tsum_proc = 362756
tsum_snd = 358939

INTERFACES = 2 时的输出:
Socket 0 success.
Socket 1 success.
Time 0.697962 s.
Total bytes sent 1000000000.
Total throughput 11.461942 Gbit/s.
Packets sent by interfaces 10000/10000
tsum_sel = 2383
tsum_proc = 662971
tsum_snd = 652629

我看到 sendto()函数几乎消耗了所有时间。因此,看起来我在第一个接口(interface)上发送数据包,等待sendto返回,然后发送到第二个接口(interface)。为了避免这种情况,我将套接字设为非阻塞。我不明白发生了什么。

我的问题是:

1)为什么此代码不以20 Gbit/s的速率发送数据?

2)为什么非阻塞sendto()需要这么多时间?

3)如何在这里获得20 Gbit/s?

最佳答案

好吧,我得到了19 Gbit/s。

我创建了2个线程,每个线程独立发送数据。看起来很简单,但有一些细节需要注意。如果我将两个线程绑定(bind)到同一个虚拟内核,问题依然存在,速度只有14-15 Gbit/s。只有将线程绑定(bind)到不同的虚拟内核时,它才能很好地工作,即使这些虚拟内核位于同一个物理内核上也是如此。这正是我所希望的:我可以使用1个物理内核进行系统维护和联网。

谢谢所有发表评论的人。

关于c++ - 无法同时在两个10Gbps接口(interface)上达到全线速,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48786450/

10-12 07:27