生命不息,折腾不止

生命不息,折腾不止

引入ConcurrentHashMap

模拟使用hashmap在多线程场景下发生线程不安全现象

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * 模拟hashmap在多线程场景下的出现的不安全现象之一
 * hashmap还有put丢失,jdk1.7扩容成环的问题
 */
public class Demo2 {
    public static void main(String[] args) {
        Map<String, String> hashmap = new HashMap<>();
        //开30个线程去往hashmap中添加元素
        for (int i = 1; i <= 30; i++) {
            new Thread(() -> {
                hashmap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println(hashmap);
            }, String.valueOf(i)).start();
        }
    }
}

运行结果如下,由于fail-fast机制的存在,出现了并发修改失败的错误

浅析Java7中的ConcurrentHashMap-LMLPHP

如何解决该问题呢?

方式一:使用hashtable

Map<String, String> hashmap = new Hashtable<>();

方式二:使用Collections.synchronizedMap

Map<String, String> hashmap = Collections.synchronizedMap(new HashMap<>());

方式三:使用并发集合容器ConcurrentHashMap

Map<String, String> hashmap = new ConcurrentHashMap<>();

浅析Java7中ConcurrentHashMap源码

数据结构

ConcurrentHashMap JDK1.7的数据结构是由Segment数组+HashEntry数组组成,其解决hash冲突的方式与jdk1.7中的hashmap方式差不多,解决线程安全是采用一种分段锁的思想,多个线程操作多个Segment是相互独立的,这样一来相比于传统的hashtable就大大提高了并发度。

浅析Java7中的ConcurrentHashMap-LMLPHP

我们在简单画个图来理解分段锁的思想:数组套数组,多个线程独立访问Segment,扩容嵌套数组

浅析Java7中的ConcurrentHashMap-LMLPHP

Segment与HashEntry

我们在来看下其Segment数组以及HashEntry数组在源码中是如何定义的。

先来看看Segment的定义:由以下我们可以看到每个Segment都是继承的ReentLock,且其内部嵌套的是HashEntry数组,Segment的数量相当于锁的数量,这些锁彼此之间福独立,即“分段锁”

//以内部类的形式定义,并且继承的ReentratLock
static final class Segment<K,V> extends ReentrantLock implements Serializable {

        private static final long serialVersionUID = 2249069246763182397L;

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    	//由此处也可以看出Segment内部嵌套的是HashEntry数组
        transient volatile HashEntry<K,V>[] table;

    	//Segment的个数
        transient int count;
		//modCount代表被修改的次数,每次Remove、put都相当于一次修改
        transient int modCount;
		//阈值
        transient int threshold;
		//负载因子
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

    	//以下是Segment内部的一些操作
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        	.......
        }

         private void rehash(HashEntry<K,V> node) {
             ......
         }

		 final V remove(Object key, int hash, Object value) {
         	....
         }

    	......

在来看看HashEntry的定义

//以内部类的形式定义
static final class HashEntry<K,V> {
        final int hash;
        final K key;
    	//采用volatile修饰,保证其可见性和有序性
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

		//在HashEntry数组后面链上HashEntry对象
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe类是Java提供的操作内存的类,
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

关于Unsafe类中的putOrderedObject方法,摘自Java魔法类:Unsafe应用解析

//存储变量的引用到对象的指定的偏移量处,使用volatile的存储语义
public native void putObjectVolatile(Object o, long offset, Object x);
//有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即看到。只有在field被volatile修饰符修饰时有效,而我们的HashEntry就是被volatile修饰的
public native void putOrderedObject(Object o, long offset, Object x);

构造函数

我们来看下ConcurrentHashMap的构造函数在源码中是如何定义的

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
    private static final long serialVersionUID = 7249069246763182397L;
	//默认初始容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //默认加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //默认并发等级
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小Segment数量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大Segment数量
    static final int MAX_SEGMENTS = 1 << 16;

    //默认构造函数
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    /**
     * initialCapacity:初始参数
     * loadFactor:加载因子
     * concurrencyLevel:并发级别即Segment的数量
     */
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //非法数校验
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 用来记录向左按位移动的次数
        int sshift = 0;
        //用来记录Segment的数量
        int ssize = 1;
        //该段while循环保证Segment的数量是2的幂
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        //这里SegmentMask先提前减一了,
        //在hashmap中计算数组下标索引是(table.length-1)&hash
        //这里也可以推断出Segment数量一旦确定不能在变,扩容是扩Segment数组内的HashEntry数组
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //每个Segment数组内要放置多少个HashEntry数组
        int c = initialCapacity / ssize;
        //确保无余数
        if (c * ssize < initialCapacity)
            ++c;
        //确保每个Segment内部的HashEntry数组的大小一定为2的幂,当三个参数皆为默认值时,其Segment内部的table大小是2,
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        //初始化Segment数组,并填充Segment[0],阈值是(int)(cap * loadFactor),当参数皆为默认时,该值为1,当put第一个元素时不会扩容,在put就会触发扩容
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    .....

由构造函数可以看出来

  • Segment数量默认是16,初始容量默认是16,负载因子默认是0.75,最小Segment是2
  • Segment的数量即为并发级别,且内部保证是2的幂,Segment内部的table大小也保证为2的幂
  • Segment数量一旦确定不会在更改,后续添加元素不会增加Segment的数量,而是增加Segment中链表数组的容量,这样的好处是扩容也不用针对整个ConcurrentHashMap来进行了,而是针对Segment里面的数组
  • 初始化了Segment[0],其他Segment还是null

put函数

来看看put函数

    public V put(K key, V value) {
        Segment<K,V> s;
        //value不能为空
        if (value == null)
            throw new NullPointerException();
        //通过hash函数获取关于key的hash值
        int hash = hash(key);
     	//计算要插入的Segment数组的下标,位运算提高计算速度,由于此处使用位运算,所以得保证是2的幂可以减少hash冲突,具体原因不详述
        int j = (hash >>> segmentShift) & segmentMask;
        //如果要插入的Segment为初始化,调用ensureSeggment函数进行初始化(初始化concurrentHashMap时只初始化了第一个Segment[0])
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        //调用Segment的put函数
        return s.put(key, hash, value, false);
    }

到现在我们还没有发现加锁,在接着看Segment中的put函数,可见是在该函数中加的锁,这又一次验证了是分段锁,计算完了Segment位置后,在针对某一个Segment内部进行插入的时候上锁。

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //去获取独占锁,获取锁失败进入scanAndLockForPut函数
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            //到此处肯定已经获取到锁了
            try {
                //Segment内部的HashEntry数组
                HashEntry<K,V>[] tab = table;
                //计算元素插入的位置
                int index = (tab.length - 1) & hash;
                //定位到第index个HashEntry
                HashEntry<K,V> first = entryAt(tab, index);
                //该段for循环使用头插法将元素进行插入
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        //如果在链表中找到相同的key,则新值替换旧值,并退出函数
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            //onlyIfAbsent默认为false,!onlyIfAbsent表示替换旧值
                            if (!onlyIfAbsent) {
                                e.value = value;
                                //修改次数+1
                                ++modCount;
                            }
                            break;
                        }
                        //如果没有key值相同的则遍历到链表尾部
                        e = e.next;
                    }
                    else {//已经遍历到链表尾部
                        if (node != null)//在scanAndLockForPut函数中已经建立好node
                            node.setNext(first); //把node插入链表的头部
                        else
                            //新建node,插入到链表头部
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //该count代表元素的个数
                        int c = count + 1;
                        //判断是否超过阈值,超过调用rehash扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            //把node赋值给tab[index]
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                //释放锁
                unlock();
            }
            return oldValue;
        }

Segment内部的put函数涉及到一个scanAndLockForPut函数,多个线程去进行put操作,去竞争锁,那那些没获取到锁的线程它是如何处理的呢,我们来看一下scanAndLockForPut函数

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                //自旋过程中遍历链表,若发现没有重复的key值,则提前先新建一个节点为后续的插入节约时间
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                //自旋次数达到若干次后就调用lock()进行阻塞,阻塞后的线程由AQS进行管理入队列
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

该函数简化简化下来的思想如下:

//线程竞争锁失败后进入该函数
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
	//tryLock函数与Lock函数的区别就是tryLock函数获取锁失败会返回false,而不是阻塞
    while(!tryLock()){//自旋操作
        ......
        System.out.println("干点自己的事情...")
    }
}

所以scanAndLockForPut函数的策略就是拿不到锁的线程不让它直接阻塞,而是让其自旋,自旋达到一定次数之后在调用lock()进行阻塞,另外在自旋的过程中遍历了后面的HashEntry链表,如果没有发现重复的节点就提前先建立一个,为线程之后拿到锁插入节省时间。

ensureSegment函数

在ConcurrentHashMap初始化时,只初始化了Segment[0],其他的Segment数组都是null,多个线程可能同时调用ensureSegment去初始化Segment[j],所以在该函数内部应该避免重复初始化的问题,保证其线程安全。

    private Segment<K,V> ensureSegment(int k) {
        //赋值ss=this.segments
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        //第一次判断segment[j]是否被初始化
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            //使用segment[0]为原型去初始化新的segment
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            //第二次判断segment[j]是否被初始化
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                //while循环+CAS操作,当前线程成功设值或其他线程成功设值后,退出
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {//第三次判断segment[j]是否被初始化
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

可见UNSAFE.getObjectVolatile(ss, u)) == null出现了三次,多次去判断segment[j]是否被初始化了,即使如此也不能完全避免重复初始化,最后还采用CAS操作保证其只被初始化

rehash函数

我们在来看看具体是如何扩容的,在Segment内部的put函数我们看到,超过阈值后会进行扩容操作

        private void rehash(HashEntry<K,V> node) {
            //获取旧数组和其容量
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            //扩容为旧容量的2倍、设置新的阈值
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            //创建新的数组
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            //sizeMask提前减1了
            int sizeMask = newCapacity - 1;
            //遍历原数组
            for (int i = 0; i < oldCapacity ; i++) {
                //获取旧数组中的元素
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    //计算插入的索引
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  链表中只有单个元素时,直接放入新数组中去
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        //寻找链表中最后一个hash值不等于lastIdx的元素
                        for (HashEntry<K,V> last = next;last != null;last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        //一个优化,把在lastRun之后的链表元素直接链到新hash表中的lastIdx位置
                        newTable[lastIdx] = lastRun;
                        //在lastrun之前的所有链表元素,需要在新的位置逐个拷贝
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            // 把新的节点加入Hash表
            int nodeIndex = node.hash & sizeMask;
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

可见扩容函数是扩容为原来数组的两倍大小,且扩容进行了一次优化,并没有对元素依次拷贝,而是先通过for循环找到lastRun位置。lastRun到链表末尾的所有元素,其hash值没有改变,所以不需要一次重新拷贝,只需要把这部分链表链到新hash表中所对应的位置即可。lastRun之前的节点则需要依次拷贝。

get函数

整个get函数相对来是实现思路不复杂,先找到在哪个Segment数组中,再去寻找具体在哪个table上,整个过程没加锁,因为Sigment中的HashEntry和HashEntry中的value都是由volatile修饰的,volatile保证了内存的可见性。

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        //先计算在哪个segment数组中
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            //计算在segment数组中的哪个HashEntry上
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                //key值和当前节点的key指向同一片地址,或者当前节点的hash等于key的hash并且equals比价后相同则说明是目标节点
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

小结

ConcurrentHashMap内容颇多且有难度,以上为简单阅读,如有不对的恳请指正。

  • 在JDK1.7中,ConcurrentHashMap是基于分段锁的思想来提高并发能力,数据结构采用Segment数组+HashEntry数组+链表来实现,每个Segment都相当于一把锁(其继承自ReentrantLock),多个线程操作多个Segment是相互独立的,Segment有多少个即为并发级别有多大。
  • Segment在ConcurrentHashMap初始化后就不会改变了,其扩容是针对每个Segment内部的HashEntry数组扩容,扩容为原来的两倍大小且进行了优化。
  • 多个线程put操作时候,竞争锁失败的线程会进行自旋,自旋达到一定次数在直接调用lock进行阻塞。
  • 初始化ConcurrentHashMap的时候只会填充第一个Segment[0],需要在多线程情况下避免重复初始化Segment[j]
  • 读操作未上锁,Segment中的HashEntry数组和hashEntry对象中的value都是用volatile修饰的
04-25 03:00