前言

抛开死锁不谈,只聊性能问题,尽管锁总能粗暴的满足同步需求,但一旦存在竞争关系,意味着一定会有线程被阻塞,竞争越激烈,被阻塞的线程越多,上下文切换次数越多,调度成本越大,显然在高并发的场景下会损害性能。在高并发高性能且要求线程安全的述求下,无锁构造(非阻塞构造)闪亮登场。

参考文档:

C# - 理论与实践中的 C# 内存模型,第 2 部分 | Microsoft Docs

volatile 关键字 (C# 参考)

一、非阻塞同步

重排序与缓存

我们观察下面这个例子:

public class Foo
{
    private int _answer;
    private bool _complete;

    void A() //A 1
    {
        _answer = 10;
        _complete = true;
    }

    void B() //B 2
    {
        if (_complete) Console.WriteLine(_answer);
    }
}

如果方法AB在不同的线程上并发运行,B可能会打印 “ 0 “ 吗?答案是会的,原因如下:

  • 编译器、CLR 或 CPU 可能会对代码/指令进行重排序(reorder)以提高效率。
  • 编译器、CLR 或 CPU 可能会进行缓存优化,导致其它线程不能马上看到变量的新值。

请务必重视它们,它们将是幽灵般的存在

int x = 0, y = 0, a = 0, b = 0;

var task1 = Task.Run(() => // A 1
{
    a = 1; // 1
    x = b; // 2
});
var task2 = Task.Run(() => // B 2
{
    b = 2; // 3
    y = a; // 4
});
Task.WaitAll(task1, task2);
Console.WriteLine("x:" + x + " y:" + y);

直觉和经验告诉我们,程序至顶向下执行:代码1一定发生在代码2之前,代码3一定发生在代码4之前,然鹅

在一个独立的线程中,每一个语句的执行顺序是可以被保证的,但在不使用lock,waithandle这样的显式同步操作时,我们就没法保证事件在不同的线程中看到的执行顺序是一致的了。尽管线程A中一定需要观察到a=1执行成功之后才会去执行x=b,但它没法确保自己观察得到线程B中对b的写入,所以A还可能会打印出y的一个旧版的值。这就叫指令重排序。

x:0 y:1 #1-2-3-4
x:2 y:0 #3-4-1-2
x:2 y:1 #1-3-2-4

可实际运行时还是有些让我们惊讶的情况:

x:0 y:0 #??

这就是缓存问题,如果两个线程在不同的CPU上执行,每一个核心有自己的缓存,这样一个线程的写入对于其它线程,在主存同步之前就是不可见的了。

内存屏障

参考博客小林野夫

处理器支持哪种内存重排序(LoadLoad重排序、LoadStore重排序、StoreStore重排序、StoreLoad重排序),就会提供相对应能够禁止重排序的指令,而这些指令就被称之为内存屏障(LoadLoad屏障、LoadStore屏障、StoreStore屏障、StoreLoad屏障)

net standard1.0 提供System.Lazy<T>来帮助你以线程安全且高效的方式(DCL)解决延迟初始化问题,只需

public class D
{
    private Lazy<Expensive> _expensive = new Lazy<Expensive>(() => new Expensive(), true);

    public Expensive GetExpensiveInstance() => _expensive.Value;
}

第一个参数是一个委托,告知如何构建,第二个参数是boolean类型,传false实现的就是上面提到的plain B非线程安全迟初始化

双检锁 double checked locking会进行一次额外的易失读(volatile read),在对象已经完成初始化时,能够避免获取锁产生的开销。

public class E
{
    private readonly object _locker = new object();
    private volatile Expensive _expensive;

    public Expensive GetExpensiveInstance()
    {
        // 额外的易失读(volatile read)
        if (_expensive == null)
        {
            lock (_locker)
            {
                if (_expensive == null) _expensive = new Expensive();
            }
        }
        
        return _expensive;
    }
}

LazyInitializer

LazyInitializer是一个静态类,提供EnsureInitialized方法,第一个参数是需要构造的变量地址,第二个参数是一个委托,告知如何构造

public class F
{
    private Expensive _expensive;

    public Expensive GetExpensiveInstance()
    {
        LazyInitializer.EnsureInitialized(ref _expensive,
            () => new Expensive());
        return _expensive;
    }
}

它使用竞争初始化模式的实现,比双检锁更快(在多核心情况下),因为它的实现完全不使用锁。这是一个很少需要用到的极端优化,并且会带来以下代价:

  • 当参与初始化的线程数大于核心数时,它会更慢。
  • 可能会因为进行了多余的初始化而浪费 CPU 资源。
  • 初始化逻辑必须是线程安全的(例如,Expensive的构造器对静态字段进行写,就不是线程安全的)。
  • 如果初始化的对象是需要进行销毁的,多余的对象需要额外的逻辑才能被销毁。

竞争初始化(race-to-initialize)模式,通过易失性和CAS,实现无锁构造

public class G
{
    private volatile Expensive _expensive;
    public Expensive Expensive
    {
        get
        {
            if (_expensive == null)
            {
                var instance = new Expensive();
                Interlocked.CompareExchange (ref _expensive, instance, null);
            }
            return _expensive;
        }
    }
}

三、线程局部存储

我们花费了大量篇幅来讲并发访问公共数据问题,前文提到的锁构造,信号构造,无锁构造本质上都是使用同步构造,使得多线程在访问公共数据时能安全的进行,然而有时我们会希望数据在线程间是隔离的,局部变量就能实现这个目的,但他们的生命周期总是那么短暂(随代码块而释放),我们期待更大作用域的隔离数据,线程局部变量(thread-local storage,TLS)就可以实现这个目的。

ThreadStatic

被ThreadStatic标记的static字段不会在线程间共享,每个执行线程都有一个单独的字段实例

Note:

  • 被标记的必须是static字段,不能在实例字段上使用(添加了也无效)
  • 请不要给被标记的字段指定初始值,因为这种初始化只会在类被构造时执行一次,影响一个线程,因此他依赖零值

如果你需要使用实例字段,或者非零值,请使用ThreadLocal<T>

public class ThreadStatic测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    [ThreadStatic] private static int _num;

    public ThreadStatic测试(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        void Work()
        {
            for (int i = 0; i < 100000; i++)
            {
                _num++;
                _testOutputHelper.WriteLine(_num.ToString());
            }
        }

        var t1 = new Thread(Work);
        var t2 = new Thread(Work);

        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();

        _testOutputHelper.WriteLine(_num.ToString());
    }
}

输出:

100000
100000
0

LocalDataStoreSlot

封装内存槽以存储本地数据。 此类不能被继承。.NET Framework 1.1加入,但在standard2.0+才有。

public sealed class LocalDataStoreSlot

.NET Framework 提供了两种机制,用于使用线程本地存储 (TLS) :LocalDataStoreSlotThreadStaticAttribute

LocalDataStoreSlotThreadStaticAttribute更慢,更尴尬。此外,数据存储为类型 Object,因此必须先将其强制转换为正确的类型,然后再使用它。

有关使用 TLS 的详细信息,请参阅 线程本地存储

同样,.NET Framework 提供了两种使用上下文本地存储的机制:LocalDataStoreSlotContextStaticAttribute。 上下文相对静态字段是用属性标记的 ContextStaticAttribute 静态字段。 请参考注解

// 同一个 LocalDataStoreSlot 对象可以跨线程使用。
LocalDataStoreSlot _slot = Thread.AllocateNamedDataSlot("mySlot");
void Work()
{
    for (int i = 0; i < 100000; i++)
    {
        int num = (int)(Thread.GetData(_slot)??0);
        Thread.SetData(_slot, num + 1);
    }
    _testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());

输出效果和ThreadStaticAttribute一样:

100000
100000
0

使用Thread.FreeNamedDataSlot("mySlot");可以释放所有线程上的指定槽,但是只有在所有对该槽的引用都出了其作用域,并且被垃圾回收后才会真正释放。这确保了只要保持对槽的引用,就能继续使用槽。

你也可以通过Thread.AllocateDataSlot()来创建一个无名槽位,与命名槽的区别是无名槽需要自行控制作用域

当然我们也可以对上面复杂的᠍᠍᠍᠍᠍Thread.GetData,Thread.SetData进行封装

LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");
int Num
{
    get
    {
        object data = Thread.GetData(_secSlot);
        return data == null ? 0 : (int) data;    // null 相当于未初始化。
    }
    set { Thread.SetData (_secSlot, value); }
}

ThreadLocal

ThreadLocal<T>是 Framework 4.0 加入的,涵盖在netstandard1.0。它提供了可用于静态字段和实例字段的线程局部存储,并且允许设置默认值。

public class ThreadLocal测试
{
    ThreadLocal<int> _num = new ThreadLocal<int> (() => 3);
    private readonly ITestOutputHelper _testOutputHelper;


    public ThreadLocal测试(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        void Work()
        {
            for (int i = 0; i < 100000; i++)
            {
                _num.Value++;
            }
            _testOutputHelper.WriteLine(_num.ToString());
        }

        var t1 = new Thread(Work);
        var t2 = new Thread(Work);

        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();

        _testOutputHelper.WriteLine(_num.ToString());
    }
}

输出

100003
100003
3

下面这个测试非常有意思

[Fact]
void Show()
{
    var threadName = new ThreadLocal<string>(() => "Thread" + Thread.CurrentThread.ManagedThreadId);
    Parallel.For(0, 13, x =>
    {
        bool repeat = threadName.IsValueCreated;
        _testOutputHelper.WriteLine($"ThreadName = {threadName.Value} {(repeat ? "(repeat)" : "")}");
    });
    
    threadName.Dispose();  // 释放资源
}

你会发现当Parallel.For第二个参数超过你的逻辑内核后,repeat出现了!

ThreadName = Thread5 
ThreadName = Thread8 
ThreadName = Thread31 
ThreadName = Thread29 
ThreadName = Thread31 (repeat)
ThreadName = Thread30 
ThreadName = Thread18 
ThreadName = Thread12 
ThreadName = Thread32 
ThreadName = Thread28 
ThreadName = Thread33 
ThreadName = Thread35 
ThreadName = Thread34

Random类不是线程安全的,所以我们要不然在使用Random时加锁(这样限制了并发),如今我们有了ThreadLocal:

var localRandom = new ThreadLocal<Random>(() => new Random());

很轻易的就解决了线程安全问题,但是上面的版本使用的Random的无参构造方法,会依赖系统时间作为生成随机数的种子,在大概 10ms 时间内创建的两个Random对象可能会使用相同的种子,下边是解决这个问题的一个办法:

var localRandom = new ThreadLocal<Random>(() => new Random (Guid.NewGuid().GetHashCode()) );
11-15 09:16