Linux select() and FIFO ordering of multiple sockets

Problem description

Is there any way for the Linux select() call to relay event ordering?

A description of what I'm seeing:

On one machine, I wrote a simple program which sends three multicast packets, one to each of three different multicast groups. These packets are sent back-to-back, with no delay in between. I.e. sendto(mcast_group1); sendto(mcast_group2); sendto(mcast_group3).
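The sender described here can be sketched roughly as follows. This is not the original poster's program: the function names make_group_addr and send_burst, the port, and the payload are hypothetical, and the caller is assumed to have created the UDP socket already.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Fill `out` with the IPv4 destination `group`:`port`.
 * Returns 0 on success, -1 if `group` is not a dotted-quad address. */
int make_group_addr(const char *group, unsigned short port,
                    struct sockaddr_in *out)
{
    assert(group != NULL && out != NULL);
    memset(out, 0, sizeof(*out));
    out->sin_family = AF_INET;
    out->sin_port = htons(port);
    return inet_pton(AF_INET, group, &out->sin_addr) == 1 ? 0 : -1;
}

/* Send one datagram to each group, back-to-back, on an existing UDP socket.
 * Returns the number of datagrams sent, or -1 on the first failure. */
int send_burst(int sock, const char *const *groups, size_t ngroups,
               unsigned short port, const void *payload, size_t len)
{
    size_t i;

    assert(groups != NULL || ngroups == 0);
    for (i = 0; i < ngroups; i++) {
        struct sockaddr_in dst;

        if (make_group_addr(groups[i], port, &dst) != 0)
            return -1;
        if (sendto(sock, payload, len, 0,
                   (struct sockaddr *)&dst, sizeof(dst)) < 0)
            return -1;
    }
    return (int)ngroups;
}
```

Calling send_burst() with three group addresses issues the three sendto() calls with nothing in between, which is the back-to-back pattern that exposes the ordering question on the receiver.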

On the other machine, I have a receiving program. The program uses one socket per multicast group. Each socket does a bind() and IP_ADD_MEMBERSHIP (i.e. join/subscribe) to the address to which it listens. The program then does a select() on the three sockets.

When select returns, all three sockets are available for reading. But which one came first? The ready-for-reading list of sockets is a set, and therefore has no order. What I would like is if select() returned exactly once per received packet, in order (the increased overhead is acceptable here). Or, is there some other kind of mechanism I can use to determine packet receive order?

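Additional information:

  • The OS is CentOS 5 (effectively Red Hat Enterprise Linux) on x86_64
  • The NIC hardware is an Intel 82571EB
  • I have tried e1000e driver versions 1.3.10-k2 and 2.1.4-NAPI
  • I have tried pinning the NIC's interrupt to an unloaded and isolated CPU core
  • I have disabled hardware IRQ coalescing by setting the driver option InterruptThrottleRate=0 and by setting rx-usecs=0 via ethtool
  • I have also tried using epoll, which shows the same behavior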

A final remark: packet ordering is preserved if I only use one socket. In this case, I bind to INADDR_ANY (0.0.0.0) and do the IP_ADD_MEMBERSHIP multiple times on the same socket. But this does not work for our application, because we need the filtering provided by binding to the actual multicast address. Ultimately, there will be multiple multicast receiving programs on the same machine, with subscription sets that may intersect with each other. So maybe an alternate solution is to find another way to achieve the filtering effect of bind(), but without bind().

Recommended answer

You can use IP_PKTINFO to get the address of the multicast group the packet was sent to, even if the socket is subscribed to several multicast groups. With this in place, you get the packets in order and can still filter by group address. See the example below:

#define _GNU_SOURCE /* older glibc only exposes struct in_pktinfo with this */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <ctype.h>
#include <errno.h>

#define PORT 1234
#define PPANIC(msg) do { perror(msg); exit(1); } while (0)
#define STATS_PATCH 0

int main(int argc, char **argv)
{
    fd_set master;
    fd_set read_fds;
    struct sockaddr_in serveraddr;
    int sock;
    int opt = 1;
    size_t i;
    int rc;

    char *mcast_groups[] = {
        "226.0.0.1",
        "226.0.0.2",
        NULL
    };
#if STATS_PATCH
    struct stat stat_buf;
#endif

    struct ip_mreq imreq;

    FD_ZERO(&master);
    FD_ZERO(&read_fds);

    rc = sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if(rc == -1)
    {
        PPANIC("socket() failed");
    }

    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    if(rc == -1)
    {
        PPANIC("setsockopt(reuse) failed");
    }

    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(PORT);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    rc = bind(sock, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    if(rc == -1)
    {
        PPANIC("bind() failed");
    }

    rc = setsockopt(sock, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt));
    if(rc == -1)
    {
        PPANIC("setsockopt(IP_PKTINFO) failed");
    }

    for (i = 0; mcast_groups[i] != NULL; i++)
    {
        imreq.imr_multiaddr.s_addr = inet_addr(mcast_groups[i]);
        imreq.imr_interface.s_addr = INADDR_ANY;
        rc = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&imreq, sizeof(struct ip_mreq));
        if (rc != 0)
        {
            PPANIC("joining mcast group failed");
        }
    }

    FD_SET(sock, &master);

    while(1)
    {
        read_fds = master;
        rc = select(sock + 1, &read_fds, NULL, NULL, NULL);

        if (rc == 0)
        {
            continue;
        }

        if(rc == -1)
        {
            PPANIC("select() failed");
        }

        if(FD_ISSET(sock, &read_fds))
        {
            char buf[1024];
            int inb;
            char ctrl_msg_buf[1024];
            struct iovec iov[1];
            iov[0].iov_base = buf;
            iov[0].iov_len = 1024;
            struct msghdr msg_hdr = {
                .msg_iov = iov,
                .msg_iovlen = 1,
                .msg_name = NULL,
                .msg_namelen = 0,
                .msg_control = ctrl_msg_buf,
                .msg_controllen = sizeof(ctrl_msg_buf),
            };
            struct cmsghdr *ctrl_msg_hdr;

            inb = recvmsg(sock, &msg_hdr, 0);
            if (inb < 0)
            {
                PPANIC("recvmsg() failed");
            }

            for (ctrl_msg_hdr = CMSG_FIRSTHDR(&msg_hdr); ctrl_msg_hdr != NULL; ctrl_msg_hdr = CMSG_NXTHDR(&msg_hdr, ctrl_msg_hdr))
            {
                if (ctrl_msg_hdr->cmsg_level == IPPROTO_IP && ctrl_msg_hdr->cmsg_type == IP_PKTINFO)
                {
                    struct in_pktinfo *pckt_info = (struct in_pktinfo *)CMSG_DATA(ctrl_msg_hdr);
                    printf("got data for mcast group: %s\n", inet_ntoa(pckt_info->ipi_addr));
                    break;
                }
            }

            printf("|");
            for (i = 0; i < (size_t)inb; i++)
                printf("%c", isprint((unsigned char)buf[i]) ? buf[i] : '?');
            printf("|\n");
#if STATS_PATCH
            rc = fstat(sock, &stat_buf);
            if (rc == -1)
            {
                perror("fstat() failed");
            } else {
                printf("st_atime: %ld\n", (long)stat_buf.st_atime);
                printf("st_mtime: %ld\n", (long)stat_buf.st_mtime);
                printf("st_ctime: %ld\n", (long)stat_buf.st_ctime);
            }
#endif
        }
    }

    return 0;
}
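
The IP_PKTINFO example only prints the destination group. The filtering step that the answer mentions could look roughly like the sketch below. The helper name group_is_wanted and the NULL-terminated list convention are assumptions chosen to match the mcast_groups array, not part of the original answer.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stddef.h>

/* Return 1 if `dst` matches one of the NULL-terminated `wanted` groups,
 * 0 otherwise. Entries that fail to parse are skipped. */
int group_is_wanted(struct in_addr dst, const char *const *wanted)
{
    size_t i;

    assert(wanted != NULL);
    for (i = 0; wanted[i] != NULL; i++) {
        struct in_addr g;

        if (inet_pton(AF_INET, wanted[i], &g) == 1 &&
            g.s_addr == dst.s_addr)
            return 1;
    }
    return 0;
}
```

Inside the control-message loop, a receiver would pass pckt_info->ipi_addr to this function and drop the datagram when it returns 0. That approximates the filtering effect of bind() on a socket that is bound to INADDR_ANY.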

The IP_PKTINFO example code does not solve the OP's problem, but it may guide people dealing with similar requirements.

(EDIT) One should not do such things late at night... Even with that solution you only get the order in which select handled the fds, and this gives you no indication of when the frames arrived.

As stated here, it is currently not possible to retrieve the order of the sockets or the timestamps at which they changed, because the required callback is not set for socket inodes. But if you are able to patch your kernel, you may work around the problem by setting the time within the select system call.

The following patch may give you an idea:

diff --git a/fs/select.c b/fs/select.c
index 467bb1c..3f2927e 100644
--- a/fs/select.c
+++ b/fs/select.c
@@ -435,6 +435,9 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
+           struct timeval tv;
+
+           do_gettimeofday(&tv);

            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;
@@ -452,6 +455,16 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
                f = fdget(i);
                if (f.file) {
                    const struct file_operations *f_op;
+                   struct kstat stat;
+
+                   int ret;
+                   u8 is_sock = 0;
+
+                   ret = vfs_getattr(&f.file->f_path, &stat);
+                   if(ret == 0 && S_ISSOCK(stat.mode)) {
+                       is_sock = 1;
+                   }
+
                    f_op = f.file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op->poll) {
@@ -464,16 +477,22 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
                        res_in |= bit;
                        retval++;
                        wait->_qproc = NULL;
+                       if(is_sock && f.file->f_inode)
+                           f.file->f_inode->i_ctime.tv_sec = tv.tv_sec;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                        wait->_qproc = NULL;
+                       if(is_sock && f.file->f_inode)
+                           f.file->f_inode->i_ctime.tv_sec = tv.tv_sec;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                        wait->_qproc = NULL;
+                       if(is_sock && f.file->f_inode)
+                           f.file->f_inode->i_ctime.tv_sec = tv.tv_sec;
                    }
                    /* got something, stop busy polling */
                    if (retval) {

Notes:

  1. This is... just for you :) Don't expect it in the mainline.

  2. do_gettimeofday() is called before each relevant fd is tested. To get higher granularity, this should be done in each iteration (and only if needed). Since the stat interface only offers a granularity of one second, you may (!UGLY!) use the remaining time attributes to map the fractions of a second to those fields.

  3. This was done using kernel 3.16.0 and is not well tested. Don't use it in a spaceship or medical equipment. If you would like to try it, get a filesystem image (e.g. https://people.debian.org/~aurel32/qemu/amd64/debian_wheezy_amd64_standard.qcow2) and use qemu to test it:

sudo qemu-system-x86_64 -kernel arch/x86/boot/bzImage -hda debian_wheezy_amd64_standard.qcow2 -append "root=/dev/sda1"
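
On a kernel carrying the patch, the user-space side might order the ready sockets by the st_ctime that select stamped on them. The sketch below is hypothetical: struct ready_sock, stamp_ready and sort_ready are illustrative names, and it assumes fstat() on a socket reports the patched i_ctime. The stamp reflects when select tested the fd, not when the frame arrived, and at one-second granularity ties will be common, so they are broken by fd number only to keep the result deterministic.

```c
#include <assert.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>

/* One ready socket plus the time the patched select() stamped on it. */
struct ready_sock {
    int fd;
    time_t stamped;   /* st_ctime as reported by fstat() */
};

/* qsort comparator: oldest stamp first, ties broken by fd. */
static int by_stamp(const void *a, const void *b)
{
    const struct ready_sock *x = a;
    const struct ready_sock *y = b;

    if (x->stamped != y->stamped)
        return x->stamped < y->stamped ? -1 : 1;
    return (x->fd > y->fd) - (x->fd < y->fd);
}

/* Fill in `stamped` for each entry via fstat().
 * Returns 0 on success, -1 if any fstat() call fails. */
int stamp_ready(struct ready_sock *socks, size_t n)
{
    size_t i;

    assert(socks != NULL || n == 0);
    for (i = 0; i < n; i++) {
        struct stat st;

        if (fstat(socks[i].fd, &st) == -1)
            return -1;
        socks[i].stamped = st.st_ctime;
    }
    return 0;
}

/* Sort entries so the socket with the oldest stamp comes first. */
void sort_ready(struct ready_sock *socks, size_t n)
{
    assert(socks != NULL || n == 0);
    qsort(socks, n, sizeof(*socks), by_stamp);
}
```

After select() returns, a receiver would collect the fds that are set, call stamp_ready() and then sort_ready(), and read from the sockets in the resulting order.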

08-04 22:08