简介

  • Mutex 实现了一个优先级继承算法,该算法可以将 Mutex 持有者的的任务优先级提高至等待队列中优先级最高的线程同等优先级。
  • Zephyr 中支持互斥操作,由于其本身是支持SMP调度机制的,关中断不能达到独占访问的目的,需要使用一个全局自旋锁来保护持有者线程优先级之类的东西,这不是 k_mutex 中的一部分。

优先级反转与优先级继承

优先级反转

  • 高优先级任务访问的临界资源被低优先级任务持有,因此高优先级任务让出CPU进入等待,而低优先级任务由于优先级较低,被中等优先级任务抢占,其他中等优先级的任务却能正常运行。
  • 从运行现象上来看,中等优先级的任务一直在执行,而高优先级任务被阻塞无法执行,就像是中等优先级任务的优先级更高,这种现象称为优先级反转。

解决方案

  • 禁止抢占
    • 在一个任务运行时,除非主动让出CPU,否则即使是比他优先级更高的线程,也不能抢占CPU。
  • 优先级继承
    • 当高优先级进程请求一个已经被被低优先级占有的临界资源时,将低优先级进程的优先级临时提升到与高优先级进程一样的级别,使得低优先级进程能更快地运行,从而更快地释放临界资源。低优先级进程离开临界区后,其优先级恢复至原本的值。
  • 优先级天花板
    • 将访问临界资源的任务的优先级提升(将该值称为优先级的天花板),所有访问该临界资源的任务,其优先级都应小于等于天花板。

Zephyr 中的解决方案

  • 在 Zephyr 中,默认采用的是基于优先级继承算法来实现加锁和解锁。

数据结构

struct k_mutex {
	/** Mutex wait queue */
	_wait_q_t wait_q;
	/** Mutex owner */
	struct k_thread *owner;

	/** Current lock count */
	uint32_t lock_count;

	/** Original thread priority */
	int owner_orig_prio;

	SYS_PORT_TRACING_TRACKING_FIELD(k_mutex)
};
  • wait_q 获取锁失败且等待时间不为0的线程会被放入等待队列中。
  • owner 所的持有线程。
  • lock_count 加锁次数,Zephyr中的 mutex 支持递归操作,同一个线程可对互斥量多次加锁,每加锁一次 lock_count 加一,每解锁一次 lock_count 减1。
  • owner_orig_prio 对互斥量进行第一次加锁时,线程的初始优先级,用于解锁时将线程的优先级进行恢复。

Mutex 初始化

  • Zephyr 提供两种初始化方式,一种是使用 Z_MUTEX_INITIALIZER 初始化,另一种是使用 k_mutex_init 函数初始化。
#define Z_MUTEX_INITIALIZER(obj) \
	{ \
	.wait_q = Z_WAIT_Q_INIT(&obj.wait_q), \
	.owner = NULL, \
	.lock_count = 0, \
	.owner_orig_prio = K_LOWEST_APPLICATION_THREAD_PRIO, \
	}
int z_impl_k_mutex_init(struct k_mutex *mutex)
{
	mutex->owner = NULL;
	mutex->lock_count = 0U;

	z_waitq_init(&mutex->wait_q);

	z_object_init(mutex);

	SYS_PORT_TRACING_OBJ_INIT(k_mutex, mutex, 0);

	return 0;
}

Mutex 加锁

  • 对 Mutex 进行加锁时分为三种情况
    • 加锁成功,返回0
    • 加锁失败,超时为0,返回错误
    • 加锁失败,超时不为0,将当前线程添加到等待队列,同时提升互斥量持有者的优先级。
  • k_mutex_lock 可以在一个线程中多次调用,实现递归互斥锁的功能,解锁次数需与加锁次数相同。
  • 下面是 k_mutex_lock 的实现:
int z_impl_k_mutex_lock(struct k_mutex *mutex, k_timeout_t timeout)
{
	int new_prio;
	k_spinlock_key_t key;
	bool resched = false;

	__ASSERT(!arch_is_in_isr(), "mutexes cannot be used inside ISRs");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_mutex, lock, mutex, timeout);

	key = k_spin_lock(&lock);

	/* mutex 未上锁或持有者递归调用 k_mutex_lock */
	if (likely((mutex->lock_count == 0U) || (mutex->owner == _current))) {
		
		/* 递归调用 k_mutex_lock时优先级不变,首次上锁将 mutex->owner_orig_prio 设置为线程优先级 */
		mutex->owner_orig_prio = (mutex->lock_count == 0U) ?
					_current->base.prio :
					mutex->owner_orig_prio;

		/* 记录上锁次数,标记持有线程 */
		mutex->lock_count++;
		mutex->owner = _current;

		LOG_DBG("%p took mutex %p, count: %d, orig prio: %d",
			_current, mutex, mutex->lock_count,
			mutex->owner_orig_prio);

		k_spin_unlock(&lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, lock, mutex, timeout, 0);

		return 0;
	}

	/* timeout 等于 K_NO_WAIT,返回-EBUSY */
	if (unlikely(K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {
		k_spin_unlock(&lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, lock, mutex, timeout, -EBUSY);

		return -EBUSY;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_mutex, lock, mutex, timeout);

	/* 通过优先级继承算法获取最高优先级,Zephyr中数字越小优先级越高 */
	new_prio = new_prio_for_inheritance(_current->base.prio,
					    mutex->owner->base.prio);

	LOG_DBG("adjusting prio up on mutex %p", mutex);

	/* 如果 new_prio 比 Mutex持有者优先级更高,将调整持有线程优先级 */
	if (z_is_prio_higher(new_prio, mutex->owner->base.prio)) {
		resched = adjust_owner_prio(mutex, new_prio);
	}

	/* 将当前线程挂起等待唤醒 */
	int got_mutex = z_pend_curr(&lock, key, &mutex->wait_q, timeout);

	LOG_DBG("on mutex %p got_mutex value: %d", mutex, got_mutex);

	LOG_DBG("%p got mutex %p (y/n): %c", _current, mutex,
		got_mutex ? 'y' : 'n');

	/* 获取Mutex 成功,返回0 */
	if (got_mutex == 0) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, lock, mutex, timeout, 0);
		return 0;
	}

	/* 获取超时 */
	LOG_DBG("%p timeout on mutex %p", _current, mutex);

	key = k_spin_lock(&lock);

	/* 检查在线程超时解除挂起后,Mutex是否被释放,如果已经被释放,则不用将持有者的优先级调低
	 */
	if (likely(mutex->owner != NULL)) {
		/* 等待队列中的线程在插入时按照优先级进行排序,
		 * 头部的线程优先级最高,尾部线程优先级最低,
		 * 等待队列中头节点线程的优先级即为持有线程需要调整的优先级
		 */
		struct k_thread *waiter = z_waitq_head(&mutex->wait_q);

		/* 等待队列中没有挂起线程则直接设置为当前值,有则将其设置为 waiter 的优先级 */
		new_prio = (waiter != NULL) ?
			new_prio_for_inheritance(waiter->base.prio, mutex->owner_orig_prio) :
			mutex->owner_orig_prio;

		LOG_DBG("adjusting prio down on mutex %p", mutex);

		/* 如果调整后的优先级 new_prio 和Mutex持有者优先级不一致,说明继承的优先级更高,
		 * 或者前面提高了持有者的优先级,现在上锁超时,需要将优先级恢复至原有值,
		 * 这两种情况会改变持有者优先级,在此必须进行一次上下文切换,使新的优先级立即生效
	     */
		resched = adjust_owner_prio(mutex, new_prio) || resched;
	}

	if (resched) {
		z_reschedule(&lock, key);
	} else {
		k_spin_unlock(&lock, key);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, lock, mutex, timeout, -EAGAIN);

	return -EAGAIN;
}

Mutex 解锁

  • 互斥量的解锁线程必须与加锁线程相同,否则返回错误。
  • 当线程解锁时会调用 k_mutex_unlock 函数,该函数首先会将 lock_count 减1,同时通过 owner_orig_prio 恢复当前线程的优先级。
  • 此后资源便可以被其他线程抢占,会将优先级最高的线程从等待队列中取出并唤醒,将该线程设置为互斥量的持有者。
int z_impl_k_mutex_unlock(struct k_mutex *mutex)
{
	struct k_thread *new_owner;

	__ASSERT(!arch_is_in_isr(), "mutexes cannot be used inside ISRs");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_mutex, unlock, mutex);

	CHECKIF(mutex->owner == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, unlock, mutex, -EINVAL);

		return -EINVAL;
	}

	/* 当前线程不是互斥量的持有者,返回错误 */
	CHECKIF(mutex->owner != _current) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, unlock, mutex, -EPERM);

		return -EPERM;
	}

	/* 尝试解锁一个未锁定的互斥量。如果当前线程等于mutex->owner,
	 * 则mutex->lock_count不能为零,因此不需要下溢检查。使用assert捕获未定义的行为。
	 */
	__ASSERT_NO_MSG(mutex->lock_count > 0U);

	LOG_DBG("mutex %p lock_count: %d", mutex, mutex->lock_count);

	/* 如果当前线程是互斥量的所有者并且计数大于1,则减少计数并返回并保持当前线程作为所有者 */
	if (mutex->lock_count > 1U) {
		mutex->lock_count--;
		goto k_mutex_unlock_return;
	}

	k_spinlock_key_t key = k_spin_lock(&lock);

	/* 如果在解锁时 mutex->owner_orig_prio 与 mutex->owner->base.prio 不相等,
	 * 说明在解锁前这段时间内,互斥量的持有者的优先级被提升过,这时需要将互斥量持有者的优先级恢复为原来的优先级
	 */
	adjust_owner_prio(mutex, mutex->owner_orig_prio);

	/* 从等待队列中将优先级最高的线程唤醒 */
	new_owner = z_unpend_first_thread(&mutex->wait_q);
	/* 将线程持有者设置为该线程 */
	mutex->owner = new_owner;

	LOG_DBG("new owner of mutex %p: %p (prio: %d)",
		mutex, new_owner, new_owner ? new_owner->base.prio : -1000);

	if (new_owner != NULL) {
		/* 由于等待队列是根据优先级进行排序的,因此唤醒的线程的优先级一定是最高的,不需要再次调整互斥量的持有者的优先级 */
		
		mutex->owner_orig_prio = new_owner->base.prio;
		arch_thread_return_value_set(new_owner, 0);
		z_ready_thread(new_owner);
		z_reschedule(&lock, key);
	} else {
		mutex->lock_count = 0U;
		k_spin_unlock(&lock, key);
	}


k_mutex_unlock_return:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mutex, unlock, mutex, 0);

	return 0;
}
04-17 23:20