玖叶教程网

前端编程开发入门

干货:多线程-ConcurrentHashMap源码详解

概述:

HashMap是集合中最常用的数据结构之一,由于HashMap非线程安全,因此不能用于并发访问场景。在jdk1.5之前,通常使用HashTable作为HashMap的线程安全版本。HashMap对读写操作进行全局加锁,在高并发的条件下会造成严重的锁竞争和等待,极大地降低了系统的吞吐量。

优点:

相比于HashTable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的的基础上提供了更好的写并发能力,并且读操作(get)通常不会阻塞,使得读写操作可并发执行,支持客户端修改ConcurrenHashMap的并发访问度,迭代期间也不会抛出ConcurrentModificationException等等。

缺点:

一致性问题:这是当前所有分布式系统都面临的问题。

注意:

ConcurrentHashMap中key和value值都不能为null,HashMap中key可以为null,HashTable中key不能为null。

ConcurrentHashMap是线程安全的类并不能保证使用ConcurrentHashMap的操作都是线程安全的。

返回目录

ConcurrentHashMap实现原理:

ConcurrentHashMap的基本策略是将table细分为多个Segment保存在数组segments中,每个Segment本身又是一个可并发的哈希表,同时每个Segment都是一把ReentrantLock锁,只有在同一个Segment内才存在竞争关系,不同的Segment之间没有锁竞争,这就是分段锁机制。Segment内部拥有一个HashEntry数组,数组中的每个元素又是一个链表。

为了减少占用空间,除了第一个Segment之外,剩余的Segment采用的是延迟初始化的机制,仅在第一次需要时才会创建(ensureSegment实现) 为了保证延迟初始化存在的可见性,访问segments数组以及table数组的元素,均通过volatile访问,主要借助于Unsafe中原子操作getObjectVolatile来实现,此外,segments中segment的写入以及table中元素和next域的写入均使用UNSAFE.putOrderedObject来完成。这些操作提供了AtomicReferenceArrays的功能。

源码解析:

继承关系:

public class ConcurrentHashMap<K, V>
extends AbstractMap<K, V> //集合一些基本功能的实现
 implements ConcurrentMap<K, V>, //需要实现几个删除添加操作
 Serializable { //序列化

返回目录

成员变量:

 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认容量
 static final float DEFAULT_LOAD_FACTOR = 0.75f; //加载因子
 static final int DEFAULT_CONCURRENCY_LEVEL = 16; 
	//默认并发度,该参数影响segments数组的长度
 static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量
 //最大容量,构造ConcurrentHashMap时指定的大小超过该值,则会使用该值替换
 //ConcurrentHashMap的大小必须是2的幂,且小于等于1<<30 ,以确保不超过int的范围来索引条目
 // 注意,这是集合元素最大容量,而不是线程最大个数
 static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
	//table数组的最小长度,必须是2的幂,至少为2,以免延迟构造后立即调整大小
 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
 //允许的最大segment数量,用于限定构造函数参数concurrent_level的边界
 //也就是最大允许的线程数量。
 static final int RETRIES_BEFORE_LOCK = 2;
	//非锁定情况下调用size和containsValue方法的重试次数,
	//避免由于table连续修改导致无限重试,次数超过则对全局加锁。
 private transient final int hashSeed = randomHashSeed(this);
	//和当前相关联,用于keyhash的随机值,用来减少hash冲突
 private static int randomHashSeed(ConcurrentHashMap instance) {
 if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
 return sun.misc.Hashing.randomHashSeed(instance);
 }
 return 0;
 }
 final int segmentMask;
	//用于索引segment的掩码值(只留下高位),key高位hash码用来选择segment
 final int segmentShift; //用来索引segment偏移值
 final Segment<K,V>[] segments; //数组+数组+链表 //segments创建后其容量不可变
 transient Set<K> keySet;
 transient Set<Map.Entry<K,V>> entrySet;
 transient Collection<V> values;

返回目录

构造函数:

initialCapacity: 创建ConcurrentHashMap对象的初始容量,即HashEntity的总数量,创建时未指定initialCapacity默认16 ,最大容量为MAXIMUM_CAPACITY.

LoadFactor: 负载因子,用于计算Segment的threshlod域

concurrencyLevel: 即ConcurrentHashMap的并发度,支持同时更新ConccurentHashMap且不发生锁竞争的最大线程数。 但是其并不代表实际并发度,因为会使用大于等于该值的2的幂指数的最小值作为实际并发度,实际并发度即为segments数组的长度。如未指定则默认为16 ;

注意:并发度对ConccurentHashMap性能具有举足轻重的作用,如果并发度设置过小,会带来严重的锁竞争问题;如果并发度设置过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

 @SuppressWarnings("unchecked")
 //有参构造
 public ConcurrentHashMap(int initialCapacity, //指定容量
 float loadFactor, int concurrencyLevel) { 
 //加载因子和最大线程容量
 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 throw new IllegalArgumentException();
 if (concurrencyLevel > MAX_SEGMENTS) 
			//给定线程容量大于默认最大容量,采用默认最大容量
			
 concurrencyLevel = MAX_SEGMENTS;
 /*
 * 寻找与给定参数concurrencylevel匹配的最佳数组ssize,
 * 必须是2的幂,如果concurrencylevel是2的幂,那么ssize就是
 * concurrencyevel,否则concurrencylevel为ssize大于concurrency最小2的幂
 *例 : concurrencyLevel为7,则ssize为2^3=8;
 */
		
 // Find power-of-two sizes best matching arguments
 int sshift = 0; //记录左移次数,用来计算segment最大偏移值
 int ssize = 1; //segment数组长度,也是最大线程数量
 while (ssize < concurrencyLevel) {
 ++sshift;
 ssize <<= 1; //采用位运算而不是直接使用concurrenylevel,
 //因为此值可能不一定是2的幂
 }
 this.segmentShift = 32 - sshift; //索引偏移量
 this.segmentMask = ssize - 1; //-1是为了将低位二进制全部变1,达到掩码目的
 if (initialCapacity > MAXIMUM_CAPACITY)
 initialCapacity = MAXIMUM_CAPACITY;
		//给定容量大于默认最大容量,采用默认最大容量
		// 防止索引条目超出int值范围
 int c = initialCapacity / ssize; //每个table数组的最大容量
 if (c * ssize < initialCapacity)
 ++c;
 int cap = MIN_SEGMENT_TABLE_CAPACITY; //table数组最小长度
 while (cap < c) //因为必须是2的幂,所以采用此方式,保证cap最终是比c的2的幂函数
 cap <<= 1;
 // create segments and segments[0]
 Segment<K,V> s0 = //创建segments和第一个segment
 new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
 (HashEntry<K,V>[])new HashEntry[cap]);
 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //segments
 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
 this.segments = ss;
 }
		//指定最大容量和加载因子					 
 public ConcurrentHashMap(int initialCapacity, float loadFactor) { 
 this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
 }
 public ConcurrentHashMap(int initialCapacity) { //指定最大容量
 this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
 }
 public ConcurrentHashMap() { //默认构造
 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
 }
	 //由一个集合构建
 public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
 this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, 
 //集合元素数量处以加载因子就是当前集合容量
 DEFAULT_INITIAL_CAPACITY), //默认容量,两个去最大值
 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); //默认加载因子,默认并发度
 putAll(m); //将集合中元素添加到当前集合中 
 }

返回目录

原子方法:

ConcurrentHashMap主要使用下面几个方法对segments数组和table数组进行读写,并且保证线程安全性。

其主要使用了UNSAFE.getObjectVolatile提供的volatile读语义,UNSAFE.getObjectVolatile提供了Volatile写语义。

使用其好处为:

UNSAFE.getObjectVolatile使得非volatile声明的对象具有volatile读的语义

要使非volatile声明的对象具有volatile写语义则需要借助操作UNSAFE.putObjectvolatile

UNSAFE.putOrderedObject操作的含义和作用是什么:

为了控制特定条件下的指令重排序和内存可见性问题,java编辑器使用了内存屏障的CPU指令来禁止指令重排序。java中volatile写入使用了内存的屏障中的LoadStore屏障规则,对于 Load1->LoadStore->Store2, 在Store2以及后续写入操作被刷出之前,要保证Load1要读取的数据被读取完毕。

volatile的写所插入的storeLoad是一个耗时的操作,因此出现了一个对volatile写的升级版本,利用lazySet方法对性能进行优化,在实现上对volatile的写只会在之前插入storestore屏障,对于这样的Store1 ;StoreStore;Store2,在store2及后续写入操作执行前,保证store1的写入对其它处理器是可见,也就是按顺序写入。 UNSAFE.putOrderedObject正是提供了这样的语义,避免了写写指定重排序,但是不保证内存可见性,因此需要借助volatile读来保证可见性。

ConcurrentHashMap正是利用了这些高性能的原子读写来避免加锁带来的开销。

// 获取给定table的第i个元素,使用volatile读语义。

 @SuppressWarnings("unchecked")
 static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
 return (tab == null) ? null :
 (HashEntry<K,V>) UNSAFE.getObjectVolatile
 (tab, ((long)i << TSHIFT) + TBASE);
 }
 /**
 * Sets the ith element of given table, with volatile write
 * semantics. (See above about use of putOrderedObject.)
 */
	//设置给定table的第i个元素,使用volatile写入语义
 static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
 HashEntry<K,V> e) {
 UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
 }
 /*
	 * 通过Unsafe提供的具有volatile元素访问语义的操作获取Segment数组的第j个元素(如果ss为空)
	 * 注意:因为Segment数组的每个元素只能设置一次(使用完全有序的写入)
	 * 所以,一些性能敏感的方法只能依靠此方法作为对空读取的重新检查。
	 */
 @SuppressWarnings("unchecked")
 static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
 long u = (j << SSHIFT) + SBASE;
 return ss == null ? null :
 (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
 }
 /*
 * 根据给定的Hash获取segment
 */
	
 @SuppressWarnings("unchecked")
 private Segment<K,V> segmentForHash(int h) {
 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
 }
 /*
 * 根据给定的segment和hash获取table entry 一条链表
 */
 @SuppressWarnings("unchecked")
 static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
 HashEntry<K,V>[] tab;
 return (seg == null || (tab = seg.table) == null) ? null :
 (HashEntry<K,V>) UNSAFE.getObjectVolatile
 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
 }

返回目录

HashEntry:内部类,用来存储key-value的数据结构

注意:value和next声明为volatile,是为了保证内存的可见性,也就是保证读取的值都是最新的值,而不会从缓存读取。 写入next域使用volatile写入是为了保证原子性。写入使用原子性操作,读取使用volatile,保证多线程访问的安全性。

 //存储key-value的数据结构,
 static final class HashEntry<K,V> {
 final int hash; //hash值
 final K key;
 volatile V value; //全局可见,用来实现containsValue方法
 volatile HashEntry<K,V> next; //全局可见,netxt节点
 //初始化一个节点
 HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
 this.hash = hash;
 this.key = key;
 this.value = value;
 this.next = next;
 }
 设置下一节点
 final void setNext(HashEntry<K,V> n) { //原子性设置下next节点
 UNSAFE.putOrderedObject(this, nextOffset, n);
 }
 static final sun.misc.Unsafe UNSAFE;
 static final long nextOffset;
 static {
 try {
 UNSAFE = sun.misc.Unsafe.getUnsafe();
 Class k = HashEntry.class;
 nextOffset = UNSAFE.objectFieldOffset
 (k.getDeclaredField("next"));
 } catch (Exception e) {
 throw new Error(e);
 }
 }
 }

返回目录

Segment:内部类,用来实行并发操作

segment为ConcurrentHashMap的专用数据结构,同时扩展了ReentrantLock,使得Segment本身就是一把重入锁,方便执行锁定。Segment内部持有一个始终处于一致状态的entry列表,使得读取状态无需加锁(通过volatile读table数组)。调整table大小期间通过复制节点实现,使旧版本的table仍然可以进行遍历。

Segment仅定义需要加锁的可变方法,针对ConcurrentHashMap中相应方法的调用都会被代理到Segment中的方法。这些可变方法使用scanAndLock和scanAndLockForPut在竞争中使用受控旋转(自旋次数受限制的自旋锁) 由于线程的阻塞与唤醒通常伴随着上下文切换,CPU抢占等,都是开销比较大的操作。使用自旋次数受限制的自旋锁,可以提高获取锁的概率,降低线程阻塞的概率,这样可极大提升性能。为什么受限自旋呢?(自旋会不断消耗CPU的时间片,无限制自旋会导致开销增加)所以自旋锁适合多核CPU下,同时线程等待所的时间非常短,若等待时间较长,应该尽早进入阻塞。

成员变量和继承关系:

 static final class Segment<K,V> 
 extends ReentrantLock //继承了重入锁
 implements Serializable {
 
 private static final long serialVersionUID = 2249069246763182397L;
		//对segment加锁时,在阻塞之前进行的最大自旋次数,
		//在多处理器上,使用有限数量的重试来维护在定位节点时获取的高速缓存
		//最多自旋64
 static final int MAX_SCAN_RETRIES =
 Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
 //每个segment的table数组,访问数组中元素通过entryAt/setEntryAt提供的
 //volatile语义来完成。
 transient volatile HashEntry<K,V>[] table;
		//元素数量,只能在锁中或其它volatile读保证可见性之间进行访问。
 transient int count;
		//当前segment中可变操作发生的次数,put,remove等,可能会溢出32位,
		//它为isEmpry()和size()方法中的稳定性检查提供了足够的准确性。
		//只能在锁中或者其它volatile读保证可见性之间进行访问。
 transient int modCount;
 //table大小超过阈值对table进行扩容
 transient int threshold; //阈值
 final float loadFactor; //负载因子
 //构造函数
 Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
 this.loadFactor = lf;
 this.threshold = threshold;
 this.table = tab;
 }

返回目录

scanAndLockForPut:

while每循环一次,都会尝试获取锁,成功则返回, retries初始值设为-1是为了遍历当前hash对应的桶的链表,找到则停止遍历,未找到则会预创建一个节点;同时,如果头节点发生变化,则会重新进行遍历,直到自旋次数大于MAX_SCAN_RETRIES,使用lock进行加锁,如果失败则会进入等待队列。

为何要遍历一次链表:

scanAndLockForPut使用自旋次数受限制的自旋锁进行优化加锁的方式,此外遍历链表也是一种优化方法,主要是尽可能使当前链表中的节点进入CPU高速缓存,提高缓存命中率,以便获取锁定后的遍历速度更快。 实际上加锁后并没有使用已经找到的节点,因为它们必须在锁定下重新获取,以确保更新的顺序一致性,但是遍历一次后可以更快的进行定位 ,这是一种预热优化方法。 scanAndLock中也使用了该优化方式。

scanAndLock内部实现方式与scanAndLockForPut更简单,scanAndLock不需要预创建节点。因此主要用于remove和replace操作。

 //自旋获取锁
 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
 HashEntry<K,V> first = entryForHash(this, hash); //根据key的hash值找到头节点
 HashEntry<K,V> e = first;
 HashEntry<K,V> node = null;
 int retries = -1; // negative while locating node
 while (!tryLock()) { //尝试获取锁,成功返回,不成功开始自旋
 HashEntry<K,V> f; // 用于后续重新检查头结点
 if (retries < 0) { //第一次自旋
 if (e == null) { //结束遍历节点
 if (node == null) //创建节点
 node = new HashEntry<K,V>(hash, key, value, null); 
 retries = 0;
 }
 else if (key.equals(e.key)) //找到节点,结束遍历
 retries = 0;
 else
 e = e.next; //下一节点 
 //如果链表有节点,那么如果没有找到,
 //那么就会一致遍历(retries=-1)的状态下
 //直到节点到达末尾
 }
 else if (++retries > MAX_SCAN_RETRIES) { //达到最大尝试次数,
 lock(); //进入加锁方法,失败则会进入排队,阻塞当前线程
 break;
 }
 else if ((retries & 1) == 0 &&
 (f = entryForHash(this, hash)) != first) {
 //头节点发生变化需要重新遍历,说明油新结点加入或者被移除
 e = first = f; // re-traverse if entry changed
 retries = -1; //自旋次数归0,重新自旋
 }
 }
 return node;
 }

返回目录

put操作:

流程:

先对Segment加锁,然后根据(tab.length-1)&hash找到对应的slot

然后根据slot遍历对应的链表,

如果key对应的entry存在(根据onlyIfAbsent)决定是否替换新值

如果key对应的entry不存在,创建新节点头插法插入

若容量超出限制,则判断是否进行rehash;

几个优化:

在 scanAndLockForPut()中:

如果锁能够很快的获取到,有限次数的自旋可防止线程进入阻塞,有助于提升性能。

在自旋期间会遍历链表,希望遍历后的链表被 cache缓存,为实际put操作过程中的链表遍历操作提供性能(预热优化:遍历一次后可以更快进行定位)

并且还会预创建节点;

HashTable<K,V>[] tab =table 好处:为什么不直接用table操作

table被声明为volatile,为了保证内存的可见性,table上的修改都必须立即更新到主内存,

volatile写实际是具有一定开销的。 由于put是中代码是加锁执行的,锁是既能保证可见性,也能保证原子性的,因此不需要在对table进行volatile写,将其赋给一个局部变量实现编译,运行时优化。

node.setNext(first)也是同样的道理,next同样是被声明为volatile,因此也是使用优化的方式UNSAFE.putOrderedObject进行volatile写入操作。

put已经加锁,为何访问tab元素不直接通过数组索引,而用entryAt(tab,index):

加锁保证了volatile同步语义,但是对table数组中元素的写入使用UNSAFE.putOrderedObject进行顺序写,该操作只是禁止写写重排序指令,不能保证写入后内存的可见性 所以必须使用使用entryAt(tab,index)提供的volatile读获取最新的数据

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 HashEntry<K,V> node = tryLock() ? null : //获得锁成功,
 scanAndLockForPut(key, hash, value); //未成功获取锁,则自旋获取锁,
 //如果超过自旋次数,则阻塞
 
 V oldValue;
 try {
 HashEntry<K,V>[] tab = table;
 int index = (tab.length - 1) & hash; // 得到桶位置
 HashEntry<K,V> first = entryAt(tab, index); 
 //volatile读语义获取index的头节点
 
 for (HashEntry<K,V> e = first;;) { //遍历链表
 if (e != null) {
 K k;
 if ((k = e.key) == key ||
 (e.hash == hash && key.equals(k))) { //找到节点
 oldValue = e.value;
 if (!onlyIfAbsent) { //是否替换旧值
 e.value = value;
 ++modCount;//被修改次数
 }
 break;
 }
 e = e.next; //不是,继续遍历
 }
 else { //e==null //没有找到key值
 if (node != null) //scanAndLockForPut只有找不到节点才会不返回null
 node.setNext(first); 
 //将node设置为头节点,此处可以看出其为头插法链表插入元素
 
 else //处理tryLock成功返回null值,没有找到节点的情况
 node = new HashEntry<K,V>(hash, key, value, first);
 int c = count + 1;
 if (c > threshold && tab.length < MAXIMUM_CAPACITY) //当前table中元素数量大于阈值,
 //需要重新进行rehash
 rehash(node); //重新hash
 else //没有大于阈值
 setEntryAt(tab, index, node);//将node头节点插入table中
 ++modCount; //被修改次数
 count = c; //元素个数
 oldValue = null; //旧址=null
 break;
 }
 }
 } finally {
 unlock();
 }
 return oldValue;
 }

返回目录

rehash操作:

rehash主要的作用是扩容,将扩容前table中的节点重新分配到新table中。由于table的capacity都是2的幂,按照2的幂扩容为原来的一倍,扩容前在slot i 中的元素,扩容后要么在slot i中或者 i+扩容前table的capacity的solt中,这样使得只需要移动原来桶中的部分元素即可将所有节点分配到新table中。

为了提高效率,rehash首先找到第一个后续所有节点在扩容后index都保持不变的结点,将这个结点加入扩容后的table的index对应的slot中,然后将节点之前的所有节点重排即可。

	 //重新进行扩容hash,这个操作是已经在put加锁的
 @SuppressWarnings("unchecked")
 private void rehash(HashEntry<K,V> node) {
 HashEntry<K,V>[] oldTable = table; //旧的table数组
 int oldCapacity = oldTable.length;
 int newCapacity = oldCapacity << 1; //二倍扩容
 threshold = (int)(newCapacity * loadFactor); //新阈值
 HashEntry<K,V>[] newTable = //新数组长度
 (HashEntry<K,V>[]) new HashEntry[newCapacity];
 int sizeMask = newCapacity - 1;
			//将旧数组中所有节点复制到新数组中,对旧数组中链表最后同index的进行复用(提高效率)
 for (int i = 0; i < oldCapacity ; i++) {//遍历旧数组
 HashEntry<K,V> e = oldTable[i];
 if (e != null) { //当前数组节点不为空
 HashEntry<K,V> next = e.next; //当前节点next节点
 int idx = e.hash & sizeMask;//新数组角标
 if (next == null) // Single node on list 
 newTable[idx] = e; //旧table数组此角标只有一个节点
 else { //当前角标链表不止一个节点
						// Reuse consecutive sequence at same slot
 HashEntry<K,V> lastRun = e; //链表头节点
 int lastIdx = idx;
 for (HashEntry<K,V> last = next; //对链表进行遍历
 //找到后续节点新index不变的节点
 last != null;
 last = last.next) {
 int k = last.hash & sizeMask; //当前节点重新hash后的角标
 if (k != lastIdx) { //当前节点k与lastIdx不同则进行替换
 //目的是找到该链表最后相同新角标的节点,这样就可以
 //最后一段链表一次性加入到新index
 lastIdx = k;
 lastRun = last;
 }
 }
 newTable[lastIdx] = lastRun; 
 // Clone remaining nodes
 //后续节点新index不变节点前的所有节点均需要重新创建分配
 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
 V v = p.value;
 int h = p.hash;
 int k = h & sizeMask;
 HashEntry<K,V> n = newTable[k]; //当前角标头节点
 newTable[k] = new HashEntry<K,V>(h, p.key, v, n); //头插法
 }
 }
 }
 }
			//找到当前要插入节点对应的角标
 int nodeIndex = node.hash & sizeMask; // add the new node
 node.setNext(newTable[nodeIndex]); //头插法插入
 newTable[nodeIndex] = node; //设置为新的头节点
 table = newTable;
 }

返回目录

scanAndLock操作:自旋并获取锁

 private void scanAndLock(Object key, int hash) {
 // similar to but simpler than scanAndLockForPut
 HashEntry<K,V> first = entryForHash(this, hash); 
			//通过volatile读获取指定坐标的链表
 HashEntry<K,V> e = first;
 int retries = -1;
 while (!tryLock()) { //自旋尝试获取锁
 HashEntry<K,V> f;
 if (retries < 0) {
 if (e == null || key.equals(e.key)) //链表为空||找到key对应节点
 retries = 0;
 else
 e = e.next; //遍历链表
 }
 else if (++retries > MAX_SCAN_RETRIES) { //大于最大自旋次数,阻塞线程
 lock();
 break;
 }
 else if ((retries & 1) == 0 && //头结点发生变化,重新自旋并且遍历链表
 (f = entryForHash(this, hash)) != first) {
 e = first = f;
 retries = -1;
 }
 }
 }

返回目录

remove操作:

final V remove(Object key, int hash, Object value) {
 if (!tryLock()) //未成功获取锁,自旋获取
 scanAndLock(key, hash);
 V oldValue = null;
 try {
 HashEntry<K,V>[] tab = table; //已经加锁,不需要volatile写
 int index = (tab.length - 1) & hash;
 HashEntry<K,V> e = entryAt(tab, index);
 //volatile读,获取index角标的链表
 HashEntry<K,V> pred = null; //前驱
 while (e != null) {
 K k;
 HashEntry<K,V> next = e.next;
 if ((k = e.key) == key || //找到要删除节点
 (e.hash == hash && key.equals(k))) {
 V v = e.value;
						//
 if (value == null || value == v || value.equals(v)) {
 if (pred == null) //要删除的是头节点
 setEntryAt(tab, index, next);
 else //非头节点
 pred.setNext(next); 
 ++modCount; //修改次数+1
 --count; //元素数量--
 oldValue = v; //旧值
 }
 break;
 }
 pred = e;
 e = next;
 }
 } finally {
 unlock();
 }
 return oldValue;
 }

返回目录

replace操作:

boolean replace(K key, int hash, V oldValue, V newValue):根据旧值替换新值,若旧值发生变化,则返回false

 final boolean replace(K key, int hash, V oldValue, V newValue) {
 if (!tryLock()) //尝试获取锁,不成功则自旋获取
 scanAndLock(key, hash);
 boolean replaced = false;
 try {
 HashEntry<K,V> e;
				//遍历给定角标链表
 for (e = entryForHash(this, hash); e != null; e = e.next) {
 K k;
 if ((k = e.key) == key || //找到key值
 (e.hash == hash && key.equals(k))) {
 //典型CAS操作
 if (oldValue.equals(e.value)) { //旧址相同则替换为新值
 e.value = newValue;
 ++modCount;
 replaced = true;
 }
 break; //说明oldvValue已经被别的线程修改
 }
 }
 } finally {
 unlock();
 }
 return replaced;
 }

final V replace(K key, int hash, V value) :直接进行替换,并返回旧值

final V replace(K key, int hash, V value) {
 if (!tryLock())
 scanAndLock(key, hash);
 V oldValue = null;
 try {
 HashEntry<K,V> e;
				//遍历链表
 for (e = entryForHash(this, hash); e != null; e = e.next) {
 K k;
					//找到直接进行替换
 if ((k = e.key) == key ||
 (e.hash == hash && key.equals(k))) {
 oldValue = e.value;
 e.value = value;
 ++modCount;
 break;
 }
 }
 } finally {
 unlock();
 }
 return oldValue;
 }

几个常用操作源码解析:

返回目录

get操作: 不需要进行加锁,只关心一个segment;非线程安全,得到的数据可能是过时数据。

public V get(Object key) {
 Segment<K,V> s; // manually integrate access methods to reduce overhead
 HashEntry<K,V>[] tab;
 int h = hash(key); //对key进行hash
 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //得到对应segment
 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
 (tab = s.table) != null) { //table数据存在
 //遍历数组
 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
 e != null; e = e.next) {
 K k;
				//找到指定key
 if ((k = e.key) == key || (e.hash == h && key.equals(k)))
 return e.value; //返回对应value值
 }
 }
 return null;
 }

返回目录

boolean isEmpty():判断集合是否为空
 public boolean isEmpty() {
 
 long sum = 0L;
 final Segment<K,V>[] segments = this.segments;
 for (int j = 0; j < segments.length; ++j) { //遍历sgment数组
 Segment<K,V> seg = segmentAt(segments, j); //得到对应table
 if (seg != null) {
 if (seg.count != 0)
 return false;
 sum += seg.modCount; //可变操作次数相加
 }
 }
		//有过修改痕迹,再次遍历
 if (sum != 0L) { // recheck unless no modifications
 for (int j = 0; j < segments.length; ++j) {
 Segment<K,V> seg = segmentAt(segments, j);
 if (seg != null) {
 if (seg.count != 0)
 return false;
 sum -= seg.modCount;
 }
 }
 if (sum != 0L) //一加一减不为0 说明这期间有被修改过,所以不为null
 return false;
 }
 return true;

返回目录

int size() 得到集合元素个数:

size和containsValue与put和get最大的区别在于,都需要遍历所有的Segment才能得到结果。

这两个源码实现都是先给三次机会。不lock所有的segment,比较相邻两次的modCount和,如果相同则说明在这之间整个集合是没有进行更新操作的。得到的size是正确的。如果三次循环之后仍然没有得到正确答案。那么就对所有的segment进行加锁。计算完毕后在进行解锁。

public int size() {
 // Try a few times to get accurate count. On failure due to
 // continuous async changes in table, resort to locking.
 final Segment<K,V>[] segments = this.segments;
 int size;
 boolean overflow; // true if size overflows 32 bits
 //为true表示size溢出32
 long sum; // sum of modCounts //modCount和
 long last = 0L; // previous sum
 int retries = -1; // first iteration isn't retry
 //第一次迭代不计入重试,所以会重试三次
 try {
 for (;;) {
				//前三次(-1,0,1) 进行不加锁统计size,如果得不到准确值则第四次加锁统计
 if (retries++ == RETRIES_BEFORE_LOCK) { //默认table大小为2
 //sgments全部加锁
 for (int j = 0; j < segments.length; ++j)
 ensureSegment(j).lock(); // force creation
 }
 sum = 0L;
 size = 0;
 overflow = false;
				//遍历segment
 for (int j = 0; j < segments.length; ++j) {
 Segment<K,V> seg = segmentAt(segments, j);
 if (seg != null) { //当前table不为空
 sum += seg.modCount;
 int c = seg.count; //当前segment元素个数
 if (c < 0 || (size += c) < 0)
 overflow = true; //size溢出
 }
 }
 if (sum == last) //两次统计的可变操作修改次数相同,说明全部加锁成功
 //并且获得准确size
 break;
 last = sum;
 }
 } finally {
 if (retries > RETRIES_BEFORE_LOCK) { 
				// 三次以后才进行加锁,因此此时需要解锁
 for (int j = 0; j < segments.length; ++j)
 segmentAt(segments, j).unlock();
 }
 }
 return overflow ? Integer.MAX_VALUE : size; //溢出返回最大值,否则返回size
 }

返回目录

boolean containsValue(Object value) /boolean contains(Object value) :集合中是否包含此值
public boolean containsValue(Object value) {
 // Same idea as size()
 if (value == null) //hashMap不允许value为null
 throw new NullPointerException();
 final Segment<K,V>[] segments = this.segments;
 boolean found = false; //是否包含
 long last = 0;
 int retries = -1;
 try {
 outer: for (;;) {
				//三次不加锁尝试寻找,如果未成功则进行加锁寻找
 if (retries++ == RETRIES_BEFORE_LOCK) {
 for (int j = 0; j < segments.length; ++j)
 ensureSegment(j).lock(); // force creation
 }
 long hashSum = 0L;
 int sum = 0;
				//遍历segments
 for (int j = 0; j < segments.length; ++j) {
 HashEntry<K,V>[] tab;
 Segment<K,V> seg = segmentAt(segments, j);
				 
 if (seg != null && (tab = seg.table) != null) {
						//遍历table
 for (int i = 0 ; i < tab.length; i++) {
 HashEntry<K,V> e;
							//遍历链表
 for (e = entryAt(tab, i); e != null; e = e.next) {
 V v = e.value;
 if (v != null && value.equals(v)) { //找到value
 found = true;
 break outer; //跳出死循环
 }
 }
 }
 sum += seg.modCount; //可变操作次数相加
 }
 }
 if (retries > 0 && sum == last) //两次操作中间没有发生可变操作次数变化
 //说明value值不存在
 break;
 last = sum;
 }
 } finally {
 if (retries > RETRIES_BEFORE_LOCK) { //大于2 需要解锁
 for (int j = 0; j < segments.length; ++j)
 segmentAt(segments, j).unlock();
 }
 }
 return found;
 }

返回目录

弱一致性:

ConcurrentHashMap是弱一致性的,它的get/containsKey/clear/iterator都是弱一致性的。

get和containsKey都是无锁操作,均通过getObjectVolatile()提供的原子读来获得Segment以及对应的链表,然后遍历链表。由于遍历期间其它线程可能对链表结构做了调整 ,所以返回的可能是过时数据。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言