JDK源码之concurrentHashMap

当风再起时,你将找到心中的诗。

HashTable,空有线程同步的特性,因全程使用synchronized关键字,速度过慢,故被抛弃。

HashMap,新时代贵族,在HashTable的基础上做了大量优化而来,被广泛应用于各种场景中,却不再支持线程同步。

ConcurrentHashMap,在 HashTable 的基础上做了另一个方向的优化,在尽量保证速度的同时,支持全并发的读操作,支持部分并发的写操作(默认情况可达到16并发写)。

总数据结构

首先来看看 ConcurrentHashMap 的结构,既然它基于 HashTable 改造而来,它就和 HashTable 有一定的关联性。ConcurrentHashMap 中默认使用了16个Segment存储数据,如:

1
final Segment<K,V>[] segments;

每个 Segment 元素都可以看做一个传统的 Hashtable,拥有自己的写锁,看到这您明白了吗?ConcurrentHashMap 默认使用了16个 HashTable一样的东西,所以它才支持了16并发读。

Segment

再来看看Segment 又是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;

/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;

/**
* The total number of mutative operations in this segment.
* Even though this may overflows 32 bits, it provides
* sufficient accuracy for stability checks in CHM isEmpty()
* and size() methods. Accessed only either within locks or
* among other volatile reads that maintain visibility.
*/
transient int modCount;

每个 Segment 段里头又是一个 HashEntry 数组,看到这,有 HashMap/HashTable 基础的同学瞬间就反应过来了:

HashTable 和 HashMap 里边也是用一个数组来存数据,要插入数据时预习算好数组Index,然后再插入到相应的位置,如果该位置已经有数据,就将新数据和已有数据组成一条新链,链头插入 HashEntry 数组中。

没错!就是这样,Concurrent 的结构也是一样的!

值得一提的是,此处的HashEntry 和 HashMap 的 HashEntry 有些不一样,ConcurrentHashMap 的如下:

1
2
3
4
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;

HashMasp 的如下:

1
2
3
4
final K key;
V value;
Entry<K,V> next;
int hash;

可见,hash 与 key 用 final 修饰,表明一生不变放纵不羁的态度,不会发生改变;而 value 和 next 则使用 volatile 修饰,保证了线程之间的可见性。

那么,有童鞋就问了:同样的结构,为什么 ConcurrentHashMap 就支持完全并发的读,而 HashMap 就不支持呢?

具体的原因,请看下面的解释。

ConcurrentHashMap的完全并发读

get 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

在使用 get 操作时,会首先定位到某个 Segment,再定位该Segment上某个Index的元素,如果是条冲突链,就一直遍历完该链,而不管该链“已被遍历过的地方”发生了什么变化。注意这里的“已被遍历过的地方”,因为读的过程中可能某些元素会被其他线程改变,比如删除等,但由于其删除操作的特殊性,使得其支持并发性。

remove 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}

在删除之前,首先申请重入锁,得到锁后,去遍历冲突链,以便删除数据。

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

也是首先申请重入锁,申请到后再遍历冲突链。

值得注意的是,之前的 ConcurrentHashMap 使用了 final 来修饰 HashEntry 中的 next指针,那样每次删除时重新构建一条冲突链而不是申请锁,就能做到绝大部分的并发读。