引入ConcurrentHashMap
模拟使用hashmap在多线程场景下发生线程不安全现象
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 模拟hashmap在多线程场景下的出现的不安全现象之一
* hashmap还有put丢失,jdk1.7扩容成环的问题
*/
public class Demo2 {
public static void main(String[] args) {
Map<String, String> hashmap = new HashMap<>();
//开30个线程去往hashmap中添加元素
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
hashmap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(hashmap);
}, String.valueOf(i)).start();
}
}
}
运行结果如下,由于fail-fast机制的存在,出现了并发修改失败的错误
如何解决该问题呢?
方式一:使用hashtable
Map<String, String> hashmap = new Hashtable<>();
方式二:使用Collections.synchronizedMap
Map<String, String> hashmap = Collections.synchronizedMap(new HashMap<>());
方式三:使用并发集合容器ConcurrentHashMap
Map<String, String> hashmap = new ConcurrentHashMap<>();
浅析Java7中ConcurrentHashMap源码
数据结构
ConcurrentHashMap JDK1.7的数据结构是由Segment数组+HashEntry数组组成,其解决hash冲突的方式与jdk1.7中的hashmap方式差不多,解决线程安全是采用一种分段锁的思想,多个线程操作多个Segment是相互独立的,这样一来相比于传统的hashtable就大大提高了并发度。
我们在简单画个图来理解分段锁的思想:数组套数组,多个线程独立访问Segment,扩容嵌套数组
Segment与HashEntry
我们在来看下其Segment数组以及HashEntry数组在源码中是如何定义的。
先来看看Segment的定义:由以下我们可以看到每个Segment都是继承的ReentLock,且其内部嵌套的是HashEntry数组,Segment的数量相当于锁的数量,这些锁彼此之间福独立,即“分段锁”
//以内部类的形式定义,并且继承的ReentratLock
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//由此处也可以看出Segment内部嵌套的是HashEntry数组
transient volatile HashEntry<K,V>[] table;
//Segment的个数
transient int count;
//modCount代表被修改的次数,每次Remove、put都相当于一次修改
transient int modCount;
//阈值
transient int threshold;
//负载因子
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
//以下是Segment内部的一些操作
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
.......
}
private void rehash(HashEntry<K,V> node) {
......
}
final V remove(Object key, int hash, Object value) {
....
}
......
在来看看HashEntry的定义
//以内部类的形式定义
static final class HashEntry<K,V> {
final int hash;
final K key;
//采用volatile修饰,保证其可见性和有序性
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
//在HashEntry数组后面链上HashEntry对象
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe类是Java提供的操作内存的类,
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
关于Unsafe类中的putOrderedObject方法,摘自Java魔法类:Unsafe应用解析
//存储变量的引用到对象的指定的偏移量处,使用volatile的存储语义
public native void putObjectVolatile(Object o, long offset, Object x);
//有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即看到。只有在field被volatile修饰符修饰时有效,而我们的HashEntry就是被volatile修饰的
public native void putOrderedObject(Object o, long offset, Object x);
构造函数
我们来看下ConcurrentHashMap的构造函数在源码中是如何定义的
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
//默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认并发等级
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小Segment数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大Segment数量
static final int MAX_SEGMENTS = 1 << 16;
//默认构造函数
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* initialCapacity:初始参数
* loadFactor:加载因子
* concurrencyLevel:并发级别即Segment的数量
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//非法数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 用来记录向左按位移动的次数
int sshift = 0;
//用来记录Segment的数量
int ssize = 1;
//该段while循环保证Segment的数量是2的幂
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
//这里SegmentMask先提前减一了,
//在hashmap中计算数组下标索引是(table.length-1)&hash
//这里也可以推断出Segment数量一旦确定不能在变,扩容是扩Segment数组内的HashEntry数组
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//每个Segment数组内要放置多少个HashEntry数组
int c = initialCapacity / ssize;
//确保无余数
if (c * ssize < initialCapacity)
++c;
//确保每个Segment内部的HashEntry数组的大小一定为2的幂,当三个参数皆为默认值时,其Segment内部的table大小是2,
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//初始化Segment数组,并填充Segment[0],阈值是(int)(cap * loadFactor),当参数皆为默认时,该值为1,当put第一个元素时不会扩容,在put就会触发扩容
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
.....
由构造函数可以看出来
- Segment数量默认是16,初始容量默认是16,负载因子默认是0.75,最小Segment是2
- Segment的数量即为并发级别,且内部保证是2的幂,Segment内部的table大小也保证为2的幂
- Segment数量一旦确定不会在更改,后续添加元素不会增加Segment的数量,而是增加Segment中链表数组的容量,这样的好处是扩容也不用针对整个ConcurrentHashMap来进行了,而是针对Segment里面的数组
- 初始化了Segment[0],其他Segment还是null
put函数
来看看put函数
public V put(K key, V value) {
Segment<K,V> s;
//value不能为空
if (value == null)
throw new NullPointerException();
//通过hash函数获取关于key的hash值
int hash = hash(key);
//计算要插入的Segment数组的下标,位运算提高计算速度,由于此处使用位运算,所以得保证是2的幂可以减少hash冲突,具体原因不详述
int j = (hash >>> segmentShift) & segmentMask;
//如果要插入的Segment为初始化,调用ensureSeggment函数进行初始化(初始化concurrentHashMap时只初始化了第一个Segment[0])
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//调用Segment的put函数
return s.put(key, hash, value, false);
}
到现在我们还没有发现加锁,在接着看Segment中的put函数,可见是在该函数中加的锁,这又一次验证了是分段锁,计算完了Segment位置后,在针对某一个Segment内部进行插入的时候上锁。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//去获取独占锁,获取锁失败进入scanAndLockForPut函数
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
//到此处肯定已经获取到锁了
try {
//Segment内部的HashEntry数组
HashEntry<K,V>[] tab = table;
//计算元素插入的位置
int index = (tab.length - 1) & hash;
//定位到第index个HashEntry
HashEntry<K,V> first = entryAt(tab, index);
//该段for循环使用头插法将元素进行插入
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//如果在链表中找到相同的key,则新值替换旧值,并退出函数
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
//onlyIfAbsent默认为false,!onlyIfAbsent表示替换旧值
if (!onlyIfAbsent) {
e.value = value;
//修改次数+1
++modCount;
}
break;
}
//如果没有key值相同的则遍历到链表尾部
e = e.next;
}
else {//已经遍历到链表尾部
if (node != null)//在scanAndLockForPut函数中已经建立好node
node.setNext(first); //把node插入链表的头部
else
//新建node,插入到链表头部
node = new HashEntry<K,V>(hash, key, value, first);
//该count代表元素的个数
int c = count + 1;
//判断是否超过阈值,超过调用rehash扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//把node赋值给tab[index]
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//释放锁
unlock();
}
return oldValue;
}
Segment内部的put函数涉及到一个scanAndLockForPut函数,多个线程去进行put操作,去竞争锁,那那些没获取到锁的线程它是如何处理的呢,我们来看一下scanAndLockForPut函数
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//自旋过程中遍历链表,若发现没有重复的key值,则提前先新建一个节点为后续的插入节约时间
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//自旋次数达到若干次后就调用lock()进行阻塞,阻塞后的线程由AQS进行管理入队列
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
该函数简化简化下来的思想如下:
//线程竞争锁失败后进入该函数
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//tryLock函数与Lock函数的区别就是tryLock函数获取锁失败会返回false,而不是阻塞
while(!tryLock()){//自旋操作
......
System.out.println("干点自己的事情...")
}
}
所以scanAndLockForPut函数的策略就是拿不到锁的线程不让它直接阻塞,而是让其自旋,自旋达到一定次数之后在调用lock()进行阻塞,另外在自旋的过程中遍历了后面的HashEntry链表,如果没有发现重复的节点就提前先建立一个,为线程之后拿到锁插入节省时间。
ensureSegment函数
在ConcurrentHashMap初始化时,只初始化了Segment[0],其他的Segment数组都是null,多个线程可能同时调用ensureSegment去初始化Segment[j],所以在该函数内部应该避免重复初始化的问题,保证其线程安全。
private Segment<K,V> ensureSegment(int k) {
//赋值ss=this.segments
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//第一次判断segment[j]是否被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//使用segment[0]为原型去初始化新的segment
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//第二次判断segment[j]是否被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//while循环+CAS操作,当前线程成功设值或其他线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {//第三次判断segment[j]是否被初始化
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
可见UNSAFE.getObjectVolatile(ss, u)) == null出现了三次,多次去判断segment[j]是否被初始化了,即使如此也不能完全避免重复初始化,最后还采用CAS操作保证其只被初始化
rehash函数
我们在来看看具体是如何扩容的,在Segment内部的put函数我们看到,超过阈值后会进行扩容操作
private void rehash(HashEntry<K,V> node) {
//获取旧数组和其容量
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
//扩容为旧容量的2倍、设置新的阈值
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
//创建新的数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
//sizeMask提前减1了
int sizeMask = newCapacity - 1;
//遍历原数组
for (int i = 0; i < oldCapacity ; i++) {
//获取旧数组中的元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
//计算插入的索引
int idx = e.hash & sizeMask;
if (next == null) // 链表中只有单个元素时,直接放入新数组中去
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//寻找链表中最后一个hash值不等于lastIdx的元素
for (HashEntry<K,V> last = next;last != null;last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//一个优化,把在lastRun之后的链表元素直接链到新hash表中的lastIdx位置
newTable[lastIdx] = lastRun;
//在lastrun之前的所有链表元素,需要在新的位置逐个拷贝
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 把新的节点加入Hash表
int nodeIndex = node.hash & sizeMask;
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
可见扩容函数是扩容为原来数组的两倍大小,且扩容进行了一次优化,并没有对元素依次拷贝,而是先通过for循环找到lastRun位置。lastRun到链表末尾的所有元素,其hash值没有改变,所以不需要一次重新拷贝,只需要把这部分链表链到新hash表中所对应的位置即可。lastRun之前的节点则需要依次拷贝。
get函数
整个get函数相对来是实现思路不复杂,先找到在哪个Segment数组中,再去寻找具体在哪个table上,整个过程没加锁,因为Sigment中的HashEntry和HashEntry中的value都是由volatile修饰的,volatile保证了内存的可见性。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
//先计算在哪个segment数组中
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//计算在segment数组中的哪个HashEntry上
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//key值和当前节点的key指向同一片地址,或者当前节点的hash等于key的hash并且equals比价后相同则说明是目标节点
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
小结
ConcurrentHashMap内容颇多且有难度,以上为简单阅读,如有不对的恳请指正。
- 在JDK1.7中,ConcurrentHashMap是基于分段锁的思想来提高并发能力,数据结构采用Segment数组+HashEntry数组+链表来实现,每个Segment都相当于一把锁(其继承自ReentrantLock),多个线程操作多个Segment是相互独立的,Segment有多少个即为并发级别有多大。
- Segment在ConcurrentHashMap初始化后就不会改变了,其扩容是针对每个Segment内部的HashEntry数组扩容,扩容为原来的两倍大小且进行了优化。
- 多个线程put操作时候,竞争锁失败的线程会进行自旋,自旋达到一定次数在直接调用lock进行阻塞。
- 初始化ConcurrentHashMap的时候只会填充第一个Segment[0],需要在多线程情况下避免重复初始化Segment[j]
- 读操作未上锁,Segment中的HashEntry数组和hashEntry对象中的value都是用volatile修饰的