数据竞争和竞态条件

Go并发中有两个重要的概念:数据竞争(data race)和竞争条件(race condition)。在并发程序中,竞争问题可能是程序面临的最难也是最不容易发现的错误之一。

当有两个或多个协程同时访问同一个内存地址,并且至少有一个是写时,就会发生数据竞争,它造成的影响就是读取变量的值将变得不可知。

数据竞争产生的原因是对于同一个变量的访问不是原子性的。

避免数据竞争可以使用以下三种方式:

  • 使用原子操作
  • 使用mutex对同一区域进行互斥操作
  • 使用管道 (channel) 进行通信以保证仅且只有一个协程在进行写操作

相比于数据竞争,竞争条件也称为资源竞争,受各协程的执行顺序和时机的影响,程序的运行结果产生变化。

竞态条件产生的原因很多是对于同一个资源的一系列连续操作并不是原子性的,也就是说有可能在执行的中途被其他线程抢占,同时这个“其他线程”刚好也要访问这个资源。解决方法通常是:将这一系列操作作为一个critical section(临界区)

i := 0
mutex := sync.Mutex{}
go func() {
    mutex.Lock()
    defer mutex.Unlock()
    i = 1
}()

go func() {
    mutex.Lock()
    defer mutex.Unlock()
    i = 2
}()

这里虽然使用了锁来避免了数据竞争,但输出结果仍然是不可控的,因为变量的结果依赖于协程执行的顺序。这就是竞争条件。

乐观锁和悲观锁

概念

乐观锁和悲观锁是两种思想,用于解决并发场景下的数据竞争问题。

  • 乐观锁

    乐观锁在操作数据时非常乐观,认为别人不会同时修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下在此期间别人是否修改了数据:如果别人没有修改数据,执行更新操作。如果数据已经被更新过了,根据不同的实现方式执行不同的操作:重试(重新读取更新然后比较)或报异常。

  • 悲观锁

    悲观锁在操作数据时比较悲观,认为别人会同时修改数据。因此操作数据时直接把数据锁住,直到操作完成后才会释放锁;上锁期间其他人不能修改数据。

应用场景:

当竞争不激烈时,即出现并发冲突的概率小。乐观锁更有优势,因为悲观锁加锁和释放锁的操作需要耗费额外的资源;当竞争激烈的时候,悲观锁有优势,因为乐观锁在执行失败的时候需要不断重试,浪费CPU资源。针对这个问题的一个思路是引入退出机制,如果重试次数超过一定阈值后失败推出。当然,应该避免在高竞争环境下使用乐观锁。

实现方式

悲观锁的实现方式是加锁,加锁既可以是对代码块加锁(如Java的synchronized关键字),也可以是对数据加锁(如MySQL中的排它锁)。

乐观锁的实现方式主要有两种:CAS机制和版本号机制,

  • CAS

    CAS机制就是Compare And Swap。他的操作逻辑是:如果内存位置V的值等于预期的A值,则将该位置更新为新值B,否则不进行任何操作。许多CAS的操作是自旋的:如果操作不成功,会一直重试,直到操作成功为止。即CAS在更新之前先比较一下,然后决定是否要更新。

    这里的比较和更新是两个操作,其原子性是由CPU支持的,在硬件层面上进行保证。

    CAS有个缺点,就是ABA问题:

    假设有两个线程——线程1和线程2,两个线程按照顺序进行以下操作:

    (1)线程1读取内存中数据为A;

    (2)线程2将该数据修改为B;

    (3)线程2将该数据修改为A;

    (4)线程1对数据进行CAS操作

    在第(4)步中,由于内存中数据仍然为A,因此CAS操作成功,但实际上该数据已经被线程2修改过了。这就是ABA问题。

    在AtomicInteger的例子中,ABA似乎没有什么危害。但是在某些场景下,ABA却会带来隐患,例如栈顶问题:一个栈的栈顶经过两次(或多次)变化又恢复了原值,虽然栈顶不变,但是栈的结构可能已发生了变化。

    对于ABA问题,比较有效的方案是引入版本号,内存中的值每发生一次变化,版本号都+1;在进行CAS操作时,不仅比较内存中的值,也会比较版本号,只有当二者都没有变化时,CAS才能执行成功。

  • 版本号机制

    版本号机制的基本思路是在数据中增加一个字段version,表示该数据的版本号,每当数据被修改,版本号加1。当某个线程查询数据时,将该数据的版本号一起查出来;当该线程更新数据时,判断当前版本号与之前读取的版本号是否一致,如果一致才进行操作。需要注意的是,这里使用了版本号作为判断数据变化的标记,实际上可以根据实际情况选用其他能够标记数据版本的字段,如时间戳等。

与悲观锁相比,乐观锁功能有很多限制,比如CAS操作只能保证单个变量的原子性。

kubernetes中的乐观并发

更新资源时

k8s中的资源都有一个metadata.ResourceVersion字段,当api-server执行update操作的时候通常会先执行get操作,server会比较该字段,如果相同则更新成功,并修改该字段,如果不同,则更新失败。

Leader Election

在kubernetes中,通常kube-scheduler和kube-controller-manager都是多副本进行部署来保证高可用的,这里利用的就是leader election机制。

leader election 指的是一个程序为了高可用会有多个副本,但是每一个时候只允许一个进程在工作。k8s基于乐观并发控制实现了leader election,使得一些控制面组件有多个副本,但只有一个副本在工作,从而达到高可用。

$ kubectl get ep -n kube-system kube-scheduler -o yaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"kube-master-1_ad5220de-2442-11e9-91f6-52540025e0cf","leaseDurationSeconds":15,"acquireTime":"2019-02-22T02:09:15Z","renewTime":"2019-03-14T07:37:19Z","leaderTransitions":1}'
  name: kube-scheduler
  namespace: kube-system

holderIdentity:表示当前那个副本在工作

leaseDurationSeconds:租期的时间

acquireTime:获得锁的时间

renewTime:刷新锁的时间

leaderTransitions:leader 交换的次数

实现原理

其原理就是利用kubernetes中的configmap、endpoints、lease这三种锁资源实现了一个分布式锁。推荐使用Lease,因为Lease 对象本身就是用来协调租约对象的,其Spec 定义与Leader 选举机制需要操控的属性是一致的。使用Configmap 和Endpoints 对象更多是为了向后兼容,伴随着一定的负面影响。以Endpoints 为例,Leader 每隔固定周期就要续约,这使得Endpoints 对象处于不断的变化中。Endpoints 对象会被每个节点的kube-proxy 等监听,任何Endpoints 对象的变更都会推送给所有节点的kube-proxy,这为集群引入了不必要的网络流量。

  1. 大致逻辑就是多个副本会同时更新某个资源annotation中的holderIdentity字段,写入字段值的操作被称为获取锁资源。由于k8s时乐观并发,只有一个会更新成功,更新成功的这个副本就会成为leader。
  2. 在 leader 被选举成功之后,leader 为了保住自己的位置,需要定时去更新这个 Lease 资源的状态,即一个时间戳信息,表明自己有在一直工作没有出现故障,这一操作称为续约。
  3. 其他 candidate 也不是完全闲着,而是也会定期尝试获取这个资源,检查资源的信息,时间戳有没有太久没更新,否则认为原来的 leader 故障失联无法正常工作,并更新此资源的 holder 为自己,成为 leader 开始工作并同时定期续约。

使用举例

代码路径:client-go/examples/leader-election/main.go

		// leader election uses the Kubernetes API by writing to a
    // lock object, which can be a LeaseLock object (preferred),
    // a ConfigMap, or an Endpoints (deprecated) object.
    // Conflicting writes are detected and each client handles those actions
    // independently.
    config, err := buildConfig(kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    client := clientset.NewForConfigOrDie(config)

    run := func(ctx context.Context) {
        // complete your controller loop here
        klog.Info("Controller loop...")

        select {}
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        klog.Info("Received termination, signaling shutdown")
        cancel()
    }()

    // we use the Lease lock type since edits to Leases are less common
    // and fewer objects in the cluster watch "all Leases".
    // 指定锁的资源对象,这里使用了Lease资源,还支持configmap,endpoint,或者multilock(即多种配合使用)
    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      leaseLockName,
            Namespace: leaseLockNamespace,
        },
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,//租约时间
        RenewDeadline:   15 * time.Second,//更新租约的
        RetryPeriod:     5 * time.Second,//非leader节点重试时间
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                //变为leader执行的业务代码
                // we're notified when we start - this is where you would
                // usually put your code
                run(ctx)
            },
            OnStoppedLeading: func() {
                 // 进程退出
                // we can do cleanup here
                klog.Infof("leader lost: %s", id)
                os.Exit(0)
            },
            OnNewLeader: func(identity string) {
                //当产生新的leader后执行的方法
                // we're notified when new leader elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %s", identity)
            },
        },
    })
07-17 13:04