本文档的Copyleft归wwwlkk所有,使用GPL发布,可以自由拷贝、转载,转载时请保持文档的完整性,严禁用于任何商业用途。
E-mail:[email protected]
来源:http://passport.baidu.com/?business&aid=6&un=wwwlkk#7
执行流程:
IRQi :进入中断号是i的硬中断处理程序。最先执行的指令是把当前寄存器的内容保存在内核堆栈中。
iret:从硬中断中返回,最后将会恢复寄存器的内容。
中断是可以嵌套执行的:
IRQi
IRQj
iret
iret
注意:中断信号(指令)是可以嵌套执行,但是,每个中断处理程序都是不同的,在中断处理程序中有可能关闭中断,或者使用自旋锁等措施来禁止中断的嵌套执行。
以上的内容在《linux中断分析》中有较详细的分析。
(2)e1000网卡硬件中断处理程序下面分析一下e1000类型的网卡的硬中断处理程序。
注册并分配中断号:
e1000_open(structnet_device *netdev)
err= e1000_request_irq(adapter);
request_irq(adapter->pdev->irq,handler, irq_flags, netdev->name, netdev);
中断处理程序:
1e1000_intr()
2 ew32(IMC,~0);关闭网卡中断
3 napi_schedule_prep(&adapter->napi)测试NAPI是否已经运行
4 !test_and_set_bit(NAPI_STATE_SCHED,&n->state)测试并设置NAPI运行位
5 如果NAPI未运行
6 __napi_schedule(&adapter->napi);加入本CPU的软中断处理队列
7 如果NAPI已经运行
8 直接返回
在1—2的程序段之间有可能产生多个中断信号,有可能出现如下的情况:
IRQEi //收到第一个中断信号
e1000_intr()
在1—2之间运行
IRQEi //收到第二个中断信号
e1000_intr()
ew32(IMC,~0);关闭网卡中断
执行napi_schedule_prep(&adapter->napi)
!test_and_set_bit(NAPI_STATE_SCHED,&n->state)设置NAPI运行位
__napi_schedule(&adapter->napi);
iret
第一个中断服务例程继续运行
!test_and_set_bit(NAPI_STATE_SCHED,&n->state)测试NAPI运行位
NAPI已经运行
直接返回//第一个中断信号触发的中断处理例程将会直接返回
iret
从这里得出的结论:
在关闭中断前,可能已经产生多个网卡中断,但是,最终只在其中一个中断处理例程中执行__napi_schedule(&adapter->napi)。
结论1也同样适用于多个CPU。
软中断的入口点是do_softirq(),内核代码中有多个点会进入do_softirq():
1.local_bh_enable()
2.irq_exit()
3.ksoftirq/n内核线程被唤醒时
以上3个是最重要的入口点。
1do_softirq(void)
2 if(in_interrupt())
3 return;如果已经在硬中断或者软中断中,直接返回。
4 local_irq_save(flags);保存IF标志并屏蔽可编程中断
5 __do_softirq();
5.1 pending= local_softirq_pending();
6 __local_bh_disable()软中断计数器加1
6.1 set_softirq_pending(0);
7 local_irq_enable();打开本地CPU可编程中断
8 local_irq_disable();屏蔽本地CPU可编程中断
9 _local_bh_enable()软中断计数器减1
10 local_irq_restore(flags);打开本地CPU可编程中断,并恢复IF标志
从这段代码中可以肯定:
如果某个内核路径已经在6—9这段代码中执行(软中断计数器已经加1),此时,其它内核路径,执行到2时将会直接返回。
由于中断上下文(包括软中断)中是禁止进程切换,如果有2个内核路径进入do_softirq(),那么后一个必定是在硬中断处理程序中进入do_softirq(),又由于4—7之间是屏蔽硬中断,那么这段代码是不会被硬中断打断,也就是说,不会被其它内核路径抢占。
4—7之间不会发生抢占(包括进程抢占和硬中断),这段代码内执行了__local_bh_disable()软中断计数器加1,在7—9这段代码内,不会发生进程抢占,但是可能发生硬中断,在这之后执行的硬中断服务程序,可能进入do_softirq(),此时已经可以判断in_interrupt()必定返回1,也就可以断定在7—9这段代码内执行的硬中断服务程序在进入do_softirq()时便会立即返回。也就是说,在一个CPU内不会有两个内核路径在执行4—9之间的代码。
好了,分析了这么多就是为了得出一个关键的结论:在一个CPU上,软中断服务例程必定是串行执行(这里的例程指的是h->action(h))。
注意:这里分析的是4—9这段代码,但是,对于具体的h->action(h)内部有具体的实现。
(4)NET_RX_SOFTIRQ软中断服务例程下面分析一个具体的action-------NET_RX_SOFTIRQ
软中断例程注册流程:
net_dev_init(void)
open_softirq(NET_RX_SOFTIRQ,net_rx_action);
net_rx_action内部流程:
net_rx_action(structsoftirq_action *h)
structsoftnet_data *sd = &__get_cpu_var(softnet_data)
local_irq_disable();
while(!list_empty(&sd->poll_list)) {
structnapi_struct *n;
local_irq_enable();
n= list_first_entry(&sd->poll_list, struct napi_struct,poll_list);
work= n->poll(n, weight);
}
以上关键代码出现了一个关键的数据结构—-每CPU变量&__get_cpu_var(softnet_data)
每CPU变量的分配:
DEFINE_PER_CPU_ALIGNED(structsoftnet_data, softnet_data);
其中第二个参数是数组名
structsoftnet_data {
structQdisc *output_queue;
structQdisc **output_queue_tailp;
structlist_head poll_list; struct napi_struct结构链表
structsk_buff *completion_queue;
structsk_buff_head process_queue;
/*stats */
unsignedint processed;
unsignedint time_squeeze;
unsignedint cpu_collision;
unsignedint received_rps;
#ifdefCONFIG_RPS
structsoftnet_data *rps_ipi_list;
/*Elements below can be accessed between CPUs for RPS */
structcall_single_data csd ____cacheline_aligned_in_smp;
structsoftnet_data *rps_ipi_next;
unsignedint cpu;
unsignedint input_queue_head;
unsignedint input_queue_tail;
#endif
unsigned dropped;
structsk_buff_head input_pkt_queue;
structnapi_struct backlog;
};
structnapi_struct {
structlist_head poll_list;连接structnapi_struct结构链表
unsignedlong state;
int weight;
int (*poll)(structnapi_struct *, int);执行函数
#ifdefCONFIG_NETPOLL
spinlock_t poll_lock;
int poll_owner;
#endif
unsignedint gro_count;
structnet_device *dev;
structlist_head dev_list;
structsk_buff *gro_list;
structsk_buff *skb;
};
net_rx_action内部流程就是执行挂接到poll_list链表的structnapi_struct中的回调函数poll。
(5)e1000软中断服务例程下面分析一下网卡e1000驱动程序如何接收数据包。(使用NAPI)
从上边硬中断已经分析到,e1000硬中断处理函数最终执行__napi_schedule(adapter->napi)
__napi_schedule(structnapi_struct *n)
____napi_schedule(&__get_cpu_var(softnet_data),n);
list_add_tail(&napi->poll_list,&sd->poll_list); 加入轮询队列
__raise_softirq_irqoff(NET_RX_SOFTIRQ);触发rx软中断
这个函数功能很简单,就是将structnapi_struct加入到每CPU变量softnet_data的poll_list,并唤醒NET_RX_SOFTIRQ软中断。
那么structnapi_struct中的poll将指向什么函数?
poll函数注册流程:
e1000_probe(structpci_dev *pdev, const struct pci_device_id *ent)
netif_napi_add(netdev,&adapter->napi,e1000_clean, 64);
现在可以知道poll指向e1000_clean()
e1000_clean(structnapi_struct *napi, int budget)
adapter->clean_rx(adapter,&adapter->rx_ring[0], &work_done, budget);
根据不同的情况clean_rx指向不同的函数,注册流程:
e1000_configure_rx(structe1000_adapter *adapter)
if(adapter->netdev->mtu > ETH_DATA_LEN) {
rdlen= adapter->rx_ring[0].count *
sizeof(struct e1000_rx_desc);
adapter->clean_rx= e1000_clean_jumbo_rx_irq;
adapter->alloc_rx_buf= e1000_alloc_jumbo_rx_buffers;
}else {
rdlen= adapter->rx_ring[0].count *
sizeof(struct e1000_rx_desc);
adapter->clean_rx= e1000_clean_rx_irq;
adapter->alloc_rx_buf= e1000_alloc_rx_buffers;
}
正常情况下是调用e1000_clean_rx_irq()
e1000_clean_rx_irq()
skb= buffer_info->skb;
buffer_info->skb= NULL;
e1000_receive_skb(adapter,status, rx_desc->special, skb);
netif_receive_skb(skb);
现在看到函数最终调用netif_receive_skb(skb)来接收数据包。
这里还可以看到明确一点,在执行netif_receive_skb(skb)时,这个skb离开了e1000网卡的接收环。
在网卡硬中断处理程序中关闭了网卡中断,在软中断处理例程结束时将会打开网卡中断:
e1000_clean(structnapi_struct *napi, int budget)-->
e1000_irq_enable(adapter)-->
ew32(IMS,IMS_ENABLE_MASK);打开网卡中断
(6)多CPU下的e1000网卡数据包接收流程上面主要是分析单个CPU的情况,下面开始分析多个CPU下的情况。
先来分析一下中断信号是如何分发的,这里涉及到一个组件:APIC(可编程中断控制器)。
每个CPU都有一个本地的APIC,本地APIC都连接到一个外部的I/OAPIC,设备的IRQ线连接到I/OAPIC。I/OAPIC接收到中断信号,根据一定的算法,再将中断分配给其中某个本地APIC。也可以通过配置指定某个中断分配给某个CPU。
CPU收到中断后就会执行相应的中断处理程序(这部分的内容在前面的《linux中断分析》中有较详细的分析)。
这里我们需要明确一点:任何时刻,每个CPU上运行的进程都是不同的,中断处理程序,包括硬中断和软中断,使用的是本地CPU上运行进程的内核栈,于是可以看到,不同CPU上的中断处理程序使用不同的内核栈,这给并发创造了很好的条件。
注意:如何并发还是要看具体的中断处理程序,也许中断处理程序中加入了一些自旋锁阻止了并发运行。
现在分析一下e1000网卡在多CPU下的NAPI,首先分析一个硬中断处理例程:
1e1000_intr()-->
2 ew32(IMC,~0);关闭网卡中断-->
3 napi_schedule_prep(&adapter->napi)测试NAPI是否已经运行-->
4 !test_and_set_bit(NAPI_STATE_SCHED,&n->state)测试并设置NAPI
5 如果NAPI未运行-->
6 __napi_schedule(&adapter->napi);加入本地CPU的软中断处理队列
7 NAPI已经运行
8 直接返回
在执行了ew32(IMC,~0)之后将不会产生网卡中断,直到软中断处理例程轮询结束的时候,才打开网卡中断。
但是,在未关闭网卡中断前,可能发生了多次的网卡中断,并且可能多个CPU都收到了中断信号,则多个CPU都将进入中断处理程序e1000_intr()。
注意到4行的原子位测试和设置函数test_and_set_bit(NAPI_STATE_SCHED,&n->state),我们可以看到,只能有一个CPU测试到NAPI未运行,其它CPU将测试到NAPI已经运行,也就是说,只能有一个CPU运行__napi_schedule(&adapter->napi),在这个CPU的软中断中将会执行数据包的接收和发送例程e1000_clean()。
e1000_clean(structnapi_struct *napi, int budget)
napi_complete(napi);
__napi_complete(n);
clear_bit(NAPI_STATE_SCHED,&n->state);
从上面这段流程可以看出,直到软中断轮询结束后才会清零NAPI_STATE_SCHED标志位,其它CPU才可能执行__napi_schedule(&adapter->napi)
到这里可以得到一个重要的结论:e1000网卡NAPI轮询函数e1000_clean()只能在一个CPU上执行。
这样就没法充分利用多个CPU,由于e1000网卡只有一个接收队列(接收环),同一时刻也只能有一个CPU操作这个接收队列,所以在驱动程序层面上是不好实现并发执行。
(7)RPS实现分析但是,注意到数据包的接收,转发等工作都是在中断上下文中(包括硬中断和软中断),e1000网卡的软中断处理例程最终是调用netif_receive_skb(skb)接收数据包:
e1000_clean_rx_irq()-->
e1000_receive_skb(adapter,status, rx_desc->special, skb);-->
netif_receive_skb(skb);
在netif_receive_skb()之后还有很多处理工作,也是在软中断上下文中,而且处理工作是和进程无关的。这样就可以把后续的处理工作移到其它的CPU上执行,这也是网络协议栈的软中断的负载均衡的基本思想(分析到这里总算进入了网络协议栈的软中断的负载均衡问题了)。
下面开始分析RPS的实现流程。
netif_receive_skb(structsk_buff *skb)
cpu= get_rps_cpu(skb->dev, skb, &rflow);根据数据包选择一个CPU
enqueue_to_backlog(skb,cpu, &rflow->last_qtail);
sd= &per_cpu(softnet_data, cpu);得到每CPU变量softnet_data
__skb_queue_tail(&sd->input_pkt_queue,skb);将数据包加入接收队列
____napi_schedule(sd,&sd->backlog);启动backlog,结构是napi_struct
这里的backlog指向process_backlog,注册流程如下:
net_dev_init(void)-->
structsoftnet_data *sd = &per_cpu(softnet_data,i);初始化所有的CPU的变量softnet_data-->
sd->backlog.poll= process_backlog;
函数流程如下:
process_backlog(structnapi_struct *napi, int quota)
skb_queue_splice_tail_init(&sd->input_pkt_queue,&sd->process_queue);
while((skb = __skb_dequeue(&sd->process_queue))){
local_irq_enable();
__netif_receive_skb(skb);处理后续工作
local_irq_disable();
input_queue_head_incr(sd);
if(++work >= quota) {
local_irq_enable();
returnwork;
}
}
现在总结RPS的实现流程:将数据包加入其它CPU的接收队列sd->input_pkt_queue,并激活其它CPU的NAPI结构sd->backlog,则其它CPU将会在自己的软中断中执行process_backlog,process_backlog将会接收sd->input_pkt_queue队列中的所有数据包,并调用__netif_receive_skb()执行后续工作。
(8)防止CPU的cache频繁刷新不能随意选择CPU,为了降低CPU硬件高速缓存的刷新频率,需要把特征相似的数据包分配给同一个CPU处理。
首先分析一下CPU硬件缓存(cache)的实现原理,实现原理并不复杂,如图1所示:
图1内存与直接映射cache的映射
这是最简单的一种cache方式----------直接映射cache。
先看一下本例中cache存储器的布局:
一共可以存储4KB的主存空间,每一行存储4个字的主存空间(16字节)。
每一行的标签字段对应物理地址的12到31位。
索引字段对应物理地址的4到11位,第一行的索引号是0x000,第二行的索引号是0x010,第n行的索引号是0xn0
每一行对应16字节的连续主存空间。
每个内存地址都唯一对应cache存储器的一个行,事实上主存中的每个字节都唯一对应cache存储器中的一个字节。
多个主存地址会同时对应一个cache行。
处理器如何使用cache存储器:
处理器要访问一个内存块,首先判断内存块是否已经在cache存储器中,如果存在则直接从cache存储器中获得数据,如果不存在则从内存中获得数据,并刷新对应的cache行。
如何确定是否已经在cache存储器:
首先根据物理地址的4到11位获得cache行(行号是0xn0其中n是4到11位的值)。
判断cache行标签值是否和物理地址的12到31位相同,如果相同,命中,如果不相同,不命中。
如果命中,根据物理地址的0到3位,在cache行中获得对应的数据。
例如,物理地址0x0000824,根据4到11位82,获得cache行索引820,判断820cache行的标签是否等于0x0000,如果相等,根据0到3位的值4,起始位置是第5个字节,获得数据word1(每个word占4个字节)。
这里注意到,如果某个物理地址在cache存储器中不命中,那么会刷新整个cache行的数据(一共16个字节),这样就降低了CPU的性能,所以在程序里面要尽量减少cache不命中的几率。
注意:cache行的v标志:标记当前cache行包含最近从主存中获取的数据。cache行的d标志:该cache行与主存中的内容不一致。
当命中一个cache行时,高速缓存控制器会进行不同的操作,具体取决于存取的类型。如果是读取操作,控制器从cache行中选择数据并送到CPU寄存器中,不需要访问主存。如果是写操作,控制器可能使用两种不同的策略,分别称为通写和回写。
通写:控制器总是既写主存又写cache行。
回写:只更新cache行,不更新主存,只有当CPU执行一条要求要刷新cache行的指令时,或者一个FLUSH硬件信号产生时,才将cache行中的数据写回主存。
上面的cache实现原理是最简单的一种,还有很多复杂的实现方式,但是基本原理应该是相同的。
多处理器系统的每个处理器都有一个单独的硬件高速缓存,如果其中一个CPU修改了自己的硬件高速缓存,它就必须检查同样的数据是否包含在其它CPU的硬件高速缓存,如果存在,它必须通知其它CPU更新硬件高速缓存。当然,这一切都是由硬件来处理的。
数据包的处理有这样的特点:两个特征(ip,端口等)相似的数据包,在处理过程中,访问相同的主存区域的交集应该越大,这样就可以减少硬件高速缓存的刷新频率,所以在RPS会根据数据包的特征选择CPU,具体的实现函数是:
staticint get_rps_cpu(struct net_device *dev, struct sk_buff *skb, structrps_dev_flow **rflowp)
未使用RFS(需要通过sysctl设置RFS)时的选择算法:
skb->rxhash= jhash_3words(addr1, addr2, ports.v32, hashrnd);
if(!skb->rxhash)
skb->rxhash= 1;
map= rcu_dereference(rxqueue->rps_map);
if(map) {
tcpu= map->cpus[((u64) skb->rxhash * map->len) >> 32];
if(cpu_online(tcpu)) {
cpu= tcpu;
gotodone;
}
}
其中map结构如图2所示,这个函数将会在下面的RFS中做较详细的分析。
不仅由于cache缓存的问题,还有一些其它的原因需要把特征相同的数据包分配给同一个CPU处理,例如:
TCP的IP包的分段重组问题,一旦乱序就要重传,这种情况下,一个linux主机如果只是作为一台路由器的话,那么进入系统的一个TCP包的不同分段如果被不同的cpu处理并向一个网卡转发了,那么同步问题会很麻烦的,如果你不做同步处理,那么很可能后面的段被一个cpu先发出去了,那么在真正的接收方接收到乱序的包后就会请求重发,这是不希望的,因此还是一个cpu串行处理好。
RFS是在RPS上的改进,通过RPS已经可以把同一流的数据包分发给同一个CPU核来处理了,但是有可能出现这样的情况,即给该数据流分发的CPU核和执行处理该数据流的应用程序的CPU核不是同一个。
不仅要把同一流的数据包分发给同一个CPU核来处理,还要分发给其‘被期望’的CPU核来处理就是RFS需要解决的问题。
RFS会创建两个与数据包相关信息的CPU核映射hash表:
1.一个用于表示期望处理具有该类相关信息数据包的CPU核映射,通过recvmsg()或sendmsg()等系统调用信息来创建该hash表(称之为期望CPU表)。比如运行于CPU0核上的某应用程序调用了recvmsg()从远程机器host1上获取数据,那么NIC对从host1上发过来的数据包的分发期望CPU核就是CPU0。
2.一个用于表示最近处理过具有该类相关信息数据包的CPU核映射,称这种表为当前CPU表。该表的存在是因为有多线程的情况,比如运行在两个CPU核上的多线程程序(每个核运行一个线程)交替调用recvmsg()系统函数从同一个socket上获取远程机器host1上的数据会导致期望CPU表频繁更改。如果数据包的分发仅由期望CPU表决定则会导致数据包交替分发到这两个CPU核上,很明显,这不是我们想要的效果。
使用以下算法选择CPU:
1.如果当前CPU表对应表项未设置或者当前CPU表对应表项映射的CPU核处于离线状态,那么使用期望CPU表对应表项映射的CPU核。
2.如果当前CPU表对应表项映射的CPU核和期望CPU表对应表项映射的CPU核为同一个,那么好办,就使用这一个核。
3.如果当前CPU表对应表项映射的CPU核和期望CPU表对应表项映射的CPU核不为同一个,那么:
a)如果同一流的前一段数据包未处理完,那么必须使用当前CPU表对应表项映射的CPU核,以避免乱序。
b)如果同一流的前一段数据包已经处理完,那么则可以使用期望CPU表对应表项映射的CPU核。
下面分析一下内核是如何实现以上的CPU选择算法。
当前CPU表:flow_table= rcu_dereference(rxqueue->rps_flow_table);
期望CPU表:sock_flow_table= rcu_dereference(rps_sock_flow_table);
每个网卡设备都有一个当前CPU表,结构如图2所示:
图2当前CPU表
整个系统只有一个期望CPU表,结构如图3所示:
图3期望CPU表
两个表的结构虽然不同,但是使用的方法是相同的:
计算数据包的哈希值skb->rxhash= jhash_3words(addr1, addr2, ports.v32, hashrnd);
如果是当前CPU表,选择的CPU是flows[skb->rxhash& flow_table->mask].cpu
如果是期望CPU表,选择的CPU是ents[skb->rxhash& sock_flow_table->mask]
下面是选择算法的实现,其中tcpu是当前CPU,next_cpu是期望CPU
if(unlikely(tcpu != next_cpu) && // 当前CPU和期望CPU不相同
(tcpu == RPS_NO_CPU || !cpu_online(tcpu)//当前CPU表未设置或者当前CPU处于离线状态
((int)(per_cpu(softnet_data, tcpu).input_queue_head-
rflow->last_qtail))>= 0)) { //或者前一段数据包已经处理完毕
tcpu= rflow->cpu = next_cpu;//则选择期望的CPU
if(tcpu != RPS_NO_CPU)
rflow->last_qtail= per_cpu(softnet_data,
tcpu).input_queue_head;
}
if(tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
*rflowp= rflow;
cpu= tcpu;
gotodone;
}
这段代码涉及到两个统计量input_queue_head和last_qtail,其意义如下:
process_backlog(structnapi_struct *napi, int quota)
__netif_receive_skb(skb);
input_queue_head_incr(sd);
sd->input_queue_head++;
由以上流程可知统计量input_queue_head表示已经处理完的的数据包个数。
intnetif_receive_skb(struct sk_buff *skb)
cpu= get_rps_cpu(skb->dev, skb, &rflow);
enqueue_to_backlog(skb,cpu, &rflow->last_qtail);
input_queue_tail_incr_save(sd,qtail);
*qtail= ++sd->input_queue_tail;
由以上流程可知统计量last_qtail表示一共要处理多少个数据包。
那么input_queue_head>= last_qtail则表示前面接收到的数据包已经处理完毕了。
现在可以知道rps_sock_flow_table表就是为了让内核知道应用程序是在哪个CPU上运行,那么内核和应用程序是如何进行通信的,下面分析这个过程。
首先明确两个流程:
内核接收数据包流程:判断是否存在对应的sock,如果存在,将数据包skb加入sock的接收队列中,到此内核处理完毕,整个处理过程都在软中断中进行。
应用程序接收数据包的流程:使用read()或者其它函数,陷入内核态,执行对应的系统调用服务例程,系统调用服务例程从sock的接收队列中读取数据,并返回,如果sock中没有数据包,进程可能被挂起。
(以上的两个过程在实现分析>中有详细的讲解)
下面这段是内核的一个接收流程:
tcp_v4_rcv(structsk_buff *skb)
tcp_v4_do_rcv(sk,skb);
sock_rps_save_rxhash(sk,skb->rxhash);修改sock的哈希值
其中staticinline void sock_rps_save_rxhash(struct sock *sk, u32 rxhash)
{
if(unlikely(sk->sk_rxhash != rxhash)){如果哈希值不相同
sock_rps_reset_flow(sk);
sk->sk_rxhash= rxhash; 赋予相同的哈希值
}
}
在这个流程中sock->sk_rxhash将会被修改为最新的值,以上的流程与进程无关的,在协议栈软中断中进行。
下面是应用程序的一个接收流程:
read()陷入内核态,并执行相应的系统调用服务例程。
inet_recvmsg()
sock_rps_record_flow()
sock_flow_table= rcu_dereference(rps_sock_flow_table);
rps_record_sock_flow(sock_flow_table,sk->sk_rxhash);
其中staticinline void rps_record_sock_flow(structrps_sock_flow_table *table,u32 hash)
{
if(table && hash) {
unsignedint cpu, index = hash & table->mask;
cpu= raw_smp_processor_id();//获取当前CPU号
if(table->ents[index] != cpu)
table->ents[index]= cpu;注册CPU号
}
}
每次用户程序读取数据包都会更新rps_sock_flow_table表,保证其中的CPU号是最新的。
第一个流程保证应用程序和内核会得到相同的哈希值。
第二个流程保证内核会得到最新的CPU号。
需要这样流程,原因有以下2点:
某些网络协议在通信的过程中数据包对应的哈希值有可能改变,那么应用程序就可以通过sock->sk_rxhash获得最新的哈希值。
应用程序在哪个CPU上运行也不是固定的,如果应用程序的运行CPU改变了,可以通过更新rps_sock_flow_table表来告知内核。
到现在RPS和RFS的实现基本分析完毕,当然还有很多东西没有分析:
比如:
rxqueue->rps_flow_table表是如何初始化的,这就涉及到sysfs文件系统,在《用户空间和内核通信》中有提到,但是没有详细的分析。
rps_sock_flow_table表默认是空的,需要通过sysctl进行配置,sysctl的实现在《用户空间和内核通信》有较详细的分析。
在多CPU下进程是如何调度的,还没分析过,但是,网络协议栈的软中断处理是和进程调度无关的,在软中断处理例程中将会禁止本地CPU的进程调度。