ConcurrentHashMap Source Code in JDK 8

The implementation of ConcurrentHashMap changed substantially in JDK 8. First, segmented locking is gone: all data lives in one large hash table. Second, red-black trees were introduced. If the head node of a bin is of type Node, the bin is a plain linked list; if the head node is of type TreeNode (a subclass of Node), the bin is a red-black tree. A bin can convert between the two forms: it starts as a linked list, becomes a red-black tree once the list grows past a threshold, and converts back to a linked list when the tree shrinks below another threshold.
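
For reference, a minimal sketch of the Node structure, abridged from the JDK 8 source (methods omitted):

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;          // volatile, so reads need no lock
    volatile Node<K,V> next; // volatile, for lock-free traversal during concurrent updates
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash; this.key = key; this.val = val; this.next = next;
    }
    // getKey()/getValue()/find() etc. omitted
}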

(Figure: data structure of ConcurrentHashMap in JDK 8)

In JDK 7, segmented locking reduced hash collisions by avoiding too many elements in one bin and improved the concurrency of reads and writes. Because segments are independent of one another, it also provided concurrency for resizing: instead of resizing the whole ConcurrentHashMap at once, each Segment resizes independently.

JDK 8 uses red-black trees: when a bin holds many elements, queries and updates on a tree are much faster than on a linked list, which largely solves the hash-collision problem. The lock granularity also shrank: instead of locking the whole ConcurrentHashMap, each bin's head node is locked separately, so the degree of concurrency equals the length of the Node array. The initial default length is 16, the same as the initial number of Segments in JDK 7, and concurrent resizing is supported. In JDK 7 the Segment count is fixed at initialization and can never change, so the degree of concurrency is fixed and later resizing happens only inside each Segment; JDK 8 is effectively a single Segment whose concurrency grows as the array grows.

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
    private static final int MAXIMUM_CAPACITY = 1 << 30; // maximum capacity
    private static final int DEFAULT_CAPACITY = 16; // default capacity
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // default concurrency level
    private static final float LOAD_FACTOR = 0.75f; // load factor
    static final int TREEIFY_THRESHOLD = 8; // a list longer than this enters the treeify logic
    static final int UNTREEIFY_THRESHOLD = 6; // a tree with fewer elements than this converts back to a list
    static final int MIN_TREEIFY_CAPACITY = 64; // a list longer than 8 is treeified only when the array is at least this long
    private static final int MIN_TRANSFER_STRIDE = 16; // minimum resize stride: each resizing thread migrates at least 16 bins at a time
    private static int RESIZE_STAMP_BITS = 16;
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    static final int MOVED = -1; // hash for forwarding nodes
    static final int TREEBIN = -2; // hash for roots of trees
    static final int RESERVED = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    /** Number of CPUS, to place bounds on some sizings */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    transient volatile Node<K,V>[] table;
    private transient volatile Node<K,V>[] nextTable; // during a resize, elements migrate from table to nextTable
    private transient volatile long baseCount;
    private transient volatile int sizeCtl; // controls threads during initialization and concurrent resizing; initially set to cap
    private transient volatile int transferIndex;
    private transient volatile int cellsBusy;
    private transient volatile CounterCell[] counterCells;
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;
}
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0) throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

cap is the length of the Node array and is always kept a power of two. tableSizeFor computes a suitable array length from the requested initial capacity: it takes 1.5 times the initial capacity plus 1, then rounds up to the nearest power of two, and uses that as the initial value of cap. sizeCtl here controls the number of threads during initialization or concurrent resizing; its initial value is set to cap.
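
For reference, the power-of-two rounding helper, essentially as it appears in the JDK 8 source: it smears the highest set bit of c - 1 into all lower bits, then adds 1. For example, new ConcurrentHashMap<>(16) computes tableSizeFor(16 + 8 + 1) = tableSizeFor(25) = 32.

private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;  // propagate the highest set bit downward...
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16; // ...until all bits below it are 1
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}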

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // spin-wait while sizeCtl == -1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // set sizeCtl to -1
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // initialize
                    table = tab = nt;
                    sc = n - (n >>> 2); // sizeCtl does not denote the array length: after initialization it no longer equals the length but becomes 0.75n, the next resize threshold
                }
            } finally {
                sizeCtl = sc; // restore sizeCtl
            }
            break;
        }
    }
    return tab;
}

The spread method deserves special mention. Because the array is small, the high bits of the hash would never participate in index computation, so spread XORs the hash with its own high 16 bits: the cheapest possible transformation, minimizing the performance cost, that brings the high bits into play. The final AND with HASH_BITS keeps the resulting hash non-negative. Keeping it non-negative matters because a node hash of -1 means the table is being resized and this bin has already been migrated to the new temporary table, in which case the node is a ForwardingNode.

static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}
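
A hypothetical demonstration (the class and values below are made up for illustration): with a table of length 16, only the low 4 bits select the bin, so two hash codes that differ only in their high bits always collide without the fold, but separate after spread:

public class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff;
    static int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }

    public static void main(String[] args) {
        int n = 16;          // table length (power of two)
        int h1 = 0x00010000; // differs from h2 only above bit 16
        int h2 = 0x00020000;
        System.out.println(((n - 1) & h1) + " " + ((n - 1) & h2));                 // 0 0 -> collision
        System.out.println(((n - 1) & spread(h1)) + " " + ((n - 1) & spread(h2))); // 1 2 -> separated
    }
}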

The constructors do not initialize the array; initialization happens on the first put. Competition among threads is resolved with a CAS on sizeCtl: the thread that successfully sets sizeCtl to -1 wins the right to initialize, enters the initialization block, and sets sizeCtl back when done. The other threads keep spinning in the while loop until the array is non-null, i.e. initialization has finished, and then leave the function. Because the initialization work is small, the other threads simply wait rather than helping.

sizeCtl means different things depending on the state of the hash table. When sizeCtl = -1, the whole table is being initialized; when sizeCtl is some other negative number, multiple threads are concurrently resizing the table; when sizeCtl = cap and tab = null, it is the initial capacity before initialization; after a successful resize, sizeCtl stores the next resize threshold, i.e. 0.75n.
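
A hand-computed trace of these states, assuming the constructor and initTable shown above:

ConcurrentHashMap<String, Integer> m = new ConcurrentHashMap<>(16);
// constructor: cap = tableSizeFor(16 + 8 + 1) = 32, so sizeCtl = 32 (table still null)
m.put("a", 1);
// initTable: one thread CASes sizeCtl from 32 to -1, allocates Node[32],
// then sets sizeCtl = 32 - 32/4 = 24, the next resize threshold (0.75n)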

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // initialize the array
        // check whether the key being put already collides with something in the array; if the bin is empty, create a node and place it directly
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // initialize the i-th element
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // exit on success; on CAS failure, spin and retry until it succeeds
        }
        else if ((fh = f.hash) == MOVED) // check whether a resize is in progress
            tab = helpTransfer(tab, f); // if resizing, help with the resize
        else { // insert the element
            V oldVal = null;
            synchronized (f) { // lock; the lock is on the head of the list or the root container of the red-black tree
                if (tabAt(tab, i) == f) { // re-check that f has not changed; if it has, loop again
                    if (fh >= 0) { // linked list
                        binCount = 1; // records the length of the list
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val; // the key already exists, so replace the value
                                if (!onlyIfAbsent) // onlyIfAbsent defaults to false
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // reached the tail without finding a duplicate key: append a new Node at the tail
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    } else if (f instanceof TreeBin) { // red-black tree
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val; // the key already exists, so replace the value
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // for a list, binCount above counts up from 1
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); // the list reached the threshold 8: try converting it to a red-black tree
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

If a resize is found in progress, the thread helps with it. The lock here is on f, the head node at the corresponding array index, which means there is one lock per bin and the degree of concurrency equals the array length. When binCount, the number of elements in the list, reaches 8, treeifyBin is called to convert the list into a red-black tree. That function does not necessarily treeify, though: it may only resize instead. If the array length is below the threshold 64 it resizes directly; only otherwise does it convert to a red-black tree.

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1); // the array is shorter than the threshold 64: resize instead of treeifying
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { // lock, then convert the list to a red-black tree
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) { // walk the list building TreeNodes, linked as a doubly linked list
                        TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

tryPresize resizes the whole hash table toward an expected element count; its core is the call to transfer. For the first resize, sizeCtl is set to a very large negative number via U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2); each thread that subsequently joins the resize increments sizeCtl by one via U.compareAndSwapInt(this, SIZECTL, sc, sc + 1), and decrements it by one once its share of the resize is done.
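
For reference, resizeStamp, essentially as it appears in the JDK 8 source: it encodes the current table length n (via its leading-zero count) and sets bit 15, so that rs << RESIZE_STAMP_SHIFT is a negative number specific to this generation of the resize:

static final int resizeStamp(int n) {
    // e.g. n = 16: numberOfLeadingZeros(16) = 27, so rs = 27 | 0x8000 = 0x801B,
    // and rs << RESIZE_STAMP_SHIFT = 0x801B0000, a large negative int
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}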

private final void tryPresize(int size) {
    // compute the array size from the expected element count
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) { // hash table initialization, same as initTable above
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // i.e. n - n/4 = 0.75n, the next resize threshold
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        } else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) { // resize branch
            int rs = resizeStamp(n);
            if (sc < 0) { // multiple threads are already resizing concurrently
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
                    break; // the resize has finished
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt); // help with the resize
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null); // first resize
        }
    }
}

transfer can be called by multiple threads, so each thread migrates only part of the old table. If the old array length is n, each thread migrates one segment, whose length is held in the variable stride; transferIndex tracks the overall resize progress. On a single core there is no way to parallelize, so one thread migrates the whole array and stride is simply n; with multiple cores the stride is (n >>> 3) / NCPU, with a guaranteed minimum of 16, so roughly n/stride migration tasks are handed out.
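
Some hand-computed examples of the stride formula (values chosen purely for illustration):

// n = 1024, NCPU = 8: (1024 >>> 3) / 8 = 128 / 8 = 16 -> stride = 16, 64 migration tasks
// n = 64,   NCPU = 8: (64 >>> 3) / 8   = 8 / 8   = 1  -> below MIN_TRANSFER_STRIDE, so stride = 16, 4 tasks
// n = 16,   NCPU = 1: single core                     -> stride = n = 16, one thread migrates everything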

transferIndex is a field of ConcurrentHashMap that records the resize progress. It starts at n, and the table is migrated from high indices downward: each claimed task decreases it by stride, and once it drops to <= 0 the whole migration is complete. The range [0, transferIndex-1] is therefore the part not yet assigned to any thread. Because transferIndex is modified by multiple threads at once, every decrement goes through a CAS operation.

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // old table, new table
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // compute the stride; minimum stride is 16
    if (nextTab == null) { // initialize the new table
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // double the capacity
            nextTab = nt;
        } catch (Throwable ex) { // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // transferIndex starts at the old array length
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true; // whether this thread should keep looking for the next bin that needs migrating
    boolean finishing = false; // whether this thread's migration work is done
    // i is the traversal index and bound the lower boundary; on successfully claiming a task, i = nextIndex-1 and bound = nextIndex-stride; with no task available, i = 0 and bound = 0
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // advance says whether to keep stepping from i = nextIndex-1 down toward bound; each pass advances by at most one position
        // all three branches set advance = false, so the while loop keeps spinning only when none of them executes
        // the point: when the CAS on transferIndex fails, spin in hope of claiming a stride-sized migration task
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing) // walk the array via --i; if the decrement succeeded, stop the while loop
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) { // transferIndex <= 0: the whole table has been claimed
                i = -1;
                advance = false;
            }
            // CAS on transferIndex to assign this thread one stride; on success we hold a migration task, otherwise keep spinning
            else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1; // traverse the array backwards from i down to bound
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) { // i is out of range: the whole table has been traversed
            int sc;
            if (finishing) { // the entire resize is complete
                nextTable = null;
                table = nextTab; // publish nextTab as the current table
                sizeCtl = (n << 1) - (n >>> 1); // 0.75 * 2n, the next resize threshold
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null) // tab[i] is empty: just install a ForwardingNode
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // tab[i] is already being migrated
            advance = true; // already processed
        else { // migrate tab[i], which may be a linked list or a red-black tree
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { // linked list
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p; // every element after lastRun has the same bit, so remember this last switch point
                            }
                        }
                        if (runBit == 0) { // decide whether lastRun belongs to the low or high half
                            ln = lastRun; // similar to the tail-run optimization in JDK 7's list migration
                            hn = null;
                        } else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0) // low or high half
                                ln = new Node<K,V>(ph, pk, pv, ln); // head insertion
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln); // place the low half into slot i of the new table
                        setTabAt(nextTab, i + n, hn); // place the high half into slot i+n of the new table
                        setTabAt(tab, i, fwd); // replace the old slot with fwd
                        advance = true;
                    } else if (f instanceof TreeBin) { // red-black tree, migrated much like a list
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null; // split into low and high halves
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
                            if ((h & n) == 0) { // handle the low half
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc; // count of low-half elements
                            } else { // handle the high half
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc; // count of high-half elements
                            }
                        }
                        // check each half's element count to decide whether it should revert to a linked list
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln); // low half
                        setTabAt(nextTab, i + n, hn); // high half
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

Before the resize completes, some bins have already been migrated to the new table while others have not, and threads calling get still access the old table. If Node[0] has already been migrated while other bins are still moving, a thread reading Node[0] in the old table would miss its data. Hence the ForwardingNode: it records a reference to the new table, and a thread that encounters a ForwardingNode continues its lookup in the new table.

List migration does not need to count the elements in the high and low halves: each half of a split list is at most as long as the original list, so it is certainly still a list and no treeify check is needed. When a red-black tree is migrated, however, each half after the split may well have shrunk to the threshold for converting back to a list.
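
For reference, untreeify, essentially as it appears in the JDK 8 source, simply rebuilds a plain Node list from the TreeNode chain:

static <K,V> Node<K,V> untreeify(Node<K,V> b) {
    Node<K,V> hd = null, tl = null;
    for (Node<K,V> q = b; q != null; q = q.next) {
        Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null); // copy as a plain Node
        if (tl == null)
            hd = p;
        else
            tl.next = p;
        tl = p;
    }
    return hd;
}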

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        } else if (eh < 0) // negative hash: a ForwardingNode (resizing) or a TreeBin; delegate to its find
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    } else return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}

A ForwardingNode acts only as a marker: it signals that other threads are resizing and that this particular bin has already been migrated, and it holds a reference to nextTable. During the resize, its find method gives access to the data that has already moved to nextTable.

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // if counterCells is uninitialized and the CAS increment of baseCount succeeds, skip the if-body; otherwise enter it
    if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // the counterCells array is non-null, or the CAS increment of baseCount failed
        if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // fall through to fullAddCount only when counterCells is null, or its length is 0, or the cell at the index derived from the thread's probe value is null, or the cell exists but the CAS increment on its value failed
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) { // decide whether to resize
        Node<K,V>[] tab, nt; int n, sc;
        // after one resize completes, re-check whether the resized table needs to grow again, and if so resize again
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
                    break; // nextTable is assigned inside transfer
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit(); // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false; // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // the CounterCell array is non-empty
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) { // the cell at this thread's probe index is null
                if (cellsBusy == 0) { // check whether the CounterCell array is busy
                    CounterCell r = new CounterCell(x); // create a new CounterCell
                    // CAS the cellsBusy flag from 0 to 1 (busy)
                    if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try { // Recheck under lock
                            CounterCell[] rs; int m, j;
                            // re-check that the slot is still null; if not, another thread changed it, so loop again; if it is, install the r created above and exit the loop
                            if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created) break;
                        continue; // Slot is now non-empty
                    }
                }
                collide = false;
            } else if (!wasUncontended) // wasUncontended is false
                // set it to true; ThreadLocalRandom.advanceProbe(h) below yields a new probe value h
                wasUncontended = true; // Continue after rehash
            // a = as[(n - 1) & h] != null: CAS on a, and exit the loop on success
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU) // the array has reached the CPU count, so never grow it further
                collide = false; // reset collide to false
            else if (!collide)
                collide = true; // only when collide is true does the CounterCell array get expanded
            else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) { // re-check that counterCells has not changed; if it has, loop again
                        CounterCell[] rs = new CounterCell[n << 1]; // double the CounterCell array
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue; // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h); // generate a new probe value
        } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            // the CounterCell array is empty; cellsBusy == 0 means no other thread is initializing it, so CAS cellsBusy to 1 and, on success, initialize the array with length 2
            boolean init = false;
            try { // Initialize table
                if (counterCells == as) { // re-check that counterCells has not changed
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0; // done: restore cellsBusy to 0
            }
            if (init) // initialization succeeded: exit the loop
                break;
        } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break; // the CounterCell array is empty and cellsBusy == 1, i.e. another thread is initializing it; rather than wait, the remaining threads CAS baseCount and exit on success
    }
}

The size is computed as baseCount plus the sum of the value fields stored across the CounterCell array.

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
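
A minimal usage sketch (hypothetical demo code, not from the JDK): four threads put disjoint keys concurrently; each put goes through addCount, accumulating into baseCount or a CounterCell, and size() sums them afterwards. While writers are still running, size() is only a moment-in-time estimate:

import java.util.concurrent.*;

public class SizeDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int base = t * 10_000; // disjoint key range per thread
            pool.execute(() -> {
                for (int i = 0; i < 10_000; i++)
                    map.put(base + i, i);
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(map.size()); // 40000 once all writers are done
    }
}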