ConcurrentHashMap详解

ConcurrentHashMap摘要

ConcurrentHashMap存在于java.util.concurrent包中,此包扩展了java.util包,丰富了Java容器在多线程环境下的运用。ConcurrentHashMap是HashMap在多线程环境下的实现,保证了容器的线程安全性,并且取代了HashTable这样效率极其低下的容器。

ConcurrentHashMap继承体系

ConcurrentHashMap<K,V>依旧是key-value键值对形式,继承了AbstractMap<K,V>,实现了ConcurrentMap<K,V>以及Serializable接口,内部定义一个serialVersionUID用于序列化,继承体系图如下所示。

ConcurrentHashMap数据结构

和HashMap类似,ConcurrentHashMap底层也是数组+链表的形式,并且JDK1.7和JDK1.8存在差异。

下图是JDK1.7中ConcurrentHashMap的结构图。ConcurrentHashMap是由Segment分段锁数组组成,分段锁数组每个单元中包含了多个HashEntry,每个HashEntry之间使用链表串联。

Segment分段锁ConcurrentHashMap一个内部类,主要代码如下所示。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    
    // 和HashMap中的HashEntry作用一样,真正存放数据的桶
    transient volatile HashEntry<K,V>[] table;

    transient int count;
    
    // 快速失败fail-fast使用的变量
    transient int modCount;
    
    // 大小
    transient int threshold;
    
    // 负载因子
    final float loadFactor;
}

HashEntry和HashMap差不多,但差别在于HashEntry使用了volatile修饰了数据以及下一个节点。

volatile是一个轻量级的同步机制关键字,和synchronized关键字挺相似,差别在于synchronized保证了操作的原子性,但volatile不能保证。

volatile作用:

(1)保证可见性:线程将主内存中的数据拷贝到自己的工作内存中进行操作,操作后再将数据写回主内存,一个线程对于数据的改动其他线程立即可感知到;例如开启两个线程,一个线程对于加了volatile的变量进行改变,之后第二个线程会感知到此变量已被改变;

(2)不保证原子性:原子性即完整性,某个线程在做某个业务时,中间不可以被分割,要么同时成功,要么同时失败;注意,i++这样的操作不是原子性操作;例如20个线程,每个线程循环1000次,每次将count值加1,结束后很可能不等于20000;如何解决不保证原子性:使用JUC下的AtomicInteger的getAndIncrement方法,底层原理是CAS;

(3)禁止指令重排:单线程环境中确保程序最终执行结果和代码顺序执行结果一致,多线程环境线程交替执行,由于编译器优化重排存在,两个线程中使用的变量能否保证一致无法确定,结果无法预测;例如如下所示,语句可以按1234顺序执行也可以按1324执行,但不可以按4123执行,因为存在数据依赖性。

int x = 11;
int y = 12;
x = x + 5;
y = x * x;

为何ConcurrentHashMap并发度高呢?因为其采用了Segment分段锁机制,Segment继承自ReentrantLock。ConcurrentHashMap不会像HashTable那样不管如何操作都加上synchronized锁,ConcurrentHashMap支持Segment数组数量的并发数,例如Segment容量为16,那么其允许同时16个线程操作者16个Segment,每个线程访问的Segment不会影响到其他线程,就像厕所里每个隔间的使用者不会影响到其他隔间一样。

我们来看一下其put操作,首先定位到Segment,然后进行put操作。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException(); // 这里表明不能插入null值
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          
         (segments, (j << SSHIFT) + SBASE)) == null) 
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 将当前Segment中的table通过key的hashcode定位到HashEntry
    HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 遍历该HashEntry,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖旧的value
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // 不为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 释放锁
        unlock();
    }
    return oldValue;
}

首先通过tryLock()尝试获取锁,获取失败肯定有其他线程进行竞争,则利用scanAndLockForPut()自旋获取锁(若是达到最大自旋次数则改为阻塞获取锁,因为自旋次数过多会严重损耗CPU性能)。

get的逻辑十分简单,首先key通过hash之后定位到具体的Segment,然后再通过一次hash定位到具体元素上。get逻辑十分高效,因为整个过程都不用加锁。

JDK1.7和JDK1.8在ConcurrentHashMap数据结构上发生了些许变化,主要目的就是提高查询效率。由于JDK1.7还是基于数组+链表的方式构造数据结构,前面HashMap文章中说明了引入红黑树代替链表能够得到更高的效率,因此JDK1.8中也引入了红黑树,链表大于8时进行链表转换为红黑树。

此外,JDK1.8放弃了Segment分段锁,转而采用了CAS+synchronized保证并发访问安全性,并且将HashEntry改为Node。

JDK1.8中ConcurrentHashMap进行put操作主要有以下步骤:

(1)根据key计算出HashCode;

(2)判断是否需要进行初始化;

(3)为当前key定位出Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋;

(4)如果当前位置的hashCode==MOVED==-1,表示需要扩容;

(5)若都不满足,则利用synchronized锁写入数据;

(6)若链表数量大于TREEIFY_THRESHOLD则要转化为红黑树。

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException(); // 不允许插入null
    // (1)根据key计算出hashCode
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
        ConcurrentHashMap.Node<K,V> f; int n, i, fh;
        // (2)判断是否需要初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 为当前key定位出Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋
            if (casTabAt(tab, i, null,
                    new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果当前位置的hashCode==MOVED==-1,表示需要扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 若都不满足,则利用synchronized锁写入数据
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            ConcurrentHashMap.Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new ConcurrentHashMap.Node<K,V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof ConcurrentHashMap.TreeBin) {
                        ConcurrentHashMap.Node<K,V> p;
                        binCount = 2;
                        if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 若链表数量大于TREEIFY_THRESHOLD则要转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

补充一下上面说到的CAS与自旋(后面还会在多线程高并发章节中重点讲解到)。

CAS即compareAndSwap,比较与交换,是一种乐观锁,JUC包下很多类都是基于CAS实现的。线程在读取数据时不进行加锁,在准备写回数据时,比较下要写回位置的数据是否进行了修改,未修改则写回,若被修改再重新进行读取修改流程。乐观锁即表示我们人为并发的操作并不会总发生,当发生时我们再去加锁,其他时候是无锁状态。

下面是我们使用AtomicInteger类实现的CAS操作示例,AtomicInteger是原子类,用于替换i++之类的非原子性操作。

import java.util.concurrent.atomic.AtomicInteger;

class CAS {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(5);
        System.out.println(atomicInteger.compareAndSet(5, 2019) + "\t current data " + atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(5, 2014) + "\t current data " + atomicInteger.get());
        // true   current data 2019
        // false  current data 2019
    }
}

当然,CAS会产生一个不大不小的问题,叫做ABA,意思是某个时候某个内存位置值从A被某线程修改为B后,在其他线程还未反应过来时此线程又将该值从B修改回A,等其他线程来访问时貌似A值并未改变。这个问题在有些情况下或许无所谓,只要最终结果一致就行,但类似银行流水之类的问题,每个记录操作都需要被记录,这时ABA问题就是大问题了。解决方法很多,可以使用带时间戳(或者版本号)的原子引用,每次操作后将版本号加1,因此,即使内存值相同,但操作后版本号肯定不同。如下示例。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

public class Test {
    static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

    public static void main(String[] args) {
        System.out.println("============以下是ABA问题的产生============");
        new Thread(() -> {
            atomicReference.compareAndSet(100, 101);
            atomicReference.compareAndSet(101, 100);
        }, "t1").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("============以下是ABA问题的解决============");
        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t第一次版本号:" + stamp);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "\t第二次版本号:" + atomicStampedReference.getStamp());
            atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "\t第三次版本号:" + atomicStampedReference.getStamp());
        }, "t3").start();
        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t第一次版本号:" + stamp);
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName() + "\t修改成功否:" + result + "\t当前最新版本号:" + atomicStampedReference.getStamp());
            System.out.println(Thread.currentThread().getName() + "\t当前最新值:" + atomicStampedReference.getReference());
        }, "t4").start();
    }
}
//============以下是ABA问题的产生============
//true  2019
//============以下是ABA问题的解决============
//t3    第一次版本号:1
//t4    第一次版本号:1
//t3    第二次版本号:2
//t3    第三次版本号:3
//t4    修改成功否:false 当前最新版本号:3
//t4    当前最新值:100

CAS性能是相当高的,但synchronized应该是重量级锁,那为何要引入synchronized呢?其实在JDK中,每个版本都对synchronized关键字锁进行了升级,使其并不是那么“重”。JVM采用了锁升级策略,先使用偏向锁优先同一线程然后再次获取锁;若失败,则升级为CAS轻量级锁,若失败进行短暂自旋,防止被系统挂起;最后若以上都失败则升级为重量级锁。

JDK1.8中get操作又是如何的呢?

(1)首先计算出key的hashCode,根据hashCode寻址找到对应的桶,若在桶上直接返回值;

(2)若是红黑树就按照红黑树方式遍历获取值;

(3)若是链表则按照链表方式遍历获取值。

public V get(Object key) {
    ConcurrentHashMap.Node<K,V>[] tab; ConcurrentHashMap.Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}


end
  • 作者:JJ(联系作者)
  • 发表时间:2021-02-16 10:06
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载博主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论