使用atomic来避免lock

在程序中为了互斥,难免要用锁,有些时候可以通过使用atomic来避免锁,
从而更高效.

下面给出一个以太坊中的例子,就是MsgPipeRW,从名字Pipe可以看出,

他实际上就是一个pipe,相比大家对pipe已经比较熟悉了,我就不多解释了.

type MsgPipeRW struct {
w chan<- Msg
r <-chan Msg
closing chan struct{}
closed *int32
} //创建一个MsgPipeRw
func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
var (
c1, c2 = make(chan Msg), make(chan Msg)
closing = make(chan struct{})
closed = new(int32)
rw1 = &MsgPipeRW{c1, c2, closing, closed}
rw2 = &MsgPipeRW{c2, c1, closing, closed}
)
return rw1, rw2
}
pipe就像水管一样,这里MsgPipe创建了两根水管,可以自由双向流动,rw1写,rw2就可以
读到,rw2写,rw1就可以读到.原理也很简单,因为rw1写和rw2操作的是同一个chan Msg,反之亦然. 关键是这里的closed,可以想想rw1,rw2很有可能在不同的goroutine发生读写关闭等操作,
这时候要同时访问closed这个变量,难免会发生冲突,我们看看如何避免. closed如果为0表示没有关闭,1表示已经关闭,就不应该再进行读写了.
// 从pipe中读取一个msg
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
//这里不能直接*p.closed==0,要使用atomic.LoadInt32来访问
if atomic.LoadInt32(p.closed) == 0 {
...
}
return Msg{}, ErrPipeClosed
} // 写的时候也一样
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
if atomic.LoadInt32(p.closed) == 0 {
...
}
return ErrPipeClosed
}

读写消息只是读取互斥变量,没有发生写入,下面来看看close的时候如何写入

func (p *MsgPipeRW) Close() error {
if atomic.AddInt32(p.closed, 1) != 1 { //避免锁,
// someone else is already closing
atomic.StoreInt32(p.closed, 1) // avoid overflow
return nil
}
close(p.closing)
return nil
}

atomic.AddInt32能够避免我们一般这样的写法发生的并发访问.

if *p.closed==0 {
*p.closed+=1
}

感兴趣的可以修改代码试试,采用*p.closed==0这种方式,会不会造成崩溃,测试代码如下

func TestMsgPipeConcurrentClose(t *testing.T) {
rw1, _ := MsgPipe()
for i := 0; i < 10; i++ {
go rw1.Close()
}
}

atomic看似神奇的避免了锁,实际上这需要处理器的特殊指令支持,尤其是发生在多和处理器上时,atomic指令

会保证对特定地址的锁定.

atomic相对于lock的最大优势就是他只是一条特殊指令,不用发生系统上下文切换,我们都知道系统上下文切换

代价要大得多.

05-11 23:01