详解 ConcurrentHashMap

文章目录

  • ConcurrentHashMap 的底层数据结构?
  • ConcurrentHashMap 的带参构造方法的流程?
  • ConcurrentHashMap 的 put 方法的流程?
  • ConcurrentHashMap addCount 方法的流程是怎样的呢?
  • ConcurrentHashMap transfer 方法的流程是怎样的呢?
  • ConcurrentHashMap helpTransfer 方法的流程是怎样的呢?
  • ConcurrentHashMap 的 get 方法的流程?
  • ConcurrentHashMap 的 sizeCtl 的含义,以及值的流转过程?
  • ConcurrentHashMap 的 size 方法的流程?
  • 其他
    • 如果 ConcurrentHashMap 的某个数组下标位置是一颗红黑树,那么这个位置上的节点类型是 TreeNode 吗?
    • 为什么要用 TreeBin 对象作为这个位置上的节点,而不是 TreeNode 对象呢?
    • ConcurrentHashMap 的 size 方法会返回最新的值吗?
    • transferIndex 的真正含义
  • ConcurrentHashMap 总结
    • put 方法流程总结
    • ConcurrentHashMap 的元素数量计数
    • ConcurrentHashMap 的扩容操作
  • ConcurrentHashMap 的设计思想总结
    • 大量的无锁并发安全处理操作
    • 细化临界资源粒度
    • 高效的扩容机制
    • 高效的状态管理机制

ConcurrentHashMap 的底层数据结构?

ConcurrentHashMap 的底层数据结构是 Node 数组。Node 类的定义如下:

static class Node<K,V> implements Map.Entry<K,V> {//节点的 hash 值final int hash;//节点的 key 值final K key;//节点的 value 值volatile V val;//后继节点volatile Node<K,V> next;
}

其中,元素的 keyvalue 均不能为空。

ConcurrentHashMap 的带参构造方法的流程?

  • 判断传入的初始容量是否合法,小于 0 将抛出异常
  • 判断是否传入的初始容量大于最大值(2^30 次方)的一半,如果是,则将容量设置为最大值
  • 否则将容量设置为大于传入的初始容量的最小的 2 的整数次幂
  • sizeCtl 参数赋值为初始容量

ConcurrentHashMap 的 put 方法的流程?

ConcurrentHashMapput 方法流程如下:

  • 首先检查 keyvalue 是否为空,如果为空,则直接抛出空指针异常
  • 其次调用 spread 方法计算 hash
    • keyhashcode 往右移 16 位,跟原 hashcode 值做异或运算
    • 异或运算得到的结果,跟 HASH_BITSHASH_BITS = 0x7fffffff,换算成二进制有 31 个 1)做运算得到最终结果
  • 判断数组是否为空,如果数组为空,则执行初始化方法
    • 当表为空时,一直执行循环
    • 完成构造方法后,sizeCtl 参数要么等于 0,(即使用的无参构造器),要么等于初始容量大小,(使用的指定了初始容量的构造器)
    • sizeCtl 为负数时,即表正在被其他线程初始化或者正在被其他线程扩容时,调用 Thread.yield 方法主动让出 cpu 执行权(即等待其他线程完成初始化或表扩容的操作
    • sizeCtl 不为负数时,使用 CASsizeCtl 的值设置为 -1
    • 再次判断表是否为空
      • 如果表不为空,则说明表已经被其他线程初始化完成,则直接跳出循环
      • 如果表为空,判断是否指定了初始容量,如果指定了初始化容量,则使用指定的数值作为初始化容量;如果没有指定初始容量,则使用默认容量 16
      • 初始化一个大小为上一步中得到的容量的 Node 数组
      • sizeCtl 的值设置为容量的 0.75(可类比于 HashMap 中的扩容阈值
  • 根据 hash数组长度 - 1进行运算后,得到元素在数组中的下标,并检查该下标位置是否存在元素
    • 如果该下标位置不存在元素,则用 CAS 对该下标位置进行赋值,如果赋值成功,则跳出循环
    • 如果 CAS 操作失败,则继续循环
  • 如果数组该下标位置存在元素(以下简称该元素为 f),则检查 fhash 值是否等于 -1(当元素的 hash 值为 -1 时,代表该数组正在进行扩容),即 MOVED
    • 如果是,则说明其他线程正在进行扩容,则执行 helpTransfer 方法协助完成扩容操作
  • 否则,开始对该数组下标位置上的桶中的元素进行遍历比较
    • 首先使用 synchronized 关键字对 f 进行加锁
    • 加锁成功,则重新获取一遍该数组下标位置上的元素,判断其与 f 是否相等,即判断 f 是否发生了变化,如果发生了变化,则直接进入下一次循环
    • 如果没有发生变化,则判断 fhash 值是否大于等于 0
      • 如果大于等于 0,则说明是链表结构,则遍历链表,将 binCount 值赋为 1,每次遍历都将 binCount +1
      • 使用 keyequals 方法逐一比对元素,如果该 key 不存在,则将待插入元素加入到链表的尾部
      • 如果存在该 key,则根据 onlyIfAbsent 参数来判断是否需要将旧 value 值进行覆盖
    • 如果 fhash 值小于 0 ,则判断 f 是否是 TreeBin 类型的元素
    • 如果是,将 binCount 值赋为 2,将待插入元素插入到红黑树中
      • 如果红黑树插入失败,则说明存在该 key,则根据 onlyIfAbsent 参数来判断是否需要将旧 value 值进行覆盖
  • 判断 binCount 的值是否不等于 0,即是否进行了红黑树和链表的查找过程
    • 如果不等于 0,则判断链表是否需要转化成红黑树,当链表上的元素个数大于 8(即在插入第 9 个元素时),且数组的长度大于 64 时,将链表转化成红黑树
    • 转化成红黑树后,将该数组下标位置上的元素使用 CAS 替换成 TreeBin 类型的元素
    • 如果替换了旧值,则将旧值返回
  • 执行 addCount 方法,即尝试将元素数量 +1

结合源码来看:

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;
}

ConcurrentHashMap addCount 方法的流程是怎样的呢?

addCount 方法,即尝试将当前元素数量自增的方法,其主要的流程如下:

  • 首先判断 counterCells 是否不为空
  • 或者尝试使用 CASbaseCount 属性进行增加的时候是否失败
  • 如果满足上面的条件
    • 继续判断 counterCells 是否为空
    • 如果 counterCells 不为空,则调用 ThreadLocalRandom.getProbe 方法生成一个随机数,跟 countCells.length-1 进行操作之后,得到 counterCells 数组的下标,判断 counterCells 该下标位置上的元素是否为空(即得到一个没有线程正在占用的)
    • 如果上述条件都不满足,则使用 CAScounterCells 下标位置的 value 值进行增加,判断 CAS 操作是否失败
    • 如果上述任一条件满足,说明已经发生了线程间的竞争,则调用 fullAddCount 方法进行 counterCells 内部的自增操作
    • 如果上述所有条件都不满足,说明对于 countCells 下标位置的 value 值进行 CAS 增加的操作成功了
      • 如果 check 参数小于等于 1,则直接返回
      • 否则,调用 sumCount 方法统计一下当前数组中的元素数量
        • sumCount 方法,就是简单地将 baseCount 的值和所有 counterCells 数组的所有元素的 value 值求和,此方法没有加锁,同步措施主要依靠 baseCountCounterCellvalue 属性都是用 volatile 关键字来修饰的。
  • 检查 check 变量是否大于等于 0
    • 如果大于等于 0,说明需要检查是否要进行扩容
    • 判断当前元素数量是否大于 sizeCtl 参数,且表不为空,且表的长度小于最大长度时,此时说明需要扩容,则进入循环
      • 首先计算扩容戳(即计算当前表长度数值的最高非 0 位前的 0 的个数,跟 2152^{15}215 进行运算)
      • 接下来判断 sizeCtl 是否小于 0
        • 如果小于 0 代表数组正在扩容,即有线程正在对数组进行扩容
          • 判断 sizeCtl 往右移 16 位后是否不等于 扩容戳
          • 判断 nextTable 属性是否等于 0
          • 判断 transferIndex 是否小于等于 0
          • 如果上述 3 个条件任一成立,代表数组已经被其他线程扩容完成,则直接返回
          • 如果上述 3 个条件都不成立,则尝试使用 CASsizeCtl 进行 +1
            • 如果 CAS 成功,代表该线程开始执行协助扩容操作,参与扩容的线程数(sizeCtl 参数的低 16 位)+1,则开始执行协助扩容
        • 如果 sizeCtl 不小于 0,则尝试使用 CASsizeCtl 的值修改成扩容戳左移 16 位且 +2
          • 如果 CAS 成功,则执行初始化扩容操作(此前没有其他线程在对数组进行扩容)
        • 重新计算当前元素数量(调用 sumCount 方法)后进入下次循环

结合源码来看:

private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}
}

ConcurrentHashMap transfer 方法的流程是怎样的呢?

ConcurrentHashMaptransfer 方法,即为扩容方法,其主要的流程如下:

  • 首先,需要通过 CPU 核心数确定每个线程需要处理的桶的数量 stride,最小为 16
  • 如果 nextTable 属性为空,则说明正在执行初始化扩容,则新建一个原数组长度两倍的新数组,并赋值给 nextTable,并将 nextTransferIndex 属性赋值为原数组长度
  • 创建一个 FowardingNode 类型的节点,此类节点的 hash 值为 -1,其中有一个 nextTab 属性,记录的就是扩容时的新数组
  • 根据 transferIndexstride 的值,尝试使用 CAStransferIndex 的值修改为 transferIndex - stride,这一步是确定当前线程要处理的桶的范围,即当前线程要处理的数组下标范围是 [transferIndex - stride,transferIndex) 这个区间内的所有桶
  • 分配到需要处理的桶的范围后,从右到左逆序遍历这个范围中的每一个桶,遍历的下标为 i
    • 判断位置为 i 上的这个节点是否为空,如果为空,则尝试使用 CAS 将这个位置上的节点修改成创建好的 FowardingNode 节点
    • 如果这个节点不为空,那么判断这个节点的 hash 值是否等于 -1,如果是,代表这个节点是 ForwardingNode 类型的节点,则不予处理
    • 否则说明这个节点上的元素还没有被迁移,则开始迁移这个桶中的所有节点
      • 首先对这个节点使用 synchronized 进行加锁
      • 加锁成功后,判断这个节点有没有被改变
      • 如果没被改变,则判断这个节点的 hash 值是否大于 0
      • 如果大于 0,则说明这个节点是链表的头节点,则开始对链表进行迁移
        • 首先,遍历链表,计算每一个节点的 runBit ,其计算方式就是将节点的 hash 值与原数组长度进行运算,计算结果只有两种
          • 如果 runBit 的值为 0,则说明节点在新数组中的位置等于原来的下标位置
          • 如果 runBit 的值不为 0,则说明节点在新数组中的位置等于原来的下标 + 原数组的长度位置
        • 找到最后一个与前驱节点的 runBit 值不相等的节点 lastRun,最后的 runBit 值等于 lastRun 节点的 runBit
        • lastRun 节点的含义,就是在链表中找到一个其后续节点的 runBit 值都相等的节点,在发生迁移的时候,只需要移动这个 lastRun 节点,就可以完成其后续所有节点的迁移
        • 如果最后的 runBit 等于 0,则将 lastRun 赋值给低位链表头节点 ln;如果最后的 runBit 不等于 0,则赋值给高位链表头节点 hn
        • 从头遍历链表,直到找到 lastRun 的位置停止,根据 runBit 值的不同,使用头插法将元素插入到低位链表中,或者高位链表
        • 使用 CAS 将新数组的 i 的位置上的元素赋值为低位链表头节点 ln
        • 使用 CAS 将新数组的 i + 原数组长度 的位置上的元素赋值为高位链表头节点 hn
        • 使用 CAS 将原数组的 i 位置上的元素赋值为创建好的 ForwardingNode 节点
      • 如果原数组 i 上的元素是 TreeBin 类型,则执行红黑树的迁移工作,迁移过程与链表类似,也是根据每个节点的 runBit 来确定在高位的红黑树中,还是在低位的红黑树
  • 当待处理区间内的所有桶都处理完毕后,再次尝试获取任务,如果获取成功,则遍历新获取的区间内的所有桶进行迁移处理
  • 如果 transferIndex 已经小于等于 0,则说明已经没有任务可以分配了,那么尝试使用 CAS 将参与扩容的线程数 -1后(即将 sizeCtl -1 ),看是否当前扩容的线程数是否只剩下一个(即 sizeCtl - 2 = resizeStamp() << 16,即回到了初始扩容时将 sizeCtl 修改成的数值),如果是则直接返回
  • 如果不是,则进行 recheck 处理,将原数组上的所有位置,从右到左再次重新遍历一遍,检查是否还存在元素还没有被迁移
  • recheck 处理完毕后,则原数组上的所有位置上的元素都已经迁移完毕,则将新数组替换掉旧数组,将 sizeCtl 参数设置为新数组长度的 0.75,并将 nextTable 属性置空后返回
    结合源码来看:
/**** @param tab 当前的数组* @param nextTab 不为空时,说明正在扩容,传入的即为尚未扩容完成的数组;为空时,说明尚未开始扩容*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;//stride 变量即为每个 CPU 要处理的桶的数量//判断 CPU 核心数是否大于 1,如果大于 1,则 stride 等于当前数组长度除以 8 再除以 CPU 核心数//否则 stride 等于当前数组长度//判断 stride 是否小于最小值,即 16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//如果 stride 小于 16,则赋值为 16 stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) {            // initiating//如果传入的 nextTab 值为空,则说明需要初始化一个扩容后的数组 try {@SuppressWarnings("unchecked")//创建一个长度为旧数组长度两倍的新数组Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}//将初始化好的新数组赋值给 nextTable 属性nextTable = nextTab;//将 transferIndex 属性赋值为旧数组的长度transferIndex = n;}//将 nextn 变量赋值为新数组的长度int nextn = nextTab.length;//初始化 ForwardingNode 类型的数组,将 nextTab 变量传入,当作这个节点的 nextTab 属性ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//初始化 advance 变量为 trueboolean advance = true;//初始化 finishing 变量为 falseboolean finishing = false; // to ensure sweep before committing nextTab//初始化 i 和 bound 变量,初始值都为 0,进入循环for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;//当 advance 属性为 true 时,一直进行循环//这段循环的目的即为给当前线程分配一段需要处理的桶的区间//即给当前线程分配扩容任务while (advance) {int nextIndex, nextBound;//如果 i-1 大于等于 bound ,或者说 finishing 为 trueif (--i >= bound || finishing)//则将 advance 变量赋值为 false,即跳出循环的条件advance = false;//将 nextIndex 赋值为 transferIndex//并判断值是否小于等于 0else if ((nextIndex = transferIndex) <= 0) {//如果 transferIndex 小于等于 0,代表给线程分配扩容任务已经完成,接下来就该跳出循环了//则将 i 赋值为 -1//将 advance 属性赋值为 falsei = -1;advance = false;}//使用 CAS 尝试将 transferIndex 修改为 transferIndex - stride 的差值//这是因为,需要给当前线程分配处理桶的区间//即,当前线程需要处理的桶的区间为:[transferIndex-stride,transferIndex)else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {//如果 CAS 成功,即将 bound 的值赋值为 transferIndex-stride,即为需要处理桶的左边界(含)bound = nextBound;//将 i 赋值为 nextIndex -1,即为需要处理桶的右边界(含)i = nextIndex - 1;//将 advance 变量赋值为 false,即跳出循环的条件advance = false;}}//判断如果 i < 0,或者 i >= 原数组长度//或者 i + n 大于等于新数组长度if (i < 0 || i >= n || i + n >= nextn) {//实测只有 i = -1 的时候会满足条件,即走进了上面一个循环的第二个分支的条件的时候//而第二个条件满足,即说明 transferIndex 已经 <= 0 了//即说明给线程分配任务已经完成了int sc;//如果扩容已经结束if (finishing) {//将 nextTable 属性赋值为 nullnextTable = null;//将当前数组替换为新数组table = nextTab;//将 sizeCtl 属性赋值为新数组长度的 0.75 倍//即 sizeCtl 重新变成扩容阈值sizeCtl = (n << 1) - (n >>> 1);//扩容操作完成,直接返回return;}//使用 CAS 尝试将 sizeCtl -1,即参与扩容的线程数量 -1if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//如果 CAS 成功,判断参与扩容的线程数量是否只剩 1 个了//扩容戳往左移 16 位 +2 即为初始化扩容时的 sizeCtl 参数的值if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)//如果参与扩容的线程数量只剩一个了,则说明扩容操作已经完成,则直接返回return;//否则说明整个扩容操作还没有完成,只是当前线程的当前任务完成了//将 finishing 和 advance 参数都赋值为 truefinishing = advance = true;//将 i 赋值为原数组长度//完整地从右到左重新检查一遍原数组上的每一个位置,查看是否还有元素没有迁移i = n; // recheck before commit}}//判断下标 i(即当前处理的桶的位置)位置上是否为空else if ((f = tabAt(tab, i)) == null)//如果为空,则尝试用 CAS 把旧数组上的第 i 个元素,修改为 ForwardingNode 类型的节点//ForwardingNode 节点的 hash 值比较特殊,为 -1,枚举值为 MOVED//将 advance 的值赋值为 CAS 的结果advance = casTabAt(tab, i, null, fwd);//判断,如果下标 i (当前处理的桶的位置)上的 hash 值为 -1else if ((fh = f.hash) == MOVED)//代表这个下标对应的节点已经被赋值为了 ForwardingNode 类型的节点//说明该位置已经被处理了,则将 advance 赋值为 trueadvance = true; // already processedelse {//否则,说明下标 i(当前处理的桶的位置)上的元素不为空,且还没有被处理//首先对该下标元素 f 使用 synchronized 进行加锁synchronized (f) {//进来之后第一件事情,先判断数组下标位置的元素是否还等于 fif (tabAt(tab, i) == f) {//如果等于,则说明还没有被修改过Node<K,V> ln, hn;//如果 f 的 hash 值大于等于 0(即判断该元素是链表还是红黑树的节点)if (fh >= 0) {//如果大于 0 ,说明这个桶中的元素是链表类型的节点//实际上这个分支中的代码应该是将链表转移的逻辑//将 f 的 hash 值与原数组的长度进行与操作//runBit 变量其实就是节点的 hash 值参与计算数组下标位置的比较部分往左移了一位的值//如果这一位是 0(runBit = 0),代表迁移过去的位置还是原数组下标位置//如果这一位是 1,代表迁移过去的位置是原数组下标 + 旧数组长度的位置int runBit = fh & n;//lastRun 变量即为链表上,最后一个与前节点的 runBit 不相等的节点//为什么要这样设置?//因为这样的话,到了这个 lastRun 节点后面的节点就没有必要再往下遍历了//因为到了 lastRun 节点,后面的节点的 runBit 都跟 lastRun 节点一样//意思就是说后面节点都不用动,只需要将 lastRun 迁过去就可以了Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {//遍历旧链表,//这个循环的作用就是找到 lastRun 的位置int b = p.hash & n;if (b != runBit) {//如果计算出来的 runBit 与通过 f 计算出来的 runBit 不一致//就把 runBit 重新赋值// lastRun 变量也赋值为最新遍历到的这个元素runBit = b;lastRun = p;}}//判断 runBit 是否等于 0if (runBit == 0) {//如果是,则将 ln 赋值为 lastRun//所以 ln 代表的含义就是 lastRun 应该要迁移到原数组下标的链表头节点ln = lastRun;hn = null;}else {//如果不等于 0,则将 hn 赋值为 lastRun//所以 hn 代表的含义就是 lastRun 应该要迁移到原数组下标 + 原数组长度位置的链表头节点hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {//这个循环//将 f 到 lastRun 中间的所有节点使用头插法,再根据 runBit 的不同分别组成高位和低位两条新的链表//即 ln 与 hn,低位链表与高位链表int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//将新数组中,下标为 i (即原数组下标位置)位置的元素设置为低位链表setTabAt(nextTab, i, ln);//将新数组中,下标为 i + n(即原数组下标 + 原数组长度位置)位置的元素设置为高位链表setTabAt(nextTab, i + n, hn);//将原数组中,下标为 i (即原数组下标位置)位置的元素设置为 ForwardingNode 类型的节点//表示这个位置上的元素已经迁移完成setTabAt(tab, i, fwd);//将 advance 属性赋值为 truadvance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}
}

ConcurrentHashMap helpTransfer 方法的流程是怎样的呢?

helpTransfer协助扩容方法,其主要流程如下:

  • 首先进行一些判断
    • 当前数组不能为空
    • 数组下标位置的节点是 FowardingNode 类型
    • FowardingNode 节点的 nextTable 属性不为空
  • 同时满足上述的三个条件后,进行下一步逻辑处理,否则直接将当前数组 table 对象返回出去
  • 使用 resizeStamp 方法,计算数组长度的扩容戳(resizeStamp,简写为 int rs 变量)
    • 具体的实现就是首先调用 Integer.numberOfLeadingZeros() 计算数组长度最高非 0 位前的 0 的个数,由于数组的长度始终是 2 的整数次幂,所以当数组的长度发生变化时(即发生扩容时),该值肯定是会变化的(每次扩容后最高非 0 位往左移 1 位,则该数值减少了 1
    • 再将 1 往左移 15 位,最后将两个值做 ^ 或运算,(相当于将两个值相加),即得到了扩容戳数值
    • 可以看出,扩容戳的取值范围为 [215[2^{15}[215 , 215+32]2^{15}+32]215+32],且数组每次扩容,该数值将会 -1
  • 进入循环,判断 nextTabtable 属性是否发生变化(判断其引用是否发生变化),判断 sizeCtl 属性是否小于 0(初始化完成后,sizeCtl 属性小于 0 说明在进行扩容)
  • 如果不满足条件,直接将栈帧中的本地变量 nextTab 属性返回出去
  • 满足条件则进入循环
  • 判断 sizeCtl 往右移 16 位后是否等于扩容戳(如果不等于,说明数组的大小已经发生了变化)
  • 判断 transferIndex 是否小于等于 0
  • 如果满足条件,则说明线程已经完成了扩容,则直接跳出循环,将栈帧中的本地变量 nextTable 属性返回出去
  • 如果不满足条件,则使用 CAS 尝试将 sizeCtl 属性 +1(代表协助扩容的线程数量 +1 了)
  • 如果 CAS 成功,则执行扩容方法

结合源码来看:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;//首先进行判断//1.当前数组不能为空//2.数组下标位置的节点是 FowardingNode 类型//3.数组下标位置的节点的 nextTable 属性不为空if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 使用 resizeStamp 方法,计算数组长度的扩容戳int rs = resizeStamp(tab.length);//判断 nextTable,table 属性是否发生变化(判断其引用是否发生变化)//判断 sizeCtl 属性是否小于 0while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {//判断 sizeCtl 往右移 16 位后是否不等于扩容戳//判断 transferIndex 是否小于等于 0//其他两个条件我认为是无效条件,不可能成立的,所以不去纠结代表的含义了//如果满足上面说的两个条件,则说明线程已经完成了扩容,则直接跳出循环if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;//使用 CAS 尝试将 sizeCtl 属性 +1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//如果成功,代表协助扩容的线程数量 +1 了//执行扩容方法transfer(tab, nextTab);break;}}//将栈帧中的本地变量 nextTab 属性返回出去return nextTab;}//将当前数组返回出去return table;
}

ConcurrentHashMap 的 get 方法的流程?

ConcurrentHashMapget 方法,是不加锁的,具体的流程如下:

  • 首先通过 sepread 方法,计算出 keyhash 值(计算方法就是将 keyhashcode 往右移 16 位后与原 hashcode 进行异或运算)
  • 判断数组是否为空,如果为空,则直接返回空
  • 如果数组不为空,则根据 hash & 数组长度 -1 得到节点在数组中的位置,判断这个位置上的节点是否为空
  • 如果为空,则返回空
  • 如果不为空,则用 equals 方法判断下标位置的这个节点的 key 是否与输入的 key 相等,如果相等,则将 value 返回出去
  • 如果不相等,则判断下标位置的节点的 hash 值是否小于 0
    • 如果小于 0,则说明该位置上的节点是 FowardingNode 类型(hash 值为 -1),或者 TreeBin 类型(hash 值为 -2
      • 如果是 FowardingNode 类型,则说明数组正在进行扩容且这个节点已经迁移到了新的数组中,则在 ForwardingNodenextTable 属性(即扩容后的新数组)中,查找节点
      • 如果是 TreeBin 类型,则在红黑树中执行查找的逻辑
    • 如果大于等于 0,则说明该位置上的节点是链表类型,则遍历链表查找元素

ConcurrentHashMap 的 sizeCtl 的含义,以及值的流转过程?

ConcurrentHashMapsizeCtl,在不同的时间有不同的含义,详解如下:

  • 调用构造器完成后,sizeCtl 表示当前容量(使用无参构造器时,sizeCtl = 0,使用带参构造器时,sizeCtl = 当前容量
  • 当前正在执行初始化数组时,sizeCtl 的值为 -1,代表正在初始化数组
  • 当数组初始化完成后,sizeCtl 表示扩容阈值,值为数组长度的 0.75
  • 当扩容中时,sizeCtl 的高 16 位代表的是扩容戳(即 2152^{15}215 + 当前数组长度的最高非 0 位前面的 0 的个数),低 16 位代表的是参与扩容的线程数 + 1

ConcurrentHashMap 的 size 方法的流程?

size 方法,即统计 ConcurrentHashMap 当前已存入的元素个数

  • 调用 sumCount 方法
  • sumCount 方法内部,将 baseCount 和所有的 CounterCell 内部的 value 值进行累加,得到的就是当前已存入的元素个数
  • 判断元素个数是否大于整型值的最大值,如果是就返回整型值的最大值

其他

如果 ConcurrentHashMap 的某个数组下标位置是一颗红黑树,那么这个位置上的节点类型是 TreeNode 吗?

ConcurrentHashMap 如果某个桶里面是一颗红黑树,那么该数组下标位置就是一个 TreeBin 对象,而不是一个 TreeNode 对象,TreeBin 对象相当于在 TreeNode 对象外面套了个壳子,TreeBin 对象有一个 TreeNode 属性,这个属性就是红黑树的根节点。

为什么要用 TreeBin 对象作为这个位置上的节点,而不是 TreeNode 对象呢?

这是因为在修改红黑树的时候,理论上来说需要对红黑树的根节点进行加锁,但是实际上,在红黑树的修改过程中,根节点很可能因为树的自平衡动作而被修改为其他节点。所以单纯使用红黑树的根节点作为锁对象是不靠谱的。

ConcurrentHashMap 的 size 方法会返回最新的值吗?

ConcurrentHashMapsize 方法不会返回最新的值,只会返回调用方法那一刻元素数量的快照结果。

意思就是说,如果在 size 方法被调用的过程中,元素的数量发生了变化,那返回的元素数量依然是调用 size 方法那一刻的快照值。

这是因为,在 size 方法内部,是没有采取任何同步措施的

  • 计算时取的计算依据 counterCellsbaseCount 属性,都是在调用方法那一刻的快照引用,如果在计算的过程中,这两个计算依据发生了变化,那么计算时还是用的旧值进行计算的
  • 在对 counterCellls 数组中的 CounterCell 对象的 value 属性进行遍历累加时,如果累加过后,该属性发生了变化,那么返回的数值就不是最新的值了

transferIndex 的真正含义

代表的是,当前给线程分配任务的边界,即已经分配给线程处理扩容的区间为 [transferIndex, newTableSize),而还没有被分配给线程处理扩容的区间为:[0,transferIndex)
所以,transferIndex 小于等于 0 时,并不意味着扩容就结束了,而只是意味着将整个数组的扩容任务都分给了参与扩容的线程

ConcurrentHashMap 总结

ConcurrentHashMap 是一个高性能的并发安全的 Map,常用做堆缓存,例如 Spring 的单例池,对象池等。除去处理并发相关操作外,主体流程与 HashMap 的数据操作流程基本一致。

put 方法流程总结

  • 首先计算 keyhash
  • 判断表是否为空,如果为空则需要先进行初始化
    • 当表为空时,一直循环操作
    • 首先看是否有其他线程正在执行初始化操作(判断 sizeCtl 参数是否小于 0),如果有,则调用 Thread.yield() 方法让出 CPU 执行时间片,进入下次循环
    • 尝试使用 CASsizeCtl 参数替换为 -1,如果替换成功,则当前线程执行表初始化操作
  • 根据 hash& 数组长度 - 1 找到数组中对应桶的位置
  • 如果该位置上没有元素,则尝试使用 CAS 把待插入的元素替换到该位置上,如果成功则跳出循环
  • 如果该位置上有元素,则判断该位置上的元素是否处于扩容状态,如果是,则协助进行扩容
  • 上述条件都不满足,则尝试对该位置上的元素使用 synchronized 进行加锁
    • 加锁成功后,判断该位置上的元素有没有变化,如果有,说明有其他线程已经对这个位置上的元素做了改变,进入下次循环
    • 判断该桶上的数据结构是链表还是红黑树,如果是链表则使用尾插法插入新元素,如果是红黑树则执行红黑树的插入逻辑
  • 判断链表是否要转化为红黑树(当前表的长度大于等于 64 且链表的长度大于等于 8),如果是,则执行链表转化红黑树的操作
  • 如果是覆盖了旧值,则直接将旧值返回
  • 将元素数量 + 1(执行 addCount 方法)

ConcurrentHashMap 的元素数量计数

ConcurrentHashMap 中的元素数量,是采用了 LongAdder 类的设计思想,当前元素的数量并不是用一个数值变量来表示的,而是由一个计数器数组(CounterCell 类型的数组) 来维护的,当需要获取当前元素数量时,会将当前计数器数组的快照进行遍历累加,最后才能得到当前数组中的元素数量。
这样做的好处就是当由多个线程都要去并发修改元素数量时,降低发生竞争的可能性。
试想一下,如果说只是用一个 volatile 修饰的数值类型 + CAS 来修改元素数量,那么当同一时刻有多个线程去修改元素数量时,每次都只会有一个线程修改成功,那么其余的线程都相当于空转了一次,当并发的线程数量很多时,大多数线程将都会做类似自旋操作,这样就白白浪费了 CPU 资源。
造成上述问题的根本性原因就是临界资源的粒度太粗,导致发生竞争的可能性非常大。所以 CounterCell 数组的设计,正是将临界资源的粒度给细化了,当一个线程对某个 CounterCell 的计数值修改失败后,将会转而去尝试修改其他 CounterCell 的数值,这样就降低了发生竞争的可能性,从而提升了修改操作的命中率。

ConcurrentHashMap 的扩容操作

ComcurremtHashMap 的扩容操作,是允许多个线程协助共同进行扩容操作的。

  • 在判断当前数组需要扩容(sizeCtl > 0 时,代表的含义就是扩容阈值)之后,首先发起扩容操作的线程就会把 sizeCtl 的值使用 CAS 修改为高 16 位代表扩容戳(215 次方 + 扩容前数组的长度最高非 0 位前的 0 的个数),低 16 位为 2 的数值,这个值小于 0
  • 第一个进行扩容操作的线程负责进行新数组的初始化
  • 后来在 ConcurrentHashMap 中执行操作的线程发现当前正在执行扩容后,将会进行协助扩容,协助扩容之前将会用 CAS 操作尝试将 sizeCtl 的值 +1,即 sizeCtl 的低 16+1,即参与扩容的线程数量 +1
  • 在参与扩容的每个线程,都会尝试使用 CAS 修改 transferIndex 的值(领取任务),修改后的 transferIndex 的值与修改前的 transferIndex 的值的区间范围,即为该线程负责进行扩容的数组下标范围,线程将会针对该范围内的每一个位置上的元素都进行扩容操作
  • 线程完成自己负责扩容的数组下标范围后,将会再次判断扩容有没有完成
    • 如果没有,再次尝试修改 transferIndex 的值以获取负责进行扩容的数组下标范围(再次领取任务),再次进行扩容操作
    • 如果 transferIndex 的值已经小于 0 了(已经没有可以领取的任务了),那么线程会完整地检查一遍原数组,看还有没有元素没有被转移
  • 所有工作完成,将会把 sizeCtl 参数 -1 后退出扩容方法,最后一个线程将会把原数组替换成新数组

ConcurrentHashMap 的设计思想总结

大量的无锁并发安全处理操作

  • ConcurrentHashMap 中的很多变量都使用了 volatile 关键字修饰,可以确保在变量值在被一个线程修改后,其他线程能立马得到这个修改后的值
  • ConcurrentHashMap 在修改变量值时,采用的是 CAS + 自旋重试的操作,可以在不使用锁来阻塞其他参与线程的情况下并发安全地修改变量值

细化临界资源粒度

  • ConcurrentHashMap 使用了计数器数组(CounterCell 数组)来降低修改元素数量时的发生并发竞争概率
  • 在添加新元素且这个新元素对应的数组下标位置有节点存在时,ConcurrentHashMap 锁住的是数组下标位置上的这个元素(链表头节点或者红黑树的根节点),使不同数组下标位置的桶上的修改操作互不影响,降低了发生并发竞争的概率

高效的扩容机制

高效的扩容机制主要的核心设计思想在于 ConcurrentHashMap 使用 transferIndex 来进行分段扩容,这样做的好处有:

  • 多线程协助共同完成扩容:ConcurrentHashMap 使用了多线程协助共同完成扩容的机制,使得 ConcurrentHashMap 的扩容操作在多线程场景下,不会让其他线程阻塞等待单个线程操作扩容完毕,提高了单个线程的执行效率,也使整体的扩容效率大大提升
  • 在扩容期间仍可以无阻塞访问数据:假设现在有一个线程想要调用 get 方法,并且当前 ConcurrentHashMap 正在执行扩容操作,那么可能遇见的场景有以下几种:
    • key 对应的桶已经完成了扩容(但是还有其他桶没有完成扩容),那么原数组中的桶的位置上将会放置一个 ForwardingNode 类型的桶,那么线程可以通过 nextTable(新数组) 完成对数据的访问
    • key 对应的桶还没有开始进行扩容,那么直接访问原数组中的桶就可以完成对数据的访问
    • key 对应的桶正在执行扩容,由于 get 方法访问的是调用时刻的原数组快照,所以该桶正在执行扩容时还没有对其完成改变,所以直接访问原数组中的桶就可以完成对数据的访问

高效的状态管理机制

ConcurrentHashMap 使用单个整形变量来标识当前数组所处状态,将单个整形变量根据位数不同划分了不同的含义,减少了多余的状态值定义,一定程度上减少了内存消耗以及提升了整体效率

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/557296.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js 二叉树图形_js数据结构和算法(三)二叉树

二叉树的概念二叉树(Binary Tree)是n(n>0)个结点的有限集合&#xff0c;该集合或者为空集(空二叉树)&#xff0c;或者由一个根结点和两棵互不相交的、分别称为根结点的左子树和右子树的二叉树组成。二叉树的特点每个结点最多有两棵子树&#xff0c;所以二叉树中不存在度大于…

【java8新特性】——Stream API详解(二)

一、简介 java8新添加了一个特性&#xff1a;流Stream。Stream让开发者能够以一种声明的方式处理数据源&#xff08;集合、数组等&#xff09;&#xff0c;它专注于对数据源进行各种高效的聚合操作&#xff08;aggregate operation&#xff09;和大批量数据操作 (bulk data op…

Spring bean 不被 GC 的真正原因

概述 自从开始接触 Spring 之后&#xff0c;一直以来都在思考一个问题&#xff0c;在 Spring 应用的运行过程中&#xff0c;为什么这些 bean 不会被回收&#xff1f; 今天深入探究了这个问题之后&#xff0c;才有了答案。 思考点 大家都知道&#xff0c;一个 bean 会不会被回…

ad域时间源配置_域控制器server2012时间同步NTP配置

一、域控配置1.修改注册表&#xff0c;设置域控服务器名称2.设置组策略&#xff0c;启动NTP服务器3.域策略中设置windows time服务自动启动二、服务端配置(Ntp服务器&#xff0c;客户端将根据这台服务器的时间进行同步)1、微软键R键&#xff0c;进入“运行”&#xff0c;输入“…

【java8新特性】——Optional详解(三)

一、简介 Optional类是Java8为了解决null值判断问题&#xff0c;借鉴google guava类库的Optional类而引入的一个同名Optional类&#xff0c;使用Optional类可以避免显式的null值判断&#xff08;null的防御性检查&#xff09;&#xff0c;避免null导致的NPE&#xff08;NullPo…

不使用 + 和 - 运算符计算两整数之和

问题概述 不使用运算符 和 -&#xff0c;计算两整数之和 思考 不使用 和 - &#xff0c;那就只能想到用位运算来处理了。思路如下&#xff1a; 两数进行 ^&#xff08;异或运算&#xff09;&#xff0c;可以得到两个数在相同位上数值不同的相加结果两数进行 &&#x…

vts传感器采取船舶的_详解虎门大桥监测系统:传感器与物联网功不可没

来源&#xff1a;传感器专家网近日&#xff0c;虎门大桥“虎躯一震”给全国人民来了个“深呼吸”。虎门大桥是广东沿海地区重要的交通枢纽&#xff0c;始建于1992年&#xff0c;1997年通车至今&#xff0c;大桥一直都十分平稳。但在5月5日下午&#xff0c;虎门大桥发生异常抖动…

宝塔安装sqlserver_linux宝塔面板安装安装 pdo_sqlsrv扩展

第一步安装源curl https://packages.microsoft.com/config/rhel/7/prod.repo > /etc/yum.repos.d/mssqlrelease.repo第二步安装驱动yum install msodbcsql mssql-tools unixODBC-devel第三步下载pdo-sqlsrv源码wget http://pecl.php.net/get/pdo_sqlsrv-5.6.1.tgztar -zxvf …

【java8新特性】——方法引用(四)

一、简介 方法引用是java8的新特性之一&#xff0c; 可以直接引用已有Java类或对象的方法或构造器。方法引用与lambda表达式结合使用&#xff0c;可以进一步简化代码。 来看一段简单代码&#xff1a; public static void main(String[] args) {List<String> strList Ar…

MySQL 排名函数.md

概述 MySQL 自带的排名的函数&#xff0c;主要有&#xff1a; row_number()rank()dense_rank()ntile() 测试数据 测试数据如下所示&#xff1a; row_number() 函数 用法如下&#xff1a; SELECT row_number() OVER (ORDER BY Salary DESC) row_num,Salary FROMEmployee查…

【java8新特性】——默认方法(五)

一、简介 默认方法是指接口的默认方法&#xff0c;它是java8的新特性之一。顾名思义&#xff0c;默认方法就是接口提供一个默认实现&#xff0c;且不强制实现类去覆写的方法。默认方法用default关键字来修饰。 默认方法可以解决的痛点&#xff1a; 在java8之前&#xff0c;修…

Java 序列化总结.md

概述 序列化&#xff1a;将对象写入到 IO 流中反序列化&#xff1a;从 IO 流中恢复对象 实现方法 实现 Serializable 或者 Externalizable Serializable&#xff1a;标记接口&#xff0c;不用实现任何方法&#xff0c;可以指定序列化 IDExternalizable&#xff1a;增强的序…

多线程买票案例

测试类 package thead;public class testThread {public static void main(String [] arg){Tickets ticket new Tickets();Thread t1 new Thread(ticket,"窗口一&#xff1a;");Thread t2 new Thread(ticket,"窗口二&#xff1a;");Thread t3 new Thr…

深度学习auc_机器学习集成学习与模型融合!

↑↑↑关注后"星标"Datawhale每日干货 & 每月组队学习&#xff0c;不错过Datawhale干货 作者&#xff1a;李祖贤&#xff0c;深圳大学&#xff0c;Datawhale高校群成员对比过kaggle比赛上面的top10的模型&#xff0c;除了深度学习以外的模型基本上都是集成学习的…

常用并发工具类(锁和线程间通信工具类)

常用并发工具类总结 JUC 下的常用并发工具类&#xff08;锁和线程间通信工具类&#xff09;&#xff0c;主要包括 ReentrantLock、ReentrantReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore、Exchanger ReentrantLock 和 ReentrantReadWriteLock ReentrantLock 是…

of方法:给集合一次性添加多个元素

of()方法只是Map&#xff0c;List&#xff0c;Set这三个接口的静态方法&#xff0c;其父类接口和子类实现并没有这类方法&#xff0c;比如 HashSet&#xff0c;ArrayList返回的集合是不可变的&#xff0c;再次添加会报错Set与Map集合不可以存储重复的元素&#xff0c;否则会报错…

数控车椭圆编程实例带图_数控车床编程教程,图文实例详解

一、数控车编程特点(1) 可以采用绝对值编程(用X、Z表示)、增量值编程(用U、W表示)或者二者混合编程。(2) 直径方向(X方向) 系统默认为直径编程&#xff0c;也可以采用半径编程&#xff0c;但必须更改系统设定。(3) X向的脉冲当量应取Z向的一半。(4)采用固定循环&#xff0c;简化…

常用并发工具类(并发集合类)

文章目录概述BlockingQueueArrayBlockingQueue数据存储相关属性阻塞特性相关属性主要方法LinkedBlockingQueueLinkedBlockingQueue 主要属性LinkedBlockingQueue 设计思想ConcurrentLinkedQueuePriorityBlockingQueuePriorityBlockingQueue 主要属性PriorityBlockingQueue 设计…

参考文献起止页码怎么写_毕业论文文献综述不会写?快来看看这篇文章(附含通用模板)...

文献综述是对所研究主题的现状进行客观的叙述和评论、寻求新的研究突破点。一个资料全面、研究深入的综述不仅可以帮助作者确立毕业论文的选题&#xff0c;还可以为论文的深入研究提供有力的支撑。本文分享一份"毕业论文文献综述万能模板",以供参考。一、文献综述的基…

常用并发工具类(线程池)

文章目录概述ThreadPoolExecutorThreadPoolExecutor 的主要属性Worker 主要属性线程池的状态线程池的状态流转线程池提交任务的执行流程线程数量的设置线程池的种类FixedThreadPoolCachedThreadPoolSingleThreadExecutorScheduledThreadPoolExecutorSingleThreadScheduledExecu…