我正在开发一个会产生数十Gbit/s吞吐量实时数据流的应用程序。我不需要任何响应,因此使用UDP,并将数据包发送到多播地址。我的每台机器(CentOS 7)都有2个10 Gbit/s的网络端口。
当我尝试同时通过两个端口发送数据时遇到了问题。我原本期望能达到略低于20 Gbit/s的吞吐量,但实际上只得到了11-12 Gbit/s。如果仅使用1个端口,则可以达到约9.5 Gbit/s的速度。
我使用select()和非阻塞套接字。这是可执行的演示:
#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string>
#include <stdio.h>
#include <cerrno>
#include <cstring>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>
#include <ctime>
#include <sys/time.h>
#include <sys/fcntl.h>
// #include <sys/resource.h>
// UDP destination port used for the multicast destination address.
#define GROUP_PORT 3490
// Multicast group address every datagram is sent to.
#define GROUP_ADDR "225.0.0.37"
// Number of sockets main() opens; each one is tied to one local interface.
#define INTERFACES 2
// Local interface addresses passed to IP_MULTICAST_IF for socket 0 and socket 1.
#define LOCAL_INTERFACE_IP0 "192.168.2.3"
#define LOCAL_INTERFACE_IP1 "192.168.2.4"
/**
 * Returns the time elapsed from t1 to t2 in whole microseconds.
 *
 * The interval is first combined into a single nanosecond count and only then
 * divided, so the result is rounded consistently. Scaling the seconds and the
 * nanoseconds separately over-reports by one microsecond whenever
 * t2.tv_nsec < t1.tv_nsec, because the negative nanosecond term is truncated
 * toward zero.
 *
 * @param t1 start of the interval
 * @param t2 end of the interval
 * @return (t2 - t1) in microseconds, truncated toward zero
 */
inline long duration_mcs(timespec t1, timespec t2) {
    const long long total_ns =
        static_cast<long long>(t2.tv_sec - t1.tv_sec) * 1000000000LL +
        static_cast<long long>(t2.tv_nsec - t1.tv_nsec);
    return static_cast<long>(total_ns / 1000);
}
int main(int argc, char* argv[])
{
// if (setpriority(PRIO_PROCESS, 0, -15) == -1) {
// printf("PRIO failed: %s.\n", std::strerror(errno));
// return -1;
// }
//bind thread to specific core
cpu_set_t set;
CPU_ZERO(&set); //clear cpu set
int cpuId = 5;
CPU_SET(cpuId, &set); //dedicate cpu for current thread (add cpuId to set)
//bind current thread (pId=0) to dedicated cpu
if (sched_setaffinity(0, sizeof(set), &set) == -1) {
printf("sched_setaffinity failed: %s.\n", std::strerror(errno));
return -1;
}
// SETUP INTERFACES ADDRESSES ----------------------------------
// -------------------------------------------------------------
in_addr localInterface[INTERFACES];
localInterface[0].s_addr = inet_addr(LOCAL_INTERFACE_IP0);
localInterface[1].s_addr = inet_addr(LOCAL_INTERFACE_IP1);
// SETUP SOCKETS -----------------------------------------------
// -------------------------------------------------------------
int fdmax = 0;
int fds[INTERFACES];
int flags;
for (int i=0; i<INTERFACES; ++i) {
fds[i] = socket(AF_INET, SOCK_DGRAM, 0);
if (fds[i] == -1) {
printf("Socket %d failed: %s.\n", i, std::strerror(errno));
return -1;
}
//make sockets NONBLOCK
if ((flags = fcntl(fds[i], F_GETFL, 0)) < 0)
{
printf("F_GETFL on socket %d failed: %s.\n", i, std::strerror(errno));
}
if (fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)
{
printf("O_NONBLOCK on socket %d failed: %s.\n", i, std::strerror(errno));
}
if (fds[i] > fdmax) fdmax = fds[i];
printf("Socket %d success.\n", i);
}
// SETUP SOCKET OPTIONS ----------------------------------------
// -------------------------------------------------------------
// send packets through particular interface
for (int i=0; i<INTERFACES; ++i) {
if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_IF, (char*) &localInterface[i], sizeof(localInterface[i])) == -1) {
printf("IP_MULTICAST_IF on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
return -1;
}
}
// disable multicast loop
char loopch=0;
for (int i=0; i<INTERFACES; ++i) {
if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_LOOP, (char*) &loopch, sizeof(loopch)) == -1) {
printf("IP_MULTICAST_LOOP on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
return -1;
}
}
// SETUP ADDRESS STRUCTURE FOR SENDING PACKETS TO --------------
// -------------------------------------------------------------
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(GROUP_PORT);
address.sin_addr.s_addr = inet_addr(GROUP_ADDR);
// SETUP DATA BUFFER -------------------------------------------
// -------------------------------------------------------------
size_t buf_size = 50000;
char* buffer = (char*) memalign(256, buf_size);
// SETUP SELECT() STRUCTURES -----------------------------------
// -------------------------------------------------------------
fd_set master, writefds;
FD_ZERO(&master);
FD_ZERO(&writefds);
for (int i=0; i<INTERFACES; ++i) {
FD_SET(fds[i], &master);
}
// SENDING PACKETS ---------------------------------------------
// -------------------------------------------------------------
size_t packets = 10000; //number of packets to send
size_t nbytes = 0;
int snt;
bool pckt_flag = false; //flag for all packets are sent
size_t cnt[INTERFACES]; //counter for sent packets per each interface
for (int ifs=0; ifs<INTERFACES; ++ifs) cnt[ifs] = 0;
timespec t1, t2;
timespec t1_sel, t2_sel;
timespec t1_proc, t2_proc;
timespec t1_snd, t2_snd;
long tsum_sel = 0, tsum_proc = 0, tsum_snd = 0;
clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
while (!pckt_flag) {
writefds = master;
clock_gettime(CLOCK_MONOTONIC_RAW, &t1_sel);
if (select(fdmax+1, NULL, &writefds, NULL, NULL) == -1) {
printf("select() failed: %s.\n", std::strerror(errno));
return -1;
}
clock_gettime(CLOCK_MONOTONIC_RAW, &t2_sel);
tsum_sel += duration_mcs(t1_sel, t2_sel);
clock_gettime(CLOCK_MONOTONIC_RAW, &t1_proc);
for (int ifs=0; ifs<INTERFACES; ++ifs) {
if (FD_ISSET(fds[ifs], &writefds)) {
//check for how many packets were sent over the interface
if (cnt[ifs] < packets) {
clock_gettime(CLOCK_MONOTONIC_RAW, &t1_snd);
snt = sendto(fds[ifs], buffer, buf_size, 0, (sockaddr*) &address, sizeof(address));
clock_gettime(CLOCK_MONOTONIC_RAW, &t2_snd);
tsum_snd += duration_mcs(t1_snd, t2_snd);
if (snt < buf_size) {
printf("Sending error: sent %d of %d bytes\n", snt, buf_size);
} else {
nbytes += snt;
++cnt[ifs];
}
}
}
}
//renew flag
pckt_flag = true;
for (int ifs=0; ifs<INTERFACES; ++ifs) {
pckt_flag = (pckt_flag && (cnt[ifs] == packets));
}
clock_gettime(CLOCK_MONOTONIC_RAW, &t2_proc);
tsum_proc += duration_mcs(t1_proc, t2_proc);
}
clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
size_t traf_tot_bytes = nbytes;
double duration_sec = (double) duration_mcs(t1, t2)/1000000;
printf("Time %f s.\n", duration_sec);
printf("Total bytes sent %d.\n", traf_tot_bytes);
printf("Total throughput %f Gbit/s.\n", 8*(traf_tot_bytes/duration_sec)/1000000000);
printf("Packets sent by interfaces %d/%d\n", cnt[0], cnt[1]);
printf("tsum_sel = %d\n", tsum_sel);
printf("tsum_proc = %d\n", tsum_proc);
printf("tsum_snd = %d\n", tsum_snd);
free(buffer);
return 0;
}
在此演示中,我插入了计时器,分别统计等待 select() 的总时间(tsum_sel)、数据包处理的总时间(tsum_proc)以及 sendto() 本身耗费的总时间(tsum_snd),单位均为微秒。
在我的系统上,INTERFACES = 1 时的输出:
Socket 0 success.
Time 0.429122 s.
Total bytes sent 500000000.
Total throughput 9.321358 Gbit/s.
Packets sent by interfaces 10000/0
tsum_sel = 51086
tsum_proc = 362756
tsum_snd = 358939
INTERFACES = 2 时的输出:
Socket 0 success.
Socket 1 success.
Time 0.697962 s.
Total bytes sent 1000000000.
Total throughput 11.461942 Gbit/s.
Packets sent by interfaces 10000/10000
tsum_sel = 2383
tsum_proc = 662971
tsum_snd = 652629
我看到 sendto() 函数几乎消耗了所有时间。因此,看起来程序是先在第一个接口(interface)上发送数据包,等待 sendto() 返回,然后才在第二个接口(interface)上发送。正是为了避免这种情况,我才将套接字设为非阻塞,所以我不明白究竟发生了什么。
我的问题是:
1)为什么此代码不以20 Gbit/s的速率发送数据?
2)为什么非阻塞sendto()需要这么多时间?
3)如何在这里获得20 Gbit/s?
最佳答案
好吧,我得到了19 Gbit/s。
我创建了2个线程,每个线程独立发送数据。看起来很简单,但有一些细节需要注意。如果我将两个线程绑定(bind)到同一个虚拟核心,问题仍然存在,速度只有14-15 Gbit/s。只有将线程绑定(bind)到不同的虚拟核心时,它才能很好地工作,即使这些虚拟核心位于同一个物理核心上也是如此。这正是我所希望的:这样我可以留出1个物理核心用于系统维护和网络处理。
谢谢所有发表评论的人。
关于c++ - 无法同时在两个10Gbps接口(interface)上达到全线速,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48786450/