从dpdk1811看virtio1.1 的实现—packedring——lvyilong316 virtio1.1已经在新的kernel和dpdk pmd中陆续支持,但是网上关于这一块的介绍却比较少,唯一描述多一点的就是这个ppt:https://www.dpdk.org/wp-content/uploads/sites/35/2018/09/virtio-1.1_v4.pdf 。但是看ppt这东西总觉得还是不过瘾的。只是模糊的大概理解,但要想看清其本质还是要看代码。这篇文章主要是基于dpdk18.11中的vhost_user来分析virtio1.1具有哪些新特性,已经具体是如何工作的。 virtio1.1 关键的最大改动点就是引入了packed queue,也就是将virtio1.0中的desc ring,avail ring,used ring三个ring打包成一个desc ring了。向对应的,我们将virtio1.0这种实现方式称之为split ring。我们以vm的接收处理逻辑(vhost_user的发送逻辑)为例分析一下split 和packed方式的区别。 在virtio_dev_rx中有如下实现:点击(此处)折叠或打开if (vq_is_packed(dev)) nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count); else nb_tx = virtio_dev_rx_split(dev, vq, pkts, count); 根据后端设备是否支持VIRTIO_F_RING_PACKED这个feature,分别调用packed和split处理函数。我们先回顾了解下我们看下其分别实现的流程。split方式处理l virtio_dev_rx_split点击(此处)折叠或打开static __rte_always_inline uint32_tvirtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq, struct rte_mbuf **pkts, uint32_t count){ uint32_t pkt_idx = 0; uint16_t num_buffers; struct buf_vector buf_vec[BUF_VECTOR_MAX]; uint16_t avail_head; rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]); avail_head = *((volatile uint16_t *)&vq->avail->idx); for (pkt_idx = 0; pkt_idx count; pkt_idx++) { uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen; uint16_t nr_vec = 0; /* 为拷贝当前mbuf后续预留avail desc */ if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec, &num_buffers, avail_head, &nr_vec) 0)) { vq->shadow_used_idx -= num_buffers; break; } /* 拷贝mbuf到avail desc */ if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers) 0) { vq->shadow_used_idx -= num_buffers; break; } /* 更新last_avail_idx */ vq->last_avail_idx += num_buffers; } /* 小包的批处理拷贝 */ do_data_copy_enqueue(dev, vq); if (likely(vq->shadow_used_idx)) { flush_shadow_used_ring_split(dev, vq); /* 更新used ring */ vhost_vring_call_split(dev, vq); /* 通知前端 */ } return pkt_idx;} 其中涉及三处会和packed方式处理不同的地方,从函数名字我们也能看出,就是带有split的函数,对应一定有packed函数。split收包处理流程这里不再具体展开,下图描述了split相关数据结构。下面重点以这个图为背景大致描述一下guset接收流程。 图中黄色部分表示guest内存,绿色部分表示host内存(非共享内存)。可以看到共享内存主要由三部分也就是三个ring构成:desc ring,avail ring,used ring。这三个ring也是split模式的核心构成。 首先是desc ring,有多个desc chain构成,用来指向存放数据的地址。desc中有个flag,主要有以下几个取值:点击(此处)折叠或打开/* This marks a buffer as continuing via the next field. */#define VRING_DESC_F_NEXT 1/* This marks a buffer as write-only (otherwise read-only). */#define VRING_DESC_F_WRITE 2/* This means the buffer contains a list of buffer descriptors. */#define VRING_DESC_F_INDIRECT 4 其次是avail ring,注意avail->idx 不是desc ring的idx,而是avail->ring的idx,对应的avail->ring[idx]最后一个后端可用的(对前端来说是下一个可用)desc chain的header idx。last_avail_idx也不是desc ring的idx,记录的也是avail->ring的idx,对应的avail->ring[idx]表示上一轮拷贝用到的最后一个desc chain的header idx(avail->ring[idx+1]为本轮拷贝可用的第一个descchain的header idx)。availring中也有一个flag,目前只会取值VRING_AVAIL_F_NO_INTERRUPT,其作用是让guest通过设置这个来告诉后端(如果更新了uesd ring)暂时不用kick前端,作为前端的一个优化。 最后是uesd ring,再讲used ring前要提一下shadow_used ring,shadow_usedring是vhost user为了提高性能分配的一个ring,它和其他三个ring不同,它是host内存,guest是不感知的。仔细观察上图就可以看出shadow_used ring和uesd->ring指向的结构是完全一样的。因为shadow_used ring正是uesd ring的一个暂存的buff。当后端将数据从对应desc chain记录的内存拷贝出之后,这些descchain的idx和len就需要先暂时记录在shadow_usedring中,等这一批mbuf拷贝完后,再将shadow_used ring一次性拷贝到uesd->ring的对应位置。同样last_used_idx记录的也不是desc ring的idx,而是used->ring的idx,对应used->ring[idx]记录的是上一次后端已经处理好可以给前端释放(对于guest rx来说)的desc chain的header idx。used ring中也有一个flag,目前只会取值VRING_USED_F_NO_NOTIFY,其作用是让host使用这个值告诉前端,当用可用的avail ring时不要kick host,dpdk vhost_user默认会设置这个flag,因为后端采用的是polling模式。 另外值得一提的是,整个guest rx涉及到两次位置的向guest内存拷贝的动作,一个是将mbuf中的数据拷贝到desc中(对应函数copy_mbuf_to_desc),另一处是将shadow_uesd ring拷贝到uesd ring的过程中(对应函数flush_shadow_used_ring_split),所以如果在guest热迁移过程中这两处都会涉及到log_page的相关操作。 关于split ring的其他一些注释:1. 每个virtqueue由三部分组成: (1) Descriptor Table (2) Available Ring (3) Used Ring2. Legacy Interfaces (1)vq需要严格的按照以下顺序和pad 布局点击(此处)折叠或打开struct virtq {// The actual descriptors (16 bytes each)struct virtq_desc desc[ Queue Size ];// A ring of available descriptor heads with free-running index.struct virtq_avail avail;// Padding to the next Queue Align boundary.u8 pad[ Padding ];// A ring of used descriptor heads with free-running index.struct virtq_used used;};3. avail desc中的ring存放的是desc chain的header desc id。desc的id表示的是下一个可用(对于前端)的desc id;packed方式处理 前面回顾分析完split的处理方式后下面重点要分析packed的处理方式,这是virtio1.1改变的重点。packed的关键变化是desc ring的变化,为了更好的利用cache和硬件的亲和性(方便硬件实现virtio),将split方式中的三个ring(desc,avail,used)打包成一个packed desc ring。我们首先看些相对split desc来说packed desc有什么不同。点击(此处)折叠或打开/*split desc*/struct vring_desc { uint64_t addr; /* Address (guest-physical). */ uint32_t len; /* Length. */ uint16_t flags; /* The flags as indicated above. */ uint16_t next; /* We chain unused descriptors via this. */};/*packed desc*/struct vring_packed_desc { uint64_t addr; uint32_t len; uint16_t id; uint16_t flags;}; 我们看到addr和len名字和含义保持不变,flags看起来也没有变化,实际上其取值多了几种。下面我们具体分析其变化的原因,以及每个变化字段的含义。(1) 相对split desc去掉了next字段:我们知道在split desc中next字段是记录一个desc chain中的下一个desc idx使用的,通常配合flags这样使用: if((descs[idx].flags & VRING_DESC_F_NEXT) == 1) nextdesc = descs[ descs[idx].next]; 但是在packeddesc ring中一个desc chain一定是相邻的(可以理解为链表变为了数组),所以next字段就用不上了,上面获取nextdesc的方式可以转化为如下方式: if ((descs[idx].flags &VRING_DESC_F_NEXT) == 1) nextdesc= descs[++idx];(2) flags字段的变化:相对split desc,flags字段仍然保留,但是其取值增加了,因为要把三个ring合一,每个desc就需要更多的信息表明身份(是used还是avail)。在原有flags的基础上增加了两个flag: #defineVRING_DESC_F_AVAIL (1ULL#define VRING_DESC_F_USED (1ULL关于这两个flag如何使用后面再分析。(3) 相对split desc增加了id字段:这个id比较特殊,他是buffer id,注意不是desc的下标idx。那么这个buffer又是个什么含义呢?其实可用理解为前端guest维护的一个mbuf数组,这个buffer就是这个数组的idx,用来发送或接受数据。为了更准确描述buffer id的来历,我们看下前端是如果将一个availbuffer关联到一个desc的:对每个(foreach)将要发送的buffer, b:1.从desc ring中获取到下一个可用的desc,d;2.获取下一个可用的buffer id;3.设置d.addr的值为b的数据起始物理地址;4.设置d.len的值为b的数据长度;5.设置d.id为buffer id;6.采用如下方式生成desc的flag:(a)如果b是后端可写的,则设置VIRTQ_DESC_F_WRITE,否则不设置;(b)按照avail ring的Wrap Counter值设置VIRTQ_DESC_F_AVAIL;(c)按照avail ring 的Wrap Counter值取反设置VIRTQ_DESC_F_USED;7. 调用一下memory barrier确保desc已经被初始化;8.设置d.flags为刚刚生成的flag;9.如果d是avail ring的最后一个desc,则对Wrap Counter进行翻转;10.否则增加d指向下一个desc;附上伪代码实现:点击(此处)折叠或打开/* Note: vq->avail_wrap_count is initialized to 1 *//* Note: vq->sgs is an array same size as the ring */id = alloc_id(vq);first = vq->next_avail;sgs = 0;for (each buffer element b) {sgs++;vq->ids[vq->next_avail] = -1;vq->desc[vq->next_avail].address = get_addr(b);vq->desc[vq->next_avail].len = get_len(b);avail = vq->avail_wrap_count ? VIRTQ_DESC_F_AVAIL : 0;used = !vq->avail_wrap_count ? VIRTQ_DESC_F_USED : 0;f = get_flags(b) | avail | used;if (b is not the last buffer element) { f |= VIRTQ_DESC_F_NEXT;}/* Don't mark the 1st descriptor available until all of them are ready. */if (vq->next_avail == first) { flags = f;} else { vq->desc[vq->next_avail].flags = f;}last = vq->next_avail;vq->next_avail++;if (vq->next_avail >= vq->size) { vq->next_avail = 0; vq->avail_wrap_count \^= 1; }}vq->sgs[id] = sgs;/* ID included in the last descriptor in the list */vq->desc[last].id = id;write_memory_barrier();vq->desc[first].flags = flags;memory_barrier();if (vq->device_event.flags != RING_EVENT_FLAGS_DISABLE) { notify_device(vq);} 注意上面实现的一个细节:当需要传递多个buffer的时候,第一个desc的flag是延时到最后更新的,这样可以减少memory_barrier调用次数,一次调用确保之后的desc都已经正常初始化了(为什么最后更新flag,以及要调用memory_barrier呢?因为后端是以flag判断desc是否可以使用,所以需要确保flag设置时其他字段以及被正确设置写入内存)。最后再附一张desc ring的图。 另外注意,由于avail 和uesd都统一到了desc中,但是并不是每个字段都是必须的。avail ring和used ring是如何体现的? packed把三个ring进行了整合,但virtio的本质思想并没有变化,整个数据传输还是avail和used共同作用完成的,所以三ring合一仅仅是形式的变化,avail ring和used ring并没有消失。那么自然就有一个问题:avail ring和used ring是如何体现的? 回答这个问题前,我们先看一下virtio的vq为了支持packed发生的一些变化。点击(此处)折叠或打开struct vhost_virtqueue {… bool used_wrap_counter; bool avail_wrap_counter;…} 这两个wrap_counter分别对应avail ring和used ring,packed方式正式通过这两个bool型变量以及前面提到的packed desc新增的两个flag完成avail和uesd的区分的。 首先这两个wrap_counter在初始化队列的时候都被初始化为1; 对于avail ring,当使用了最后一个desc时则将avail_wrap_counter进行翻转(0变为1,1变为0),然后再从第一个开始;对于uesd ring,当使用了最后一个desc时将used_wrap_counter进行翻转,然后再从第一个开始。 有了上面的前提就可以说明avail desc和used desc是如果表示的了: avail desc:当desc flags关于VRING_DESC_F_AVAIL的设置和avail_wrap_counter同步,且VRING_DESC_F_USED的设置和avail_wrap_counter相反时,表示desc为avail desc。例如avail_wrap_counter为1时,flags应该设置VRING_DESC_F_AVAIL|~VRING_DESC_F_USED,当avail_wrap_counter为0时,flags应该设置~VRING_DESC_F_AVAIL|VRING_DESC_F_USED。 used desc:当desc flags关于VRING_DESC_F_USED的设置和used_wrap_counter同步,且VRING_DESC_F_AVAIL的设置也和used_wrap_counter同步时,表示desc为used desc。例如used_wrap_counter为1时,flags应该设置VRING_DESC_F_AVAIL|VRING_DESC_F_USED,当used_wrap_counter为0时,flags应该设置~VRING_DESC_F_AVAIL|~VRING_DESC_F_USED。 综上可以看出,avail desc的两个flag总是相反的(只能设置一个),而used desc的两个flag总是相同的,要么都设置,要么都不设置。 看到这里可能有人会奇怪,为什么要搞得这么麻烦呢?仅仅通过两个flags应该也可以区分出是uesd还是avail吧。那我们看下面这个图,以avail desc为例,假如仅仅靠VRING_DESC_F_AVAIL|~VRING_DESC_F_USED就表示avail desc: 图中情况表示当前avail ring满了,没有uesddesc,这个时候如果后端处理完最后一个avail desc,回绕到第一个availdesc时,就无法区分这个avail desc是新的avail desc还是已经处理过的desc。而如果结合avail_wrap_counter就很好处理了,假如本轮其值为1,则遍历到最后一个avail desc时avail_wrap_counter要被置零了,再继续遍历到第一个desc时判断是avail desc的标准就变为了~USED|AVAIL,所以第一个desc就不满足条件了。 所以我们看出引入wrap_counter的作用主要是为了解决desc ring回绕问题。在split方式中,由于对于avail ring有avail->idx存放当前最后一个可用avail desc的位置,对于uesd ring有used->idx存放最后一个可用的uesd desc位置,而packed方式中三ring合一,不再有这样一个变量表示ring的结束位置,所以才引入了这么个机制。 关于packed ring的其他一些注释:1. Packed virtqueues支持2^15 entries;2. 每个packed virtqueue 有三部分构成: (1)Descriptor Ring (2)Driver EventSuppression:后端(device)只读,用来控制后端向前端(driver)的通知(used notifications) (3)Device EventSuppression:前端(driver)只读,用来控制前端向后端(device)的通知(avail notifications)3. Write Flag,VIRTQ_DESC_F_WRITE (1)对于avail desc这个flag用来标记其关联的buffer是只读的还是只写的; (2)对于used desc这个flag用来表示去关联的buffer是否有被后端(device)写入数据;4. desc中的len (1)对于avail desc,len表示desc关联的buffer中被写入的数据长度; (2)对于uesd desc,当VIRTQ_DESC_F_WRITE被设置时,len表示后端(device)写入数据的长度,当VIRTQ_DESC_F_WRITE没有被设置时,len没有意义;5. Descriptor Chain buffer id包含在desc chain的最后一个desc中,另外,VIRTQ_DESC_F_NEXT在used desc中是没有意义的。 好了,说了这么多我们大概对packed的实现原理清楚了,那么接下来就看下具体实现,还是以vm收包方向的后端处理逻辑为例。l virtio_dev_rx_packed点击(此处)折叠或打开static __rte_always_inline uint32_tvirtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq, struct rte_mbuf **pkts, uint32_t count){ uint32_t pkt_idx = 0; uint16_t num_buffers; struct buf_vector buf_vec[BUF_VECTOR_MAX]; for (pkt_idx = 0; pkt_idx count; pkt_idx++) { uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen; uint16_t nr_vec = 0; uint16_t nr_descs = 0; /* 为拷贝当前mbuf后续预留avail desc */ if (unlikely(reserve_avail_buf_packed(dev, vq, pkt_len, buf_vec, &nr_vec, &num_buffers, &nr_descs) 0)) { vq->shadow_used_idx -= num_buffers; break; } rte_prefetch0((void *)(uintptr_t)buf_vec[0].buf_addr); /* 拷贝mbuf到avail desc */ if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers) 0) { vq->shadow_used_idx -= num_buffers; break; } vq->last_avail_idx += nr_descs; if (vq->last_avail_idx >= vq->size) { vq->last_avail_idx -= vq->size; vq->avail_wrap_counter ^= 1; } } /* 小包的批处理拷贝 */ do_data_copy_enqueue(dev, vq); if (likely(vq->shadow_used_idx)) { /* 更新used ring */ flush_shadow_used_ring_packed(dev, vq); /* kick 前端 */ vhost_vring_call_packed(dev, vq); } return pkt_idx;} 函数中带有packed后缀的都是packed方式的特有处理实现。我们先看reserve_avail_buf_packed,这个函数为拷贝当前mbuf后续预留avail desc。l reserve_avail_buf_packed点击(此处)折叠或打开static inline intreserve_avail_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq, uint32_t size, struct buf_vector *buf_vec, uint16_t *nr_vec, uint16_t *num_buffers, uint16_t *nr_descs){ uint16_t avail_idx; uint16_t vec_idx = 0; uint16_t max_tries, tries = 0; uint16_t buf_id = 0; uint32_t len = 0; uint16_t desc_count; *num_buffers = 0; avail_idx = vq->last_avail_idx; /* 如果支持mergeable特性,则一个mbuf可用使用多个desc chain */ if (rxvq_is_mergeable(dev)) max_tries = vq->size - 1; else max_tries = 1; while (size > 0) { if (unlikely(++tries > max_tries)) return -1; /* 尝试填充一个desc chain */ if (unlikely(fill_vec_buf_packed(dev, vq, avail_idx, &desc_count, buf_vec, &vec_idx, &buf_id, &len, VHOST_ACCESS_RW) 0)) return -1; len = RTE_MIN(len, size); /* 将当前使用的desc chian信息同步到shadow_used_packed ring 中 */ update_shadow_used_ring_packed(vq, buf_id, len, desc_count); size -= len; avail_idx += desc_count; if (avail_idx >= vq->size) avail_idx -= vq->size; *nr_descs += desc_count; *num_buffers += 1; } *nr_vec = vec_idx; return 0;} 下面看fill_vec_buf_packed,这个函数是将mbuf填充到当前desc chain中(如果mbuf过大,不保证填完,只负责填充当前desc chian)。l fill_vec_buf_packed主要倒数第三个参数是返回的buffer id。点击(此处)折叠或打开static __rte_always_inline intfill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq, uint16_t avail_idx, uint16_t *desc_count, struct buf_vector *buf_vec, uint16_t *vec_idx, uint16_t *buf_id, uint32_t *len, uint8_t perm){ bool wrap_counter = vq->avail_wrap_counter; struct vring_packed_desc *descs = vq->desc_packed; uint16_t vec_id = *vec_idx; /* 如果avail idx发送了回绕,则wrap_counter要进行翻转 */ if (avail_idx vq->last_avail_idx) wrap_counter ^= 1; /* 判断是否是avail desc */ if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter))) return -1; *desc_count = 0; *len = 0; while (1) { if (unlikely(vec_id >= BUF_VECTOR_MAX)) return -1; *desc_count += 1; *buf_id = descs[avail_idx].id; /* buf_id记录的是使用的最后一个avail desc的id */ if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) { if (unlikely(fill_vec_buf_packed_indirect(dev, vq, &descs[avail_idx], &vec_id, buf_vec, len, perm) 0)) return -1; } else { *len += descs[avail_idx].len; if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id, descs[avail_idx].addr, descs[avail_idx].len, perm))) return -1; } if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0) break; if (++avail_idx >= vq->size) { avail_idx -= vq->size; wrap_counter ^= 1; } } *vec_idx = vec_id; return 0;} 其中需要注意的有三点,首先,buf_id记录的是使用的最后一个avail desc的buffer id,这个id会在shadow_uesd中使用,然后是当avail ring出现翻转的时候,同步翻转对应的wrap_counter。再一点就是desc_is_avail函数,用来判断当前desc是否是avail desc。l desc_is_avail点击(此处)折叠或打开static inline booldesc_is_avail(struct vring_packed_desc *desc, bool wrap_counter){ /* VRING_DESC_F_AVAIL的设置和wrap_counter一致,且VRING_DESC_F_USED的设置和wrap_counter相反时表示设avail desc */ return wrap_counter == !!(desc->flags & VRING_DESC_F_AVAIL) && wrap_counter != !!(desc->flags & VRING_DESC_F_USED);} 我们看到这个判断逻辑原理和之前我们讲的packed方式中avail和uesd desc是如果区分的相同。即availdesc需要VRING_DESC_F_AVAIL这个flag的设置和avail wrap_counter一致,且VRING_DESC_F_USED的设置和availwrap_counter相反。 下面回头看update_shadow_used_ring_packed函数,这个函数将当前使用的desc chian信息同步到shadow_used_packedring 中。l update_shadow_used_ring_packed点击(此处)折叠或打开static __rte_always_inline voidupdate_shadow_used_ring_packed(struct vhost_virtqueue *vq, uint16_t desc_idx, uint32_t len, uint16_t count){ uint16_t i = vq->shadow_used_idx++; vq->shadow_used_packed[i].id = desc_idx; /*desc chain最后一个avail desc的buffer id*/ vq->shadow_used_packed[i].len = len; vq->shadow_used_packed[i].count = count;} 注意这里的count是当前desc chain中使用的desc个数,desc_idx是当前desc chain使用的最后一个desc的buffer idx,而split方式shadow_used的id记录的是当前desc chain头部desc的id。 另外一个关键的地方,shadow_used_packed相对shadow_used_split 多了一个count字段,用来记录当前desc chain中使用的desc个数。这个作用我们后面马上分析。 flush_shadow_used_ring_packed函数用来根据shadow_used ring的信息更新uesd ring。我们看其具体实现。l update_shadow_used_ring_split点击(此处)折叠或打开static __rte_always_inline voidupdate_shadow_used_ring_split(struct vhost_virtqueue *vq, uint16_t desc_idx, uint32_t len){ uint16_t i = vq->shadow_used_idx++; vq->shadow_used_split[i].id = desc_idx; vq->shadow_used_split[i].len = len;}static __rte_always_inline voidflush_shadow_used_ring_packed(struct virtio_net *dev, struct vhost_virtqueue *vq){ int i; uint16_t used_idx = vq->last_used_idx; /* Split loop in two to save memory barriers */ for (i = 0; i vq->shadow_used_idx; i++) { vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id; vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len; /* count的作用就一个是用来判断desc ring发送回绕 */ used_idx += vq->shadow_used_packed[i].count; if (used_idx >= vq->size) used_idx -= vq->size; } rte_smp_wmb(); /* 将desc 标记为uesd desc */ for (i = 0; i vq->shadow_used_idx; i++) { uint16_t flags; if (vq->shadow_used_packed[i].len) flags = VRING_DESC_F_WRITE; else flags = 0; if (vq->used_wrap_counter) { flags |= VRING_DESC_F_USED; flags |= VRING_DESC_F_AVAIL; } else { flags &= ~VRING_DESC_F_USED; flags &= ~VRING_DESC_F_AVAIL; } vq->desc_packed[vq->last_used_idx].flags = flags; /* log page 更新热迁移bitmap*/ vhost_log_cache_used_vring(dev, vq, vq->last_used_idx * sizeof(struct vring_packed_desc), sizeof(struct vring_packed_desc)); /* count的作用另一个是用来更新last_used_idx */ vq->last_used_idx += vq->shadow_used_packed[i].count; if (vq->last_used_idx >= vq->size) { vq->used_wrap_counter ^= 1; vq->last_used_idx -= vq->size; } } rte_smp_wmb(); vq->shadow_used_idx = 0; vhost_log_cache_sync(dev, vq);} 注意为什么先更新uesd desc的其他字段,最后才一起更新flag,而不是第一次循环就一起吧flag更新了,这个原因其实我们前面讲“bufferid”的时候已经说明了。对端(前端)是更加desc 的flag判断desc是否可用的,所以在更新flag需要有memory barrier,确保其他字段以及正确初始化到内存,为了减少memory barrier的调用,所以单独进行flag更新。 这个函数需要关注的地方还是比较多的。首先我们看到shadow_used_packed 中count字段的作用,其一是用来判断desc ring发送回绕,可以看到packed方式uesd desc在desc中不是连续的,而是会跳隔:used_idx += vq->shadow_used_packed[i].count,这个在split中是不存在的,因为split中uesd有单独的ring,所以uesd是连续的,直接就可以使用起始位置和要拷贝的uesddesc长度就可以判断used ring回绕了(注意一个是判断desc ring回绕,一个是判断uesdring回绕,packed没有单独的uesd ring)。另外一个作用是用来更新last_used_idx,packed中last_used_idx表示的是使用的desc chain的最后的desc idx,而split方式中表示的是使用的desc chain的header idx。 然后就是标记desc为uesd desc,我们之前已经讲过,uesd desc需要VRING_DESC_F_USED和VRING_DESC_F_AVAIL一致且和used_wrap_counter一致。 关于通知前端的逻辑vhost_vring_call_packed我们下一次再分析。