文章目录

简介

  • Zephyr 内核中支持队列操作,该队列是由带头指针和尾指针的单链表组成。
  • 并且提供了队列的常见操作方式,和内核中的其他对象一样,向队列中插入数据是不会进行深拷贝的,用户向其中插入数据时需要自己申请动态空间,否则就会造成内存错误。

队列数据结构

alloc_node

  • 下面是queue中节点的定义:

#ifdef __LP64__
typedef uint64_t unative_t;
#else
typedef uint32_t unative_t;
#endif

struct _sfnode {
	unative_t next_and_flags;
};

typedef struct _sfnode sys_sfnode_t;

struct alloc_node {
	sys_sfnode_t node;
	void *data;
};
  • struct alloc_node 是队列中的节点,它由一个 sys_sfnode_t 类型的 node 和一个数据指针构成,其中 node 是一个32位/64位的变量,用于保存下一个节点的指针,相当于单链表的next指针,data用于保存用户传入的数据首地址。

k_queue

struct _sflist {
	sys_sfnode_t *head;
	sys_sfnode_t *tail;
};

typedef struct _sflist sys_sflist_t;

struct k_queue {
	sys_sflist_t data_q;
	struct k_spinlock lock;
	_wait_q_t wait_q;

	_POLL_EVENT;

	SYS_PORT_TRACING_TRACKING_FIELD(k_queue)
};
  • k_queue 中包含一个 sys_sflist_t 类型的 data_q 节点,里面就是单链表的头指针和尾指针,用于队列的头部和尾部的插入操作。
  • lock 是一个自旋锁,用于多核之间实现互斥操作,确保队列中的各种操作是安全的,如果是在单核CPU中,其功能类似互斥锁。
  • wait_q 是一个双链表,用于保存那些因为等待队列数据而被挂起的线程,当向队列中插入数据时,会将优先级最高的被挂起线程从双链表中取出,将其恢复为就绪态,等待下一次被唤醒。
  • 最后是 _POLL_EVENT:
#ifdef CONFIG_POLL
#define _POLL_EVENT_OBJ_INIT(obj) \
	.poll_events = SYS_DLIST_STATIC_INIT(&obj.poll_events),
#define _POLL_EVENT sys_dlist_t poll_events
#else
#define _POLL_EVENT_OBJ_INIT(obj)
#define _POLL_EVENT
#endif
  • 当CONFIG_POLL使能之后会开启POLL机制,该机制用于同时等待多个条件被满足, poll_events 也是一个双链表,此处不讨论 poll 机制。

队列的基本操作

void k_queue_init(struct k_queue *queue)

  • 该函数用于初始化队列成员变量,例如将单链表 data_q 中的头指针和尾指针设置为NULL,将双链表 wait_q 的两个指针指向 wait_q 本身。

void k_queue_cancel_wait(struct k_queue *queue)

  • 将从等待队列中取出优先级别最高的任务,将其唤醒。

queue_insert

  • 在Zephyr的队列中,所有的数据插入操作都由 queue_insert 函数完成,下面是该函数的实现:
static int32_t queue_insert(struct k_queue *queue, void *prev, void *data, bool alloc, bool is_append)
{
	struct k_thread *first_pending_thread;
	k_spinlock_key_t key = k_spin_lock(&queue->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_queue, queue_insert, queue, alloc);

	if (is_append) {
		/* 如果是使用 k_queue_append 从尾部插入,先获取当前的尾指针,用于后续的插入操作 */
		prev = sys_sflist_peek_tail(&queue->data_q);
	}

	/* 查找队列中被挂起的线程 */
	first_pending_thread = z_unpend_first_thread(&queue->wait_q);

	/* 如果有被挂起的线程,会将 data 放入到该线程的 tcb 中的 thread->base.swap_data 成员变量中,
	   然后将该任务唤醒,而不会将数据放入到队列中,等待调用 k_queue_get 的线程唤醒之后直接从 thread->base.swap_data 获取数据 */
	if (first_pending_thread != NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_queue, queue_insert, queue, alloc, K_FOREVER);

		prepare_thread_to_run(first_pending_thread, data);
		z_reschedule(&queue->lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, queue_insert, queue, alloc, 0);

		return 0;
	}

	/* k_queue_alloc_append 和 k_queue_alloc_prepend 函数被调用时,alloc 为1,此时用户传入的只有数据,
	并不包含链表节点所需要的next 指针,这种情况下需要动态申请一个 struct alloc_node 的节点,
	然后将 anode 的 data 成员变量保存用户传入的地址,Zephyr支持的系统为32位/64位,
	这就意味着编译器为变量分配的首地址一定是4字节/8字节对齐的(访问效率),传入地址的低2位一定是0,
	sys_sfnode_init 函数便是用于设置低2位的值,Zephyr将这2位设置为1时,代表插入节点是插入时被动态创建,
	当其为0时代表节点由用户提供,后续使用将节点取出时,会根据该值决定是否需要回收内存 */
	if (alloc) {
		struct alloc_node *anode;

		anode = z_thread_malloc(sizeof(*anode));
		if (anode == NULL) {
			k_spin_unlock(&queue->lock, key);

			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, queue_insert, queue, alloc,
				-ENOMEM);

			return -ENOMEM;
		}
		anode->data = data;
		sys_sfnode_init(&anode->node, 0x1);
		data = anode;
	}
	/* 当使用的是不带 alloc 关键字的插入函数时, 将最低2位设置为0,代表节点由用户提供,后续无需回收 */ 
	else {
		sys_sfnode_init(data, 0x0);
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_queue, queue_insert, queue, alloc, K_FOREVER);

	/* 将节点插入链表中 */
	sys_sflist_insert(&queue->data_q, prev, data);
	handle_poll_events(queue, K_POLL_STATE_DATA_AVAILABLE);
	z_reschedule(&queue->lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, queue_insert, queue, alloc, 0);

	return 0;
}
  • 此处讲解一下 struct alloc_node 中的 node 成员变量的作用:
    1. 充当单链表中的next指针
    2. 标记node来源,低2位非0代表节点由 z_thread_malloc 动态申请,否则视为用户提供

void k_queue_append(struct k_queue *queue, void *data)

void k_queue_append(struct k_queue *queue, void *data)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_queue, append, queue);

	(void)queue_insert(queue, NULL, data, false, true);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, append, queue);
}
  • 该函数用于从队列尾部插入数据,此时 data 的前4字节必须是 sys_sfnode_t 类型,在 sys_sfnode_t 之后定义用户数据,以此来保证data的头部与链表节点是对齐的,常见的使用方式如下:
struct bt_mesh_test_msg {
	sys_snode_t _node;
	size_t len;
	uint8_t seq;
	struct bt_mesh_msg_ctx ctx;
};

void func(void)
{
	struct bt_mesh_test_msg *msg;

	if (k_mem_slab_alloc(&msg_pool, (void **)&msg, K_NO_WAIT)) {
		test_stats.recv_overflow++;
		return -EOVERFLOW;
	}

	msg->len = 0;
	msg->seq = 0;
	msg->ctx = 0;

	k_queue_append(&recv, msg);
}
  • 此处需要注意,虽然32位系统用的比较多,但是也不能用 int 替换 sys_snode_t,一旦所使用的系统由32位变为64位将造成严重错误,下面是驱动文件 zephyr/drivers/modem/hl7800.c:400 中的错误:

struct stale_socket {
	int reserved; /* first word of queue data item reserved for the kernel */
	enum net_sock_type type;
	uint8_t id;
	bool allocated;
};

int32_t k_queue_alloc_append(struct k_queue *queue, void *data)

  • k_queue_alloc_append 和 k_queue_append 唯一的差别是增加了动态申请节点的步骤,如果使用的是不带alloc的插入函数,data代表的就是整个节点,如果使用带alloc的版本,data代表的只是数据部分,不包含头部的 sys_snode_t 。

void k_queue_prepend(struct k_queue *queue, void *data)

  • k_queue_prepend 调用queue_insert从头部插入整个节点,data 是 sys_snode_t 子类的一个对象指针。

int32_t k_queue_alloc_prepend(struct k_queue *queue, void *data)

  • k_queue_prepend 调用queue_insert从头部插入整个节点,data 仅代表数据。

void k_queue_insert(struct k_queue *queue, void *prev, void *data)

  • 从某个节点后插入新节点,data 是 sys_snode_t 子类的一个对象指针。

int k_queue_append_list(struct k_queue *queue, void *head, void *tail)

  • 将单链表添加到当前队列中,该操作实际上就是改变了节点的next指针,并未进行深拷贝。

int k_queue_merge_slist(struct k_queue *queue, sys_slist_t *list)

  • k_queue_merge_slist 功能和 k_queue_insert 一致,也是将list添加到队列中,可以将 k_queue_merge_slist 当作 k_queue_append_list 的重载版本。

void *z_queue_node_peek(sys_sfnode_t *node, bool needs_free)

 void *z_queue_node_peek(sys_sfnode_t *node, bool needs_free)
{
	void *ret;

	if ((node != NULL) && (sys_sfnode_flags_get(node) != (uint8_t)0)) {
		/* If the flag is set, then the enqueue operation for this item
		 * did a behind-the scenes memory allocation of an alloc_node
		 * struct, which is what got put in the queue. Free it and pass
		 * back the data pointer.
		 */
		struct alloc_node *anode;

		anode = CONTAINER_OF(node, struct alloc_node, node);
		ret = anode->data;
		if (needs_free) {
			k_free(anode);
		}
	} else {
		/* Data was directly placed in the queue, the first word
		 * reserved for the linked list. User mode isn't allowed to
		 * do this, although it can get data sent this way.
		 */
		ret = (void *)node;
	}

	return ret;
}
  • z_queue_node_peek用于从队列中取出数据,取出时会使用 sys_sfnode_flags_get 判断低2位的值与 needs_free 来决定是否需要真的释放占用的内存。

void *k_queue_get(struct k_queue *queue, k_timeout_t timeout)

void *z_impl_k_queue_get(struct k_queue *queue, k_timeout_t timeout)
{
	k_spinlock_key_t key = k_spin_lock(&queue->lock);
	void *data;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_queue, get, queue, timeout);

	/* 如果队列中存在数据,将其从队列中取出 */
	if (likely(!sys_sflist_is_empty(&queue->data_q))) {
		sys_sfnode_t *node;

		node = sys_sflist_get_not_empty(&queue->data_q);
		/* 取出数据的同时会将插入时申请的内存释放,此处的 data 有不同的含义,如果调用的是alloc版本的插入函数,
		返回值 data 代表的就只包含数据,如果调用的是不带alloc版本的插入函数,返回的就是一个带数据的链表节点,
		简而言之,插入的是什么,返回的就是什么,同一个队列中如果混用两个版本的函数可能无法判断返回的具体类型 */
		data = z_queue_node_peek(node, true);
		k_spin_unlock(&queue->lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, get, queue, timeout, data);

		return data;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_queue, get, queue, timeout);
	
	/* 队列中无数据且等待时间为0,直接返回NULL */
	if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		k_spin_unlock(&queue->lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, get, queue, timeout, NULL);

		return NULL;
	}
	
	/* 进入挂起等待状态 */
	int ret = z_pend_curr(&queue->lock, key, &queue->wait_q, timeout);
	
	/* 当队列中有新数据插入、链表合并、取消等待函数被调用时,会唤醒因为等待而被挂起的线程,唤醒同时会设置唤醒之后的返回值,
	该返回值通过 z_pend_curr 进行读取,上面三个操作都会将返回值设置为0,除此之外还会设置 _current->base.swap_data 的值,
	区别在于取消等待函数 z_impl_k_queue_cancel_wait 将其设置为NULL,其余两个操作将其设置为插入数据的首地址。
	*/
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_queue, get, queue, timeout,
		(ret != 0) ? NULL : _current->base.swap_data);

	return (ret != 0) ? NULL : _current->base.swap_data;
}
  • k_queue_get 函数等待队列中数据,调用该函数会将数据从队列中移除。

bool k_queue_remove(struct k_queue *queue, void *data)

  • 根据提供的data查找链表中是否存在对应节点,存在则将其移除, data 必须是继承自 sys_sfnode_t 的节点。

bool k_queue_unique_append(struct k_queue *queue, void *data)

  • 插入一个节点,前提是该节点不在链表中,保证该节点唯一,data 必须是继承自 sys_sfnode_t 的节点。

int k_queue_is_empty(struct k_queue *queue)

  • 队列为空返回1,否则返回0。

void *k_queue_peek_head(struct k_queue *queue)

  • 获取头节点首地址,但是节点依然在队列中。

void *k_queue_peek_tail(struct k_queue *queue)

  • 获取尾节点首地址,但是节点依然在队列中。

总结

  • Zephyr 中的队列是链式队列,队列的插入和并不会进行数据的深拷贝,使用时应避免使用栈上的内存,如果传入的对象来自于栈需要仔细考虑同步问题,否则会造成严重错误。
  • 提供alloc版本的插入函数,会从当前线程的资源池中分配空间作为节点,但是不会拷贝用户数据,当用户取出数据时会自动释放申请的内存。
  • 链表中的数据可直接合并到队列中,因为队列本身就是由链表组成的。
04-03 09:24