用户通过ibv_create_cq接口创建完成队列,函数原型和常见用法如下,本节以该用法为例看下cq的创建过程。

struct ibv_cq *ibv_create_cq(struct ibv_context *context, int cqe,
                             void *cq_context,
                             struct ibv_comp_channel *channel,
                             int comp_vector);
cq = ibv_create_cq(ctx, ncqe, NULL, NULL, 0);             

用户态

ncqe为cq队列的容量,cqe_sz是cqe的大小,默认64B;mlx5_alloc_cq_buf就是通过posix_memalign分配cq队列的内存,记录到cq->buf_a。

static struct ibv_cq_ex *create_cq(struct ibv_context *context,
                   const struct ibv_cq_init_attr_ex *cq_attr,
                   int cq_alloc_flags,
                   struct mlx5dv_cq_init_attr *mlx5cq_attr)
{
	...
    ncqe = align_queue_size(cq_attr->cqe + 1);
    cqe_sz = get_cqe_size(mlx5cq_attr);
    mlx5_alloc_cq_buf(to_mctx(context), cq, &cq->buf_a, ncqe, cqe_sz);
    cq->dbrec  = mlx5_alloc_dbrec(to_mctx(context), cq->parent_domain,
                      &cq->custom_db);
	...
}

然后通过mlx5_alloc_dbrec分配dbr,dbr位于物理内存,大小为8B,同时对齐到8B,记录了软件poll到了什么位置,即ci,以及cq的状态信息,dbr地址会被记录到cqc中,当用户执行poll_cq之后会更新dbr,后续会具体介绍。

__be32 *mlx5_alloc_dbrec(struct mlx5_context *context, struct ibv_pd *pd,
             bool *custom_alloc)
{   
    struct mlx5_db_page *page;
    __be32 *db = NULL;
    int i, j;
    ...
default_alloc:
    pthread_mutex_lock(&context->dbr_map_mutex);
    
    page = list_top(&context->dbr_available_pages, struct mlx5_db_page,
            available);
    if (page)
        goto found;
    
    page = __add_page(context);
    if (!page)
        goto out;

found:
   ...
    return db;
}

分配dbr的时候首先尝试去空闲链表中获取,如果拿不到则执行__add_page,开始时空闲链表为空,因此执行__add_page。

static struct mlx5_db_page *__add_page(struct mlx5_context *context)
{   
    struct mlx5_db_page *page;
    int ps = to_mdev(context->ibv_ctx.context.device)->page_size;
    int pp;
    int i;
    int nlong;
    int ret;
    
    pp = ps / context->cache_line_size; 
    nlong = (pp + 8 * sizeof(long) - 1) / (8 * sizeof(long));
    
    page = malloc(sizeof *page + nlong * sizeof(long));
    if (!page) 
        return NULL;
    
    if (mlx5_is_extern_alloc(context))
        ret = mlx5_alloc_buf_extern(context, &page->buf, ps);
    else
        ret = mlx5_alloc_buf(&page->buf, ps, ps);
    if (ret) {
        free(page);
        return NULL;
    }
    
    page->num_db  = pp;
    page->use_cnt = 0;
    for (i = 0; i < nlong; ++i)
        page->free[i] = ~0;
    
    cl_qmap_insert(&context->dbr_map, (uintptr_t) page->buf.buf,
               &page->cl_map);
    list_add(&context->dbr_available_pages, &page->available);
    
    return page;
}

struct mlx5_db_page {
    cl_map_item_t           cl_map;
    struct list_node        available;
    struct mlx5_buf         buf;
    int             num_db;
    int             use_cnt;
    unsigned long           free[0];
};  

ps为page size,mlx5一次性分配一个物理页用于存储多个dbr,用结构体mlx5_db_page page描述。通过mlx5_alloc_buf分配了大小为ps的内存,即一个物理页,地址记录到page中buf。这里为了防止false sharing,将dbr地址对齐到了cache_line_size,因此一个物理页能存储dbr的数量pp为page大小除以cacheline大小,pp记录到page的num_db,page中use_cnt初始化为0,表示这个page中还没有dbr被占用。page中free数组相当于一个bitmap,记录了这个page中dbr的空闲情况,nlong表示为了记录num_db个dbr需要几个long,即最少需要几个long才能有num_db个bit,free数组初始化为全1,表示所有dbr都为空闲。mlx通过树和链表的方式管理每一个mlx5_db_page,因此将page插入到dbr_available_pages链表head,链表节点为list_node available,将page插入到dbr_map,树节点为cl_map_item_t cl_map。

然后再回到mlx5_alloc_dbrec的逻辑

__be32 *mlx5_alloc_dbrec(struct mlx5_context *context, struct ibv_pd *pd,
             bool *custom_alloc)
{   
    ...
    ++page->use_cnt;
    if (page->use_cnt == page->num_db)
        list_del(&page->available);
    
    for (i = 0; !page->free[i]; ++i)
        /* nothing */;

    j = ffsl(page->free[i]);
    --j;
    page->free[i] &= ~(1UL << j); 
    db = page->buf.buf + (i * 8 * sizeof(long) + j) * context->cache_line_size;

out:
    pthread_mutex_unlock(&context->dbr_map_mutex);
    
    return db;
}

首先增加use_cnt,表示又占用了一个dbr,如果use_cnt等于num_db,表示这个page已经满了,因此从空闲链表中删除。遍历free数组,找到第一个不为一的long,说明这个long里有空闲的dbr,然后通过ffsl找到free[i]中第一个为1的位置 j,然后将free[i]的第 j 位改为0,表示占用了,然后索引对应的dbr,因为一个long能存8 * sizeof(long)个dbr,因此这次分配的索引就是(i * 8 * sizeof(long) + j),最后将这个地址记录到db。

然后回到create_cq逻辑。

static struct ibv_cq_ex *create_cq(struct ibv_context *context,
                   const struct ibv_cq_init_attr_ex *cq_attr,
                   int cq_alloc_flags,
                   struct mlx5dv_cq_init_attr *mlx5cq_attr)
{
	...
    cq->dbrec[MLX5_CQ_SET_CI]   = 0; 
    cq->dbrec[MLX5_CQ_ARM_DB]   = 0; 
    cq->arm_sn          = 0; 
    cq->cqe_sz          = cqe_sz;
    cq->flags           = cq_alloc_flags;
    
    cmd_drv->buf_addr = (uintptr_t) cq->buf_a.buf;
    cmd_drv->db_addr  = (uintptr_t) cq->dbrec;
    cmd_drv->cqe_size = cqe_sz;
    {
        struct ibv_cq_init_attr_ex cq_attr_ex = *cq_attr;

        cq_attr_ex.cqe = ncqe - 1;
        ret = ibv_cmd_create_cq_ex(context, &cq_attr_ex, &cq->verbs_cq,
                       &cmd_ex.ibv_cmd, sizeof(cmd_ex),
                       &resp_ex.ibv_resp, sizeof(resp_ex),
                       CREATE_CQ_CMD_FLAGS_TS_IGNORED_EX);
    }

	...
}

初始化dbrec和cq,然后将buf地址,dbrec地址,cqe_sz记录到cmd_drv,然后执行ibv_cmd_create_cq_ex,进入到了内核态。

内核态

dma映射

int mlx5_ib_create_cq(struct ib_cq *ibcq, const struct ib_cq_init_attr *attr,
              struct ib_udata *udata)
{
	if (udata) {
        err = create_cq_user(dev, udata, cq, entries, &cqb, &cqe_size,
                     &index, &inlen);
		...
    }
}

进入内核态后执行mlx5_ib_create_cq,由于是用户态的create_cq,因此执行create_cq_user,cqb类型为mlx5_ifc_create_cq_in_bits,即第二章中介绍的create_cq的cmd,会在create_cq_user创建。

static int create_cq_user(struct mlx5_ib_dev *dev, struct ib_udata *udata,
              struct mlx5_ib_cq *cq, int entries, u32 **cqb,
              int *cqe_size, int *index, int *inlen)
{
	struct mlx5_ib_create_cq ucmd = {};
	*cqe_size = ucmd.cqe_size;
	cq->buf.umem = 
        ib_umem_get_peer(udata, ucmd.buf_addr,
                 entries * ucmd.cqe_size,
                 IB_ACCESS_LOCAL_WRITE, 0);
    ...
}

entries为cq容量,cqe_size为cqe大小,然后执行ib_umem_get_peer,因为用户态中的buf_addr或者dbr的地址均为虚拟地址,用户态软件使用虚拟地址访问cq,但是硬件需要使用总线地址访问cq,所以ib_umem_get_peer作用就是将虚拟地址连续的addr转为总线地址。

struct ib_umem *ib_umem_get_peer(struct ib_udata *udata, unsigned long addr,                  
                 size_t size, int access,                                                     
                 unsigned long peer_mem_flags)                                                
{
    return __ib_umem_get(udata, addr, size, access,                                           
                 IB_PEER_MEM_ALLOW | peer_mem_flags);
}

首先分配ib_umem umem,相关信息都会记录到umem里。

struct ib_umem *__ib_umem_get(struct ib_udata *udata,
                    unsigned long addr, size_t size, int access,
                    unsigned long peer_mem_flags)
{
    
    struct ib_umem *umem;
    struct page **page_list;
    unsigned long dma_attr = 0;
    struct mm_struct *mm;
    unsigned long npages;
    int ret;
    struct scatterlist *sg = NULL;
    unsigned int gup_flags = FOLL_WRITE;
    unsigned long dma_attrs = 0;
    ...

    umem = kzalloc(sizeof(*umem), GFP_KERNEL);
    if (!umem)
        return ERR_PTR(-ENOMEM);
    umem->context = context;
    umem->length     = size;
    umem->address    = addr;
    ...
}

然后分配page_list,page_list用于保存接下来通过get_user_pages返回的物理页,ib_umem_num_pages计算得到npages,表示这段内存一共占用了多少个物理页。然后创建sg_table,用于保存离散的物理页集合,sg被赋值为sg_table的scatterlist sgl。

{
	page_list = (struct page **) __get_free_page(GFP_KERNEL);
    if (!page_list) {
        ret = -ENOMEM;
        goto umem_kfree;
    }
    npages = ib_umem_num_pages(umem);
    ...
    cur_base = addr & PAGE_MASK;
    ret = sg_alloc_table(&umem->sg_head, npages, GFP_KERNEL);
    sg = umem->sg_head.sgl;
	...
}

接下来开始通过get_user_pages获取入参addr这个虚拟地址对应的物理页集合,get_user_pages通过每个页的虚拟地址找vma,找到之后通过follow_page_mask获取对应的物理页,如果没有分配物理页就分配,并且pin住保证不会交换,因为page_list大小为一个物理页,所以这里一次性最多传进去的page数为PAGE_SIZE / sizeof (struct page *)。

{
	while (npages) {
        cond_resched();
        down_read(&mm->mmap_sem);
        ret = get_user_pages_longterm(cur_base,
            min_t(unsigned long, npages,
            PAGE_SIZE / sizeof (struct page *)),
            gup_flags, page_list, NULL);

        if (ret < 0) {
            pr_debug("%s: failed to get user pages, nr_pages=%lu, flags=%u\n", __func__,
                   min_t(unsigned long, npages,
                    PAGE_SIZE / sizeof(struct page *)),
                   gup_flags);
            up_read(&mm->mmap_sem);
            goto umem_release;
        }

        cur_base += ret * PAGE_SIZE;
        npages -= ret;
        sg = ib_umem_add_sg_table(sg, page_list, ret,
        dma_get_max_seg_size(context->device->dma_device),
        &umem->sg_nents);
        up_read(&mm->mmap_sem);
    }

}

然后开始将返回的ret个物理页通过ib_umem_add_sg_table加入到sg_table中,因为第一次调用的时候sg里还没有保存page,所以可以通过sg_page判断是否是第一次调用。对于非第一次的调用这里会尝试合并本次操作的page到当前的sg里,通过page_to_pfn(sg_page(sg))可以拿到当前sg里物理页的pfn,然后加上sg的物理页数就得到了sg保存的连续物理页后的第一个物理页的pfn,如果和page_list[0]的pfn相等,说明是连续的,可以合并,通过设置update_cur_sg表示可以将page_list合并到当前sg。

static struct scatterlist *ib_umem_add_sg_table(struct scatterlist *sg,
        struct page **page_list,
        unsigned long npages,
        unsigned int max_seg_sz,
        int *nents)
{
    unsigned long first_pfn;
    unsigned long i = 0;
    bool update_cur_sg = false;
    bool first = !sg_page(sg);

    if (!first && (page_to_pfn(sg_page(sg)) + (sg->length >> PAGE_SHIFT) ==
                page_to_pfn(page_list[0])))
        update_cur_sg = true;
	...
}

然后开始循环添加page_list的页面到sg_table,拿到这次循环要处理的第一个page即first_page和他的pfn即first_pfn,然后从i往后看接下来有多少个物理页是连续的,len表示连续的page数量,如果可以合并到当前sg,那么直接更新sg的长度信息,然后continue。如果不能合并且不是第一次执行,那么就需要通过sg_next切换到下一个sg并更新,最后返回当前处理的sg。

static struct scatterlist *ib_umem_add_sg_table(struct scatterlist *sg,
        struct page **page_list,
        unsigned long npages,
        unsigned int max_seg_sz,
        int *nents)
{
	while (i != npages) {
        unsigned long len;
        struct page *first_page = page_list[i];

        first_pfn = page_to_pfn(first_page);                               */
        for (len = 0; i != npages &&
                first_pfn + len == page_to_pfn(page_list[i]) &&
                len < (max_seg_sz >> PAGE_SHIFT);
                len++)
            i++;

        /* Squash N contiguous pages from page_list into current sge */
        if (update_cur_sg) {
            if ((max_seg_sz - sg->length) >= (len << PAGE_SHIFT)) {
                sg_set_page(sg, sg_page(sg),
                        sg->length + (len << PAGE_SHIFT),
                        0);
                update_cur_sg = false;
                continue;
            }
            update_cur_sg = false;
        }
        /* Squash N contiguous pages into next sge or first sge */
        if (!first)
            sg = sg_next(sg);

        (*nents)++;
        sg_set_page(sg, first_page, len << PAGE_SHIFT, 0);
        first = false;
    }
    return sg;
}

最后通过sg_mark_end标记当前sg为最后一个sg,如果access属性有IB_ACCESS_RELAXED_ORDERING,那么dma_attr需要设置上DMA_ATTR_WEAK_ORDERING,表示读写可以乱序。最后通过ib_dma_map_sg_attrs将sg_table的物理页执行dma映射,实际用的就是dma_map_sg_attrs。

struct ib_umem *__ib_umem_get(struct ib_udata *udata,
                    unsigned long addr, size_t size, int access,
                    unsigned long peer_mem_flags)
{
	...
    sg_mark_end(sg);
    if (access & IB_ACCESS_RELAXED_ORDERING)
        dma_attr |= DMA_ATTR_WEAK_ORDERING;

    umem->nmap = ib_dma_map_sg_attrs(
                    context->device,
                    umem->sg_head.sgl,
                    umem->sg_nents,
                    DMA_BIDIRECTIONAL
                                  , dma_attrs
                  );  
	...
}

这里就完成了对cq buf的dma映射,回到create_cq_user的逻辑:


    page_size = mlx5_umem_find_best_cq_quantized_pgoff(
        cq->buf.umem, cqc, log_page_size, MLX5_ADAPTER_PAGE_SHIFT,
        page_offset, 64, &page_offset_quantized);
    if (!page_size) {
        err = -EINVAL;
        goto err_umem;
    }    
	err = mlx5_ib_db_map_user(context, udata, ucmd.db_addr, &cq->db);
    if (err)
        goto err_umem;

    ncont = ib_umem_num_dma_blocks(cq->buf.umem, page_size);

网卡支持多种大小的页大小,mlx5_umem_find_best_cq_quantized_pgoff就是计算出最合适的页大小,返回给pag_size,page_offset_quantized为buffer首地址相对于页的偏移,假设这里返回的还是4K,ib_umem_num_dma_blocks计算cq buff一共占用了多少个物理页。
然后执行mlx5_ib_db_map_user完成对dbr的dma映射。

int mlx5_ib_db_map_user(struct mlx5_ib_ucontext *context,
            struct ib_udata *udata, unsigned long virt,
            struct mlx5_db *db)
{   
    struct mlx5_ib_user_db_page *page;
    int err = 0;
    
    mutex_lock(&context->db_page_mutex);
    
    list_for_each_entry(page, &context->db_page_list, list)
        if ((current->mm == page->mm) &&
            (page->user_virt == (virt & PAGE_MASK)))
            goto found;
    
    page = kmalloc(sizeof(*page), GFP_KERNEL);
    if (!page) {
        err = -ENOMEM;
        goto out;
    }
    
    page->user_virt = (virt & PAGE_MASK);
    page->refcnt    = 0;
    page->umem =
        ib_umem_get_peer(udata, virt & PAGE_MASK,
                      PAGE_SIZE, 0, 0);
    if (IS_ERR(page->umem)) {
        err = PTR_ERR(page->umem);
        kfree(page);
        goto out;
    }
    mmgrab(current->mm);
    page->mm = current->mm;
    
    list_add(&page->list, &context->db_page_list);

found:
    db->dma = sg_dma_address(page->umem->sg_head.sgl) + (virt & ~PAGE_MASK);
    db->u.user_page = page;
    ++page->refcnt;

out:
    mutex_unlock(&context->db_page_mutex);
    
    return err;
}

用户态分配dbr的时候会一次性分配一个page以容纳多个dbr,因此这里核心逻辑就是判断当前dbr所在的页是否已经执行过dma映射,执行过dma映射的页会保存在db_page_list链表中,所以这里先遍历链表里所有的mlx5_ib_user_db_page,如果发现和当前要映射的page是同一个进程,并且虚拟地址相等,就说明已经映射过,那直接通过umem中的sg_table拿到首地址加上偏移就得到了dma地址;如果没有找到,说明是第一次映射,将ib_umem_get_peer完成映射,将信息保存到page的umem,然后将当前page加入到db_page_list。

cmd初始化

然后开始初始化mailbox机制中的输入,即mlx5_ifc_create_cq_in_bits,如下所示,其中cq context entry即cqc,pas为cq buff对应的物理页集合。

RDMA驱动学习(三)- cq的创建-LMLPHP
RDMA驱动学习(三)- cq的创建-LMLPHP

void mlx5_ib_populate_pas(struct ib_umem *umem, size_t page_size, __be64 *pas,
              u64 access_flags)
{
    struct ib_block_iter biter;

    rdma_umem_for_each_dma_block (umem, &biter, page_size) {
        *pas = cpu_to_be64(rdma_block_iter_dma_address(&biter) |
                   access_flags);
        pas++;
    }   
}

然后执行mlx5_ib_populate_pas,这里会将sg_table记录的物理内存即cq buff按照page_size大小记录到pas数组。
首先创建一个ib_block_iter,初始化设置__sg为scatterlist,__sg_nents 为nents,即sg_table里的成员个数。

#define rdma_umem_for_each_dma_block(umem, biter, pgsz)                        \
    for (__rdma_umem_block_iter_start(biter, umem, pgsz);                  \
         __rdma_block_iter_next(biter);)

static inline void __rdma_umem_block_iter_start(struct ib_block_iter *biter,
                        struct ib_umem *umem,
                        unsigned long pgsz)
{
    __rdma_block_iter_start(biter, umem->sg_head.sgl, umem->nmap, pgsz);
}

void __rdma_block_iter_start(struct ib_block_iter *biter,
                 struct scatterlist *sglist, unsigned int nents,
                 unsigned long pgsz)
{
    memset(biter, 0, sizeof(struct ib_block_iter));
    biter->__sg = sglist;
    biter->__sg_nents = nents;

    /* Driver provides best block size to use */
    biter->__pg_bit = __fls(pgsz);
}

struct ib_block_iter {
    /* internal states */
    struct scatterlist *__sg;   /* sg holding the current aligned block */
    dma_addr_t __dma_addr;      /* unaligned DMA address of this block */
    unsigned int __sg_nents;    /* number of SG entries */
    unsigned int __sg_advance;  /* number of bytes to advance in sg in next step */
    unsigned int __pg_bit;      /* alignment of current block */
};

然后通过__rdma_block_iter_next遍历sg_table,biter的dma_addr设置为当前scatterlist的dma地址,__sg_advance表示在当前entry中的偏移,第一次为0,所以biter第一次的__dma_addr就是第一个entry的dma地址,将dma地址记录到pas第一项,然后开始移动到下一个物理页,即将__sg_advance加上物理页大小,如果__sg_advance大于当前entry对应的物理内存长度,那么通过sg_next移动到scatterlist的下一个entry,直到遍历完成所有entry,就将所有物理页记录到了pas。

bool __rdma_block_iter_next(struct ib_block_iter *biter)
{
    unsigned int block_offset;

    if (!biter->__sg_nents || !biter->__sg)
        return false;

    biter->__dma_addr = sg_dma_address(biter->__sg) + biter->__sg_advance;
    block_offset = biter->__dma_addr & (BIT_ULL(biter->__pg_bit) - 1);
    biter->__sg_advance += BIT_ULL(biter->__pg_bit) - block_offset;

    if (biter->__sg_advance >= sg_dma_len(biter->__sg)) {
        biter->__sg_advance = 0;
        biter->__sg = sg_next(biter->__sg);
        biter->__sg_nents--;
    }
                
    return true;
}
static int create_cq_user(struct mlx5_ib_dev *dev, struct ib_udata *udata,
              struct mlx5_ib_cq *cq, int entries, u32 **cqb,
              int *cqe_size, int *index, int *inlen)
{
	...
    cqc = MLX5_ADDR_OF(create_cq_in, *cqb, cq_context);
    MLX5_SET(cqc, cqc, log_page_size,
         order_base_2(page_size) - MLX5_ADAPTER_PAGE_SHIFT);
    MLX5_SET(cqc, cqc, page_offset, page_offset_quantized);
	...
}

然后开始设置mlx5_ifc_create_cq_in_bits的cqc,cqc的格式如下所示,log_page_size表示以log表示的物理页大小;page_offset表示cq buff首地址相对物理页的偏移,对于cq这个值必须为0。
RDMA驱动学习(三)- cq的创建-LMLPHP

cmd执行

然后回到create_cq的逻辑

int mlx5_ib_create_cq(struct ib_cq *ibcq, const struct ib_cq_init_attr *attr,
              struct ib_udata *udata)
{
	...
	cqc = MLX5_ADDR_OF(create_cq_in, cqb, cq_context);
    MLX5_SET(cqc, cqc, cqe_sz,
         cqe_sz_to_mlx_sz(cqe_size,
                  cq->private_flags &
                  MLX5_IB_CQ_PR_FLAGS_CQE_128_PAD));
    MLX5_SET(cqc, cqc, log_cq_size, ilog2(entries));
    MLX5_SET(cqc, cqc, uar_page, index);
    MLX5_SET(cqc, cqc, c_eqn_or_apu_element, eqn);
    MLX5_SET64(cqc, cqc, dbr_addr, cq->db.dma);
    if (cq->create_flags & IB_UVERBS_CQ_FLAGS_IGNORE_OVERRUN)
        MLX5_SET(cqc, cqc, oi, 1);

    err = mlx5_core_create_cq(dev->mdev, &cq->mcq, cqb, inlen, out, sizeof(out));
	...
}

设置好cqc之后执行mlx5_core_create_cq,会执行到mlx5_create_cq

int mlx5_create_cq(struct mlx5_core_dev *dev, struct mlx5_core_cq *cq,
           u32 *in, int inlen, u32 *out, int outlen)
{
    int eqn = MLX5_GET(cqc, MLX5_ADDR_OF(create_cq_in, in, cq_context),
               c_eqn_or_apu_element);
    u32 din[MLX5_ST_SZ_DW(destroy_cq_in)] = {};
    struct mlx5_eq_comp *eq;
    int err;

    eq = mlx5_eqn2comp_eq(dev, eqn);
    if (IS_ERR(eq))
        return PTR_ERR(eq);

    memset(out, 0, outlen);
    MLX5_SET(create_cq_in, in, opcode, MLX5_CMD_OP_CREATE_CQ);
    err = mlx5_cmd_do(dev, in, inlen, out, outlen);
    if (err)
        return err;

    cq->cqn = MLX5_GET(create_cq_out, out, cqn);
    cq->cons_index = 0;
    cq->arm_sn     = 0;
    cq->eq         = eq; 
    cq->uid = MLX5_GET(create_cq_in, in, uid);
    refcount_set(&cq->refcount, 1); 
    init_completion(&cq->free);
	...
}

就是执行mlx5_cmd_do,通过第二章介绍的mailbox机制将cmd下发给硬件执行,执行完成后将结果通过out返回,得到cqn记录到cq,到这里cq的创建就完成了。

12-03 17:13