自旋锁(spinlock)是一种常用的互斥同步原语,当试图进入临界区的线程使用忙等待的方式检测锁的状态,若锁未被持有则尝试获取。这种忙等待的做法无谓的消耗了处理器资源,故只适合用于临界区非常小的代码片段,如linux下的中断处理函数。
    posix 已经实现了自旋锁相关的API,其主要包括 pthread_spin_lock,pthread_spin_trylock,pthread_spin_unlock。从实现的原理上来说,它属于busy-waiting类型的锁。假设一双核的机器上有两个线程(线程a,b),他们分别运行在core0,core1上,当线程a使用pthread_spin_lock去请求锁,那么线程a就会在core0上进行忙等待并不停的进行锁请求,直到得到这个锁为止。与mutex锁不同的是,线程a通过pthread_mutex_lock操作区得到一个临界区的时候,如果此时临界区被线程b所持有,那么线程a就会阻塞,core0 会进行上下文切换将线程a置于等待队列中,此时core0就可以运行其他的任务。可见spin_lock的好处是省去用户态切换与内核态的切换。
    linux 下自旋锁的代码在include/linux/spinlock.h,
    其主要结构定义:

点击(此处)折叠或打开

  1. typedef struct {
  2.     volatile unsigned int lock;
  3. } spinlock_t;
关于volatile的作用在 Linux 内核的同步机制--原子操作中已有说明,实际上spinlock_t仅仅包含一个unsigned int类型变量    
    其关键代码:

点击(此处)折叠或打开

  1. static inline void _raw_spin_lock(spinlock_t *lock)
  2. {
  3.     __asm__ __volatile__(
  4.         spin_lock_string
  5.         :"=m" (lock->lock) : : "memory");
  6. }
  7. #define spin_lock_string
  8.     "n1:t"
  9.     "lock ; decb %0nt"
  10.     "js 2fn"
  11.     LOCK_SECTION_START("")
  12.     "2:t"
  13.     "rep;nopnt"
  14.     "cmpb $0,%0nt"
  15.     "jle 2bnt"
  16.     "jmp 1bn"
  17.     LOCK_SECTION_END
上面汇编代码中,decb %0 将lock递减,js为负跳转,即如果为负,表明锁已被持有,进入自旋。 每次执行一条空指令后(rep;nop),比较lock与0的值("cmpb $0,%0),如果lock小于或者等于0,则继续自旋(jle 2b);否则,说明锁已释放,将lock递减后返回(jmp 1b)。
在这里,采取了一种优化手段,因在大多数情况下,自旋锁是能够获取成功的,而自旋部分代码,只是在锁被持有时才执行,因此LOCK_SECTION_STARTLOCK_SECTION_END将这些代码放到专门道区.text.lock中。如果把它跟别的常用指令混在一起,会浪费指令缓存的空间。

点击(此处)折叠或打开

  1. #define LOCK_SECTION_NAME
  2.     ".text.lock." __stringify(KBUILD_BASENAME)

  3. #define LOCK_SECTION_START(extra)
  4.     ".subsection 1nt"
  5.     extra
  6.     ".ifndef " LOCK_SECTION_NAME "nt"
  7.     LOCK_SECTION_NAME ":nt"
  8.     ".endifnt"

  9. #define LOCK_SECTION_END
  10.     ".previousnt"
由于spin_try_lock不需要自旋,实现采用xchgb指令(该指令将自动锁总线,不需要使用lock前缀)。若lock原来的值为1,则说明未锁,返回真,表明成功获取锁;否则返回假,获取锁失败。

点击(此处)折叠或打开

  1. static inline int _raw_spin_trylock(spinlock_t *lock)
  2. {
  3.     char oldval;
  4.     __asm__ __volatile__(
  5.         "xchgb %b0,%1"
  6.         :"=q" (oldval), "=m" (lock->lock)
  7.         :"0" (0) : "memory");
  8.     return oldval > 0;
  9. }
附:sgi stl下旋转锁实现部分代码

点击(此处)折叠或打开

  1. int acquire_lock()
  2.         {
  3.             volatile unsigned long* __lock = &this->lock_;
  4.             if (!_Atomic_swap((unsigned long*)__lock,1))
  5.             {
  6.                 return 0;
  7.             }
  8.             unsigned __my_spin_max = mutex_spin<0>::__max;        //    30
  9.             unsigned __my_last_spins = mutex_spin<0>::__last;    //    0
  10.             //    no matter the value of __junk
  11.             volatile unsigned __junk = 17;    
  12.             unsigned __i;
  13.             for (__i = 0; __i < __my_spin_max; ++__i)
  14.             {
  15.                 if (__i < __my_spin_max/2 || *__lock)
  16.                 {
  17.                     //    __junk ^ 4
  18.                     __junk *= __junk;
  19.                     __junk *= __junk;
  20.                     __junk *= __junk;
  21.                     __junk *= __junk;
  22.                     continue;
  23.                 }
  24.                 //    when __lock is 0,record the value __i and return
  25.                 if (!_Atomic_swap((unsigned long*)__lock, 1))
  26.                 {
  27.                     // got
  28.                     // Spinning worked. Thus we're probably not being scheduled
  29.                     // against the other process with which we were contending.
  30.                     // Thus it makes sense to spin longer the next time.
  31.                     mutex_spin<0>::__last = __i;
  32.                     mutex_spin<0>::__max = mutex_spin<0>::__high_max;    //    1000
  33.                     return 0;
  34.                 }
  35.             }
  36.             //    waiting until the value of __lock is 0, no time out
  37.             mutex_spin<0>::__last = mutex_spin<0>::__low_max;            //    30
  38.             for (__i = 0; ; ++__i)
  39.             {
  40.                 int __log_nsec = __i + 6;
  41.                 if (__log_nsec > 27)
  42.                 {
  43.                     __log_nsec = 27;
  44.                 }
  45.                 if (!_Atomic_swap((unsigned long*)__lock,1))
  46.                 {
  47.                     return 0;
  48.                 }
  49.                 nsec_sleep(__log_nsec);
  50.             }
  51.             return 0;
  52.         }
   在sgi的实现中,首先判断资源是否被占用,如果没有被占用,立即返回;如果被占用,则循环进行一些简单的数学运算,每次循环继续检测是否被占用。如果在这次循环中成功获取资源,则说明自旋时间比较长,需要调高循环次数。如果在此次循环中依旧没有获取资源,则开始进入循环休眠,同样,每次定长时间休眠后,检测是否能获取资源,直到能获取资源为止。这个实现方式避免了长时间获取不了资源而一直进行自旋,超过一定时间后,进入休眠,把CPU让给其他线程,这点跟mutex锁类似。

理解旋转锁基本实现后,下面简单介绍下POSIX threads pthread_spin_lock的使用:

点击(此处)折叠或打开

  1. #include <pthread.h>
  2. #include <unistd.h>
  3. static int __val = 0;
  4. pthread_spinlock_t __spinlock;
  5. static void* thread_fun1(void *__arg)
  6. {
  7.     while(true)
  8.         {
  9.         pthread_spin_lock(&__spinlock);
  10.         ++__val;
  11.         pthread_spin_unlock(&__spinlock);
  12.         }
  13.         return NULL;
  14.  }
  15.  
  16. static void* thread_fun2(void *__arg)
  17. {
  18.     while(true)
  19.         {
  20.         pthread_spin_lock(&__spinlock);
  21.             ++__val;
  22.         pthread_spin_unlock(&__spinlock);
  23.         }
  24.         return NULL;
  25.  }
  26. int main()
  27. {
  28.     pthread_t __thread_id1;
  29.     pthread_t __thread_id2;
  30.     pthread_spin_init(&__spinlock, 0);
  31.      int __res1 = pthread_create(&__thread_id1,NULL,&thread_fun1,NULL);
  32.         int __res2 = pthread_create(&__thread_id2,NULL,&thread_fun2,NULL);
  33.         sleep(1);
  34.     pthread_spin_destroy(&__spinlock);
  35.     return 0;
  36. }
承接linux下valgrind检测跨线程访问,valgrind检测不到内存跨线程问题了。  
    参考:    
    自旋锁
    http://blog.chinaunix.net/uid-20184656-id-138057.html

    http://www.parallellabs.com/2010/01/31/pthreads-programming-spin-lock-vs-mutex-performance-analysis/








12-17 00:20