博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ConcurrentHashMap分析(二)数据扩容
阅读量:3933 次
发布时间:2019-05-23

本文共 7827 字,大约阅读时间需要 26 分钟。

前言

在前面的文章里,我们通过从 ConcurrentHashMap 的整体结构入手,逐步了解了它的数据结构和各个节点的转换关系。这篇文章将讲述 ConcurrenHashMap 的另一个重点:如何在高并发环境下进行扩容,这对我们了解高并发编程思想很有帮助。

由于本人水平有限,分析过程中可能存在纰漏和错误,希望大家可以指出,一起学习,一起进步。

思路

我们在前文查看 ConcurrenHashMap 中发现有一个字段nextTable,它起到在扩容时充当临时存储数组的作用:

/** * The next table to use; non-null only while resizing. */private transient volatile Node
[] nextTable;

既然 ConcurrentHashMap 是采用类似渐进式数据迁移的方式进行扩容,那么对于新旧数组来说,操作多线程就灵活很多了。我们接下来来看下它具体的扩容过程。

在之前分析 putVal 方法时,在后面有一个判断链表阈值的过程,如果链表长度超过一定阈值,就会触发转换为红黑树的操作,具体代码如下:

if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 链表转换为红黑树 treeifyBin(tab, i); if (oldVal != null) return oldVal; break;}

进入treeifyBin后,我们发现其实里面还有一个判断,如果当前存储 hash 槽的数组长度小于MIN_TREEIFY_CAPACITY(64)时,那么只是对原有数组进行扩容2倍的操作。代码如下:

if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    tryPresize(n << 1);

然后我们来分析tryPersize方法:

private final void tryPresize(int size) {
// 将数组大小扩容为原来的2幂次 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) {
Node
[] tab = table; int n; // 如果原来的数组没有初始化 if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c; // 开始初始化数组 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再次判断数组是否需要初始化 if (table == tab) {
// 初始化数组 @SuppressWarnings("unchecked") Node
[] nt = (Node
[])new Node
[n]; table = nt; sc = n - (n >>> 2); } } finally {
sizeCtl = sc; } } } // 如果数组已经扩容过了或者容量大小已经大于 MAXIMUM_CAPACITY 了,就直接返回 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; // 这是重点,可以看到在这里又分为了两种情况 else if (tab == table) {
int rs = resizeStamp(n); // 表示此时有别的数组正在扩容【sizeCtl=-(1+nThreads)】 if (sc < 0) {
Node
[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 没有其它线程在扩容,则当前线程自己进行扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } }}

在第三种情况中,无论此时是否有其它线程正在扩容,都是要执行核心方法transfer方法,我们可以看到判断当前线程是否为首个扩容线程的依据就是传入transfer的第二个参数是否为null。接下来我们就要来分析核心方法transfer

原理

private final void transfer(Node
[] tab, Node
[] nextTab) {
int n = tab.length, stride; // 根据 CPU 计算步长,即每个线程要复制原数组的多少个桶 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // 第一次扩容 if (nextTab == null) {
try {
@SuppressWarnings("unchecked") // 创建新数组 Node
[] nt = (Node
[])new Node
[n << 1]; nextTab = nt; } catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 创建 ForwardingNode 节点,还记得它的作用是作为首部节点吗?我们来看看后面它的具体作用是什么 ForwardingNode
fwd = new ForwardingNode
(nextTab); // 当一个桶迁移工作完成后设置为 true boolean advance = true; // 数据迁移完成后设置为 true boolean finishing = false; // i 表示桶索引,bound 表示边界 for (int i = 0, bound = 0;;) { Node
f; int fh; while (advance) { int nextIndex, nextBound; // 当这一个桶没有迁移完成或者整个迁移工作没有结束时,advance 为 false if (--i >= bound || finishing) advance = false; // transferIndex<=0 表示已经没有需要迁移的hash桶,将i置为-1,线程准备退出 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //当迁移完这个桶后,更新transferIndex,,获取下一批待迁移的桶 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 退出 transfer 方法 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 如果 finishing=true 说明数据迁移完成,最后一个线程完成收尾工作 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } /** * 第一个扩容的线程,执行transfer方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) * 后面过来扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = sizeCtl+1 * 每一个退出 transfer 的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1 * 那么最后一个线程退出时: * 必然有 sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT */ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 如果不相等,说明不是最后一个线程,那么直接退出 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 最后一个线程要再次检查数据是否全部迁移成功 finishing = advance = true; i = n; } } // 如果原数组的i索引没有数据,直接CAS将新数组设置为null else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果原数组的i索引的hash为MOVED,说明已经有线程在迁移它了 else if ((fh = f.hash) == MOVED) advance = true; // 重点:迁移node节点 else { synchronized (f) { if (tabAt(tab, i) == f) { Node
ln, hn; // 这里是链表结构的迁移 if (fh >= 0) { // 代码会在后面单独列出来 } // 这里是 TreeBin 节点的迁移 else if (f instanceof TreeBin) { // 代码会在后面单独列出来 } } } } }}

在前面的代码我们会了解到 ConcurrentHashMap 默认将所有的槽分为一个个桶,对每个桶进行并发式的数据迁移,同时它会根据(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT判断是否是迁移过程中的最后一个线程。对于其它线程来说,如果它的任务完成了那么就直接退出方法,而最后一个线程需要设置将nextTable的数据赋给table,同时将nextTablesizeCtl的值恢复为原来的。接下来我们就来分析链表节点和红黑树节点不同的迁移方式。

首先是链表节点,下面是它的相关代码:

int runBit = fh & n; // 对旧桶的该槽的首节点进行 hashNode
lastRun = f;for (Node
p = f.next; p != null; p = p.next) {
// b 要么等于0,要么等于n int b = p.hash & n; if (b != runBit) {
runBit = b; // lastRun 会记录最后一个runBit值发生变化的节点 lastRun = p; }}// 开始构建两个链表了// 尾插法if (runBit == 0) {
ln = lastRun; hn = null;} else {
hn = lastRun; ln = null;}// 以 lastRun 所指向的结点为分界,将链表拆成2个子链表ln、hnfor (Node
p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node
(ph, pk, pv, ln); else hn = new Node
(ph, pk, pv, hn);}setTabAt(nextTab, i, ln); // ln 链表存入新桶的索引i位置setTabAt(nextTab, i + n, hn); // hn 链表存入新桶的索引i+n位置setTabAt(tab, i, fwd); // 设置 ForwardingNode 占位advance = true; // 表示当前旧桶的结点已迁移完毕

我们知道 ConCurrentHashMap 的容量大小必须是2的幂次。这样fh & n的结果要么是0,要么是n。所以这里就会将链表分为两部分,结果为0的一部分和结果为n的一部分,这样可以形成扩容后原长度链表各分一半的情况。

从上面的代码我们可以发现指定容量大小为2的幂次,不仅可以使hash尽量做到均匀分布,而且对于节点迁移也有很大帮助。接下来我们来分析红黑树的迁移过程:

TreeBin
t = (TreeBin
)f;TreeNode
lo = null, loTail = null;TreeNode
hi = null, hiTail = null;int lc = 0, hc = 0;// 用链表的方式遍历,先是跟链表节点方式一样,形成两个链表// Node 提供 next 字段,TreeNode 提供 prev 字段,这样就能形成双向链表for (Node
e = t.first; e != null; e = e.next) {
int h = e.hash; TreeNode
p = new TreeNode
(h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; }}// 判断是否需要形成的两个链表是否需要转换为红黑树ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin
(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin
(hi) : t;// 设置槽位和 advance 状态信息setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;

以上就是 ConcurrentHashMap 的扩容内容,ConcurrentHashMap 十分巧妙地通过2的幂次数字与位运算将索引重计算的问题完美地解决了,将原来数组分为多个桶,使用并发的方式对数据进行迁移,设计地让人叹为观止。

转载地址:http://njqgn.baihongyu.com/

你可能感兴趣的文章
矩阵积分
查看>>
laplacian matrix
查看>>
cotangent matrix or laplacian mesh operator
查看>>
Minimizing quadratic energies with constant constraints
查看>>
Python-第三方库requests详解
查看>>
暴力破解黄巴登录网站
查看>>
python多线程
查看>>
read selection
查看>>
optimization on macOS
查看>>
Template-Based 3D Model Fitting Using Dual-Domain Relaxation
查看>>
install libfreenect2 on ubuntu 16.04
查看>>
how to use automake to build files
查看>>
using matlab drawing line graph for latex
查看>>
How package finding works
查看>>
build opencv3.3.0 with VTK8.0, CUDA9.0 on ubuntu9.0
查看>>
how to compile kinfu_remake with cuda 9.0 opencv2.4.13.4
查看>>
qtcreator4.4.1中cmake 与cmake3.5.1本身generate出来的setting是有区别的解决方法
查看>>
CMake Useful Variables/Logging Useful Variables
查看>>
使用cmake建立工程链接OPENNI2
查看>>
ubuntu下解决csdn网页打不开的问题
查看>>