ConcurrentHashMap源码解读

ConcurrentHashMap是线程安全的HashMap,JDK8之后,采用volatile+自旋锁+Synchronized(锁桶)方式实现并发控制,并发性能非常高。下面介绍下ConcurrentHashMap的具体细节。

ConcurrentHashMap结构

JDK8中,ConcurrentHashMap和HashMap基本一致,由桶数组+链表/红黑树构成。ConcurrentHashMap的类成员变量和HashMap基本一致,包括初始大小、阈值、加载因子、链表转树阈值等等。除此之外,ConcurrentHashMap还多了下面这些成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;

/**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes,表示同正在迁移
static final int TREEBIN = -2; // hash for roots of trees,表示正在转成红黑树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

put()方法

有几个细节,这里先总结一下:

  • 先判断桶数组是否创建,没创建的话,调用initTable()初始化桶数组;

    • 和HashMap初始化不同,这里采用CAS判断是否有其他线程抢先进行了初始化。
  • 外层加了一个for循环for (Node<K,V>[] tab = table;;),不断自旋;

  • 没有碰撞,即桶数组(n-1) & hash处是null,casTabAt自旋插入(防止同时有其他线程执行了相同key的插入操作);

  • 存在hash碰撞

    • 如果正在扩容,线程转去帮助扩容;
    • 不在扩容,那么准备插入,执行synchronized (f)给当前桶加锁,然后再插入;
    • 插入成功后,判断是否要树化或者扩容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//hash,对hashcode再散列,高16位和低16位异或
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//迭代桶数组,自旋,直到成功put
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)//懒加载。如果为空,则进行初始化
tab = initTable();//初始化桶数组
//(n - 1) & hash)计算下标,取值,为空即无hash碰撞
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//这里同样采用自旋插入,防止插入冲突
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))//通过cas插入新值
break; // no lock when adding to empty bin
}
//判断是否正在扩容。如果正在扩容,当前线程帮助进行扩容。
//每个线程只能同时负责一个桶上的数据迁移,并且不影响其它桶的put和get操作。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {//put5,存在hash碰撞
V oldVal = null;
//此处,f在上面已经被赋值,f为当前下标桶的首元素。对链表来说是链表头对红黑树来说是红黑树的头元素。
synchronized (f) {
//再次检查当前节点是否有变化,有变化进入下一轮自旋
//为什么再次检查?因为不能保证,当前线程到这里,有没有其他线程对该节点进行修改
if (tabAt(tab, i) == f) {
if (fh >= 0) {//当前桶为链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//迭代链表节点
K ek;
if (e.hash == hash &&//key相同,覆盖(onlyIfAbsent有什么用?)
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//找到链表尾部,插入新节点。(什么这里不用CAS?因为这在同步代码块里面)
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//当前桶为红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {//想红黑树插入新节点
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//树化。binCount > 8,进行树化,链表转红黑树
if (binCount >= TREEIFY_THRESHOLD)
//如果容量 < 64则直接进行扩容;不转红黑树。
//(你想想,假如容量为16,你就插入了9个元素,巧了,都在同一个桶里面,
//如果这时进行树化,时间复杂度会增加,性能下降,不如直接进行扩容,空间换时间)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//扩容。addCount内部会进行判断要不要扩容
return null;
}

transfer()方法

这里简单总结下扩容的流程,和一些重要细节:

  1. 扩容前,先作一些准备工作,包括设置并发数、创建新数组等操作;

  2. 判断桶是不是已经迁移完毕了,或者桶中没有数据,如果是的话,直接跳到下一个桶;

  3. 如果桶中有数据,并且还没有迁移,那么给桶加锁,开始迁移。

    • 迁移完毕后,把旧桶头结点设置成ForwardingNodesetTabAt(tab, i, fwd);表示改桶已经被迁移了,这样如果其他线程执行get或者put操作时,发现它是fwd节点,就会到新桶中去执行相应操作。(这一点在get和put的源码中有所体现)

    • 旧数组和新数组中的桶都是共享变量,加锁的时候只锁了旧数组中的桶,新数组中的桶并没有加锁,这样会不会有线程安全问题?答案是不会,因为不可能同时向新数组同一个桶迁移数据,详情可以了解HashMap中扩容原理。

    • 链表顺序问题: 对比JDK7 HashMap,JDK8 HashMap,JDK8 ConcurrentHashMap在扩容后对节点相对顺序的保证方面,JDK7 HashMap是完全倒序。JDK8 HashMap不改变相对顺序。JDK8 ConcurrentHashMap 保证部分节点的相对顺序,其余的倒序。

      • ConcurrentHashMap这里有个优化,首先通过一次链表扫描找到lastRun,lastRun后面的所有节点在新数组中的index都是oldIndex + oldCap ,减少判断次数。下图演示了扩容前后,链表中元素分布:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//tab旧桶数组,nextTab新桶数组
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//控制并发数,控制CPU的资源
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating//新数组为空,则初始化新数组
try {
@SuppressWarnings("unchecked")
//扩容为原来的两倍 n << 1
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//在这里面进行new Node将node.hash置为-1。表示该桶正在进行移动。
//(这里很重要的一点是,只锁表头,所以只需要将链表(或者红黑树)头结点.hash置为-1即可)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance是控制是否继续进行移动的条件,当advance == false,表示正在移动一个桶。
//true表示可以继续进行下一个桶的移动
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {//自旋
Node<K,V> f; int fh;
while (advance) {//start
int nextIndex, nextBound;
//当前桶是不是已经移动完了
if (--i >= bound || finishing)
advance = false;
//两个停止移动的条件。移动完了。(这个是真正停止的条件。下面那个条件会进行一次检查)
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//结束扩容
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit 再次检查一遍,防止有桶中还有数据没移动。
}
}//end 从start到end可看可不看就是条件控制,包括结束条件的控制,移动进度的控制等。
//该桶没数据
else if ((f = tabAt(tab, i)) == null)
//将oldtab中的该桶设置为fwd节点,hash=-1
advance = casTabAt(tab, i, null, fwd);
//已经移动过的桶其hash=-1
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {//上锁
if (tabAt(tab, i) == f) {
//ln新链表,不需要移动的节点重新组组织成的链表。
//hn新链表,需要移动的节点重新组织成的链表
Node<K,V> ln, hn;
if (fh >= 0) {//链表
int runBit = fh & n;
Node<K,V> lastRun = f;
//start
//从start,到end之间。不看也行。实在费脑子。其实这段代码写的有点让人费解
//主要是不认真看不知道作者的意图。本意是这样的。判断是不是可以从某个节点n开始
//后面的节点是不是都是和节点n一样,移动的目标桶一样的。
//如果是一样的,则后面的这些节点就不用移动了,只需要移动n节点即可。
//(注意链表的引用,next指针就把后面的都带过去了)
//想一个极端情况,如果在这里迭代后发现,所有节点,扩容后数据移动的目标桶都是一样的。
//则只需要移动头结点即可。不用重新拼接链表了。
......//和HashMap差不多,不过更新操作换成了线程安全的操作
}
else if (f instanceof TreeBin) {//红黑树
......//和HashMap差不多,不过更新操作换成了线程安全的操作
}
}
}
}
}
}

get()方法

get()方法比较简单,就一个细节需要关注:

  • 需要判断桶的状态,即tabAt(tab, (n - 1) & h).hash,如果小于0,说明桶被迁移了,需要到新数组中去找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//取桶
if ((eh = e.hash) == h) {//key相同直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//hash < 0 表示正在扩容
//在这里需要非常注意的一点,扩容后的桶会放入fwd节点
//该节点hash = MOVED,fwd.nextTable为扩容后新的数组。
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//迭代链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

remove()方法

remove()方法底层调用了replaceNode方法,下面是其源码。这里我们先总结下replaceNode中的一些细节:

  • 先判断是否正在扩容,如果在扩容,先帮助扩容;

  • 不在扩容的话,采用自旋+Synchronized锁桶方式进行替换;

    • 判断是否是数组对应位置是否为null,不为null继续,否则直接返回;
    • 如果是链表,则遍历链表,如果是红黑树则遍历红黑树;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/**
* Implementation for the four public remove/replace methods:
* Replaces node value with v, conditional upon match of cv if
* non-null. If resulting value is null, delete.
*/
final V replaceNode(Object key, V value, Object cv) {
//计算key经过扰动运算后的hash
int hash = spread(key.hashCode());
//自旋
for (Node<K,V>[] tab = table;;) {
//f表示桶位头结点
//n表示当前table数组长度
//i表示hash命中桶位下标
//fh表示桶位头结点 hash
Node<K,V> f; int n, i, fh;

//CASE1:
//条件一:tab == null true->表示当前map.table尚未初始化.. false->已经初始化
//条件二:(n = tab.length) == 0 true->表示当前map.table尚未初始化.. false->已经初始化
//条件三:(f = tabAt(tab, i = (n - 1) & hash)) == null true -> 表示命中桶位中为null,直接break, 会返回
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;

//CASE2:
//前置条件CASE2 ~ CASE3:当前桶位不是null
//条件成立:说明当前table正在扩容中,当前是个写操作,所以当前线程需要协助table完成扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

//CASE3:
//前置条件CASE2 ~ CASE3:当前桶位不是null
//当前桶位 可能是 "链表" 也可能 是 "红黑树" TreeBin
else {
//保留替换之前的数据引用
V oldVal = null;
//校验标记
boolean validated = false;
//加锁当前桶位 头结点,加锁成功之后会进入 代码块。
synchronized (f) {
//判断sync加锁是否为当前桶位 头节点,防止其它线程,在当前线程加锁成功之前,修改过 桶位 的头结点。
//条件成立:当前桶位头结点 仍然为f,其它线程没修改过。
if (tabAt(tab, i) == f) {
//条件成立:说明桶位 为 链表 或者 单个 node
if (fh >= 0) {
validated = true;

//e 表示当前循环处理元素
//pred 表示当前循环节点的上一个节点
Node<K,V> e = f, pred = null;
for (;;) {
//当前节点key
K ek;
//条件一:e.hash == hash true->说明当前节点的hash与查找节点hash一致
//条件二:((ek = e.key) == key || (ek != null && key.equals(ek)))
//if 条件成立,说明key 与查询的key完全一致。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//当前节点的value
V ev = e.val;

//条件一:cv == null true->替换的值为null 那么就是一个删除操作
//条件二:cv == ev || (ev != null && cv.equals(ev)) 那么是一个替换操作
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
//删除 或者 替换

//将当前节点的值 赋值给 oldVal 后续返回会用到
oldVal = ev;

//条件成立:说明当前是一个替换操作
if (value != null)
//直接替换
e.val = value;
//条件成立:说明当前节点非头结点
else if (pred != null)
//当前节点的上一个节点,指向当前节点的下一个节点。
pred.next = e.next;

else
//说明当前节点即为 头结点,只需要将 桶位设置为头结点的下一个节点。
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}

//条件成立:TreeBin节点。
else if (f instanceof TreeBin) {
validated = true;

//转换为实际类型 TreeBin t
TreeBin<K,V> t = (TreeBin<K,V>)f;
//r 表示 红黑树 根节点
//p 表示 红黑树中查找到对应key 一致的node
TreeNode<K,V> r, p;

//条件一:(r = t.root) != null 理论上是成立
//条件二:TreeNode.findTreeNode 以当前节点为入口,向下查找key(包括本身节点)
// true->说明查找到相应key 对应的node节点。会赋值给p
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
//保存p.val 到pv
V pv = p.val;

//条件一:cv == null 成立:不比对value,就做替换或者删除操作
//条件二:cv == pv ||(pv != null && cv.equals(pv)) 成立:说明“对比值”与当前p节点的值 一致
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
//替换或者删除操作


oldVal = pv;

//条件成立:替换操作
if (value != null)
p.val = value;


//删除操作
else if (t.removeTreeNode(p))
//这里没做判断,直接搞了...很疑惑
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//当其他线程修改过桶位 头结点时,当前线程 sync 头结点 锁错对象时,validated 为false,会进入下次for 自旋
if (validated) {

if (oldVal != null) {
//替换的值 为null,说明当前是一次删除操作,oldVal !=null 成立,说明删除成功,更新当前元素个数计数器。
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
} public V remove(Object key) {
return replaceNode(key, null, null);
}

JDK 8之前 VS 之后

  1. 首先和HashMap一样,JDK8之前没有引入红黑树

  2. JDK7中ConcurrentHashMap采用分段锁,具体来说给每个Segment加上ReentrantLock,性能要低于JDK8。其底层数据结构如下:

打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2021-2022 Yin Peng
  • 引擎: Hexo   |  主题:修改自 Ayer
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信