您当前的位置: 首页 > 

星夜孤帆

暂无认证

  • 6浏览

    0关注

    626博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ConcurrentHashMap实现原理及源码分析

星夜孤帆 发布时间:2021-08-29 20:30:02 ,浏览量:6

一、ConcurrentHashMap跟HashMap,HashTable的对比

1. HashMap不是线程安全:

在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的

2. HashTable是线程安全的: HashTable和HashMap的实现原理几乎一样,

差别:

1.HashTable不允许key和value为null;

2.HashTable是线程安全的。

HashTable线程安全的策略实现代价却比较大,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,

多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,见下图:

3. ConcurrentHashMap是线程安全的:

JDK1.7版本:JDK7ConcurrentHashMap源码

容器中有多把锁,每一把锁锁一段数据,这样在多线程访问不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。

这就是ConcurrentHashMap所采用的"分段锁"思想,见下图:

 

每一个segment都是一个HashEntry[] table, table中的每一个元素本质上都是一个HashEntry的单向队列(原理和hashMap一样)。

比如table[3]为首节点,table[0]->next为A,之后为B,依次类推。

public class ConcurrentHashMap extends AbstractMap
        implements ConcurrentMap, Serializable {

    // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对
    // 不属于同一个片段的节点可以并发操作,大大提高了性能
    final Segment[] segments;

    // 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 可以作为互斥锁使用
    static final class Segment extends ReentrantLock implements Serializable {
        transient volatile HashEntry[] table;
        transient int count;
    }

    // 基本节点,存储Key, Value值
    static final class HashEntry {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry next;
    }
}

JDK1.8版本:做了2点修改,见下图:

取消Segments字段,直接采用transient volatile HashEntry[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,

并发控制使用Synchronized和CAS来操作

将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构.

在ConcurrentHashMap中通过一个Node[]数组来保存添加到map中的键值对,而在同一个数组位置是通过链表和红黑树的形式来保存的。

但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,

这个变量用来判断对象的一些状态和是否需要扩容,后面会详细解释。

第一次添加元素的时候,默认初始长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取余来决定放在数组的哪个位置,如果出现放在同一个位置的时候,

优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。

如果数组的长度大于等于64了的话,才会将该节点的链表转换成红黑树。

通过扩容数组的方式来把这些节点给分散开。然后将这些元素复制到扩容后的新数组中,同一个链表中的元素通过hash值的数组长度位来区分,

是放在原来的位置还是放到扩容的长度的相同位置去 。

在扩容完成之后,如果某个节点是树,同时现在该节点的个数又小于等于6个了,则会将该树转为链表。

取元素的时候,相对来说比较简单,通过计算hash来确定该元素在数组的哪个位置,然后在通过遍历链表或树来判断key和key的hash,取出value值。

二、ConcurrentHashMap源码分析

 2.1 基本属性
// node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1  8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
// 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量, kc = null;
                for (TreeNode p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                        TreeNode xp = p;
                    if ((p = (dir = TREEIFY_THRESHOLD)    //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree
                        treeifyBin(tab, i);    
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);    //计数
        return null;
    }

当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,

如果没有初始化就先调用initTable() 方法来进行初始化过程

然后通过计算hash值来确定放在数组的哪个位置

如果没有hash冲突就直接CAS插入,如果hash冲突的话,则取出这个节点来操作

如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制

最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作

然后判断当前取出的节点位置存放的是链表还是树

如果是链表的话,则遍历整个链表,直到取出来的节点的key来和要放的key进行比较,如果key相等,并且key的hash值也相等的话,

则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾

如果是树的话,则调用putTreeVal方法把这个元素添加到树中去

最后在添加完成之后,调用addCount()方法统计size,判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,

则调用treeifyBin方法来尝试将此处的链表转为树,或者扩容数组。

如果没有初始化就先调用initTable()方法来进行初始化过程

private final Node[] initTable() {
    Node[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {//空的table才能进入初始化操作
        if ((sc = sizeCtl) < 0) //sizeCtl 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node[] nt = (Node[])new Node[n];//初始化
                    table = tab = nt;
                    sc = n - (n >>> 2);//记录下次扩容的大小
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容

/**
 *帮助从旧的table的元素复制到新的table中
 */
final Node[] helpTransfer(Node[] tab, Node f) {
    Node[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode)f).nextTable) != null) { //新的table nextTba已经存在前提下才能帮助扩容
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex  1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node[] nt = (Node[])new Node[n = bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex)  stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 已经完成所有节点复制了
                if (finishing) {
                    nextTable = null;
                    table = nextTab;        // table 指向nextTable
                    sizeCtl = (n >> 1);     // sizeCtl阈值为原来的1.5倍
                    return;     // 跳出死循环,
                }
                // CAS 更扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) = 0 ,表示为链表节点
                        if (fh >= 0) {
                            // 构造两个链表  一个是原链表  另一个是原链表的反序排列
                            int runBit = fh & n;
                            Node lastRun = f;
                            for (Node p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                            // 在nextTable i 位置处插上链表
                            setTabAt(nextTab, i, ln);
                            // 在nextTable i + n 位置处插上链表
                            setTabAt(nextTab, i + n, hn);
                            // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
                            setTabAt(tab, i, fwd);
                            // advance = true 可以执行--i动作,遍历节点
                            advance = true;
                        }
                        // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                        else if (f instanceof TreeBin) {
                            TreeBin t = (TreeBin)f;
                            TreeNode lo = null, loTail = null;
                            TreeNode hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode p = new TreeNode
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 扩容后树节点个数若>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex             
关注
打赏
1636984416
查看更多评论
0.1308s