首页 > 程序开发 > 软件开发 > 其他 >

jdk8之ConcurrentHashMap源码解析

2016-10-19

1、java util concurrent 这个包下面的 类都很经典。2、ConcurrentHashMap 这个类是java中讨论最多的,也是争论最多的类了。很多人对这个类很好奇。3、作为并发集合,大家比较关心 读写,锁,与 map的散列。

1、java.util.concurrent 这个包下面的 类都很经典。
2、ConcurrentHashMap 这个类是java中讨论最多的,也是争论最多的类了。很多人对这个类很好奇。
3、作为并发集合,大家比较关心 读写,锁,与 map的散列。
4、读写如何的锁
5、get操作下载 Java代码
publicVget(Objectkey){
Node[]tab;Nodee,p;intn,eh;Kek;
inth=spread(key.hashCode());
if((tab=table)!=null&&(n=tab.length)>0&&
(e=tabAt(tab,(n-1)&h))!=null){
if((eh=e.hash)==h){
if((ek=e.key)==key||(ek!=null&&key.equals(ek)))
returne.val;
}
elseif(eh<0)
//TreeBin操作
return(p=e.find(h,key))!=null?p.val:null;
while((e=e.next)!=null){
if(e.hash==h&&
((ek=e.key)==key||(ek!=null&&key.equals(ek))))
returne.val;
}
}
returnnull;
} 明显是没有上锁的。包括所有的读操作。都是不上锁的。
put操作(remove等修改操作)
所有读操作是这样一个模式,如果hash桶中坐标没有数据。就使用CAS 操作。如果有数据。就使用synchronized关键字。比起jdk1.7,1.6使用读写锁,代码比较简洁,同样使用cas操作比 读写锁的性呢过开销底得太多了。但是程序设计变得十分复杂,请下添加的代码下载
Java代码收藏代码
finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){
if(key==null||value==null)thrownewNullPointerException();
inthash=spread(key.hashCode());
intbinCount=0;
//为什么要这个循序,大家会很疑惑
//其实很简单,因为多线程操作,然后没有使用锁,使用unsafe,多个unsafe不是原子性的,在多线程的情况下,会出现问题。所以使用for来解决这个问题
for(Node[]tab=table;;){
//f是节点,n是数组长度,i是hash与数组长度的数组下标,fh是在数组下标已经坐在的节点的hash值
Nodef;intn,i,fh;
if(tab==null||(n=tab.length)==0)
tab=initTable();//延迟初始化table
//通过unsafe判断下标是否有节点
elseif((f=tabAt(tab,i=(n-1)&hash))==null){
//如果不存在。就创建一个节点通过unsafe加入到数组中,所以读是不加锁的
//注意:如果在多个线程同时加入hash后同一下标的node,那么只有一个会成功,其他失败。失败的就会在再次循序。
//这是循环解决的问题之一
if(casTabAt(tab,i,null,
newNode(hash,key,value,null)))
break;//nolockwhenaddingtoemptybin
}
//大家很奇怪这个if是干什么用额,去了解ForwardingNode这个对象
//这个对象在hash散列的时候用,原来的一个节点会重新散列到下个表,原来表的节点的hash就成为了moved
elseif((fh=f.hash)==MOVED)
tab=helpTransfer(tab,f);
else{
VoldVal=null;
//如果节点存在,就需要添加链了。
//添加链的时候就上锁这个节点,那么所有在这个节点的更新操作都会上锁
//这里上锁,比双桶的开销小多了。如果设计好,可以说几乎忽略不计。
synchronized(f){
if(tabAt(tab,i)==f){
if(fh>=0){
//添加链表
binCount=1;
for(Nodee=f;;++binCount){
Kek;
if(e.hash==hash&&
((ek=e.key)==key||
(ek!=null&&key.equals(ek)))){
oldVal=e.val;
if(!onlyIfAbsent)
e.val=value;
break;
}
Nodepred=e;
if((e=e.next)==null){
pred.next=newNode(hash,key,
value,null);
break;
}
}
}
//添加在tree下添加节点
elseif(finstanceofTreeBin){
Nodep;
binCount=2;
if((p=((TreeBin)f).putTreeVal(hash,key,
value))!=null){
oldVal=p.val;
if(!onlyIfAbsent)
p.val=value;
}
}
}
}
//java8,明确的改革
if(binCount!=0){
//binCount是链表的操作次数,操作多少次。表示链表有多长。当链表大于等于8的时候,链表会变成tree
if(binCount>=TREEIFY_THRESHOLD)
treeifyBin(tab,i);
if(oldVal!=null)
returnoldVal;
break;
}
}
}
addCount(1L,binCount);
returnnull;


}
这里是大家最关心的要点,重新散列,也就是重新hash下载 Java代码收藏代码
finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){
if(key==null||value==null)thrownewNullPointerException();
inthash=spread(key.hashCode());
intbinCount=0;
//为什么要这个循序,大家会很疑惑
//其实很简单,因为多线程操作,然后没有使用锁,使用unsafe,多个unsafe不是原子性的,在多线程的情况下,会出现问题。所以使用for来解决这个问题
for(Node[]tab=table;;){
//f是节点,n是数组长度,i是hash与数组长度的数组下标,fh是在数组下标已经坐在的节点的hash值
Nodef;intn,i,fh;
if(tab==null||(n=tab.length)==0)
tab=initTable();//延迟初始化table
//通过unsafe判断下标是否有节点
elseif((f=tabAt(tab,i=(n-1)&hash))==null){
//如果不存在。就创建一个节点通过unsafe加入到数组中,所以读是不加锁的
//注意:如果在多个线程同时加入hash后同一下标的node,那么只有一个会成功,其他失败。失败的就会在再次循序。
//这是循环解决的问题之一
if(casTabAt(tab,i,null,
newNode(hash,key,value,null)))
break;//nolockwhenaddingtoemptybin
}
//大家很奇怪这个if是干什么用额,去了解ForwardingNode这个对象
//这个对象在hash散列的时候用,原来的一个节点会重新散列到下个表,原来表的节点的hash就成为了moved
elseif((fh=f.hash)==MOVED)
tab=helpTransfer(tab,f);
else{
VoldVal=null;
//如果节点存在,就需要添加链了。
//添加链的时候就上锁这个节点,那么所有在这个节点的更新操作都会上锁
//这里上锁,比双桶的开销小多了。如果设计好,可以说几乎忽略不计。
synchronized(f){
if(tabAt(tab,i)==f){
if(fh>=0){
//添加链表
binCount=1;
for(Nodee=f;;++binCount){
Kek;
if(e.hash==hash&&
((ek=e.key)==key||
(ek!=null&&key.equals(ek)))){
oldVal=e.val;
if(!onlyIfAbsent)
e.val=value;
break;
}
Nodepred=e;
if((e=e.next)==null){
pred.next=newNode(hash,key,
value,null);
break;
}
}
}
//添加在tree下添加节点
elseif(finstanceofTreeBin){
Nodep;
binCount=2;
if((p=((TreeBin)f).putTreeVal(hash,key,
value))!=null){
oldVal=p.val;
if(!onlyIfAbsent)
p.val=value;
}
}
}
}
//java8,明确的改革
if(binCount!=0){
//binCount是链表的操作次数,操作多少次。表示链表有多长。当链表大于等于8的时候,链表会变成tree
if(binCount>=TREEIFY_THRESHOLD)
treeifyBin(tab,i);
if(oldVal!=null)
returnoldVal;
break;
}
}
}
addCount(1L,binCount);
returnnull;


}




privatefinalvoidaddCount(longx,intcheck){
CounterCell[]as;longb,s;
//这个判断的目的是解决unsafu的死循环问题
if((as=counterCells)!=null||
!U.compareAndSwapLong(this,BASECOUNT,b=baseCount,s=b+x)){
CounterCella;longv;intm;
booleanuncontended=true;
if(as==null||(m=as.length-1)<0||
(a=as[ThreadLocalRandom.getProbe()&m])==null||
!(uncontended=
U.compareAndSwapLong(a,CELLVALUE,v=a.value,v+x))){
fullAddCount(x,uncontended);
return;
}
if(check<=1)
return;
s=sumCount();
}
//有一个问题,只有添加操作是大于0的,那么没有hash收缩功能
if(check>=0){
Node[]tab,nt;intn,sc;
//我靠,这里又有一个循序,是解决什么问题了?
//sc也就是sizeCtl等于-1的情况只有,table初始化与序列化的时候。
//这个循环是保证序列化之后,还可以加入数据,目测是这样,不敢保证。
while(s>=(long)(sc=sizeCtl)&&(tab=table)!=null&&
(n=tab.length) intrs=resizeStamp(n);
//这个if基本可以忽略
if(sc<0){
if((sc>>>RESIZE_STAMP_SHIFT)!=rs||sc==rs+1||
sc==rs+MAX_RESIZERS||(nt=nextTable)==null||
transferIndex<=0)
break;
if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))
transfer(tab,nt);
}
//把扩容的值替换原先的值
//并发情况下,多个线程都到达这步,只有一个操作会成功,成功之后其他的线程都会进不来这个。
elseif(U.compareAndSwapInt(this,SIZECTL,sc,
(rs< transfer(tab,null);
s=sumCount();
}
}
}




privatefinalvoidtransfer(Node[]tab,Node[]nextTab){
intn=tab.length,stride;
if((stride=(NCPU>1)?(n>>>3)/NCPU:n) stride=MIN_TRANSFER_STRIDE;//subdividerange
if(nextTab==null){//initiating
try{
@SuppressWarnings("unchecked")
Node[]nt=(Node[])newNode[n<<1];
nextTab=nt;
}catch(Throwableex){//trytocopewithOOME
sizeCtl=Integer.MAX_VALUE;
return;
}
nextTable=nextTab;
transferIndex=n;
}
intnextn=nextTab.length;
//good这个对象,的设计。让散列操不会堵塞
ForwardingNodefwd=newForwardingNode(nextTab);
booleanadvance=true;//只要过了第一个while基本所有操作都会advance=true;,每次循环都要进入while那么i这个值才会改变
booleanfinishing=false;//toensuresweepbeforecommittingnextTab
for(inti=0,bound=0;;){
Nodef;intfh;
//这个while真心不怎么明白
//唯一能解释的是,防止散列完成,才知道多线程操作问题,快速知道多线程问题
while(advance){
intnextIndex,nextBound;
if(--i>=bound||finishing)
advance=false;
//这个判断只可能进来一次,
elseif((nextIndex=transferIndex)<=0){
i=-1;
advance=false;
}
//比如n等于128,stride为128>>>3/8=2,
//i=128,bound-126,那么每两次就会在进来一次,那么就会知道,(nextIndex=transferIndex)这操作就会知道多线程在操作
elseif(U.compareAndSwapInt
(this,TRANSFERINDEX,nextIndex,
nextBound=(nextIndex>stride?
nextIndex-stride:0))){
bound=nextBound;
i=nextIndex-1;
advance=false;
}
}
//
//那个操作会让i大于等于n,这个真想不到
//i+n也大于等于不了nextn啊,
//只有一个可能,在强并发下,有两个线程都进入了。进行操作了。没错
//
if(i<0||i>=n||i+n>=nextn){
intsc;
if(finishing){
nextTable=null;
table=nextTab;
sizeCtl=(n<<1)-(n>>>1);
return;
}
//先是sc-1这个操作没有错。应该addcont里面的判断是是需要减一的,没有在addcount减,放到这里,可以减少操作
if(U.compareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){
//这个操作会排除最后进来之前的线程操作。
//怎么做到额,请看addCount方法的2286行
if((sc-2)!=resizeStamp(n)< return;
finishing=advance=true;
i=n;//recheckbeforecommit
}
}
elseif((f=tabAt(tab,i))==null)
advance=casTabAt(tab,i,null,fwd);
elseif((fh=f.hash)==MOVED)
advance=true;//alreadyprocessed
else{
//只有对节点操作才会上锁,性能非常
//散列这里与其他版本的不同,散列的思路很好,因为每次扩展都是2倍,那么扩张之后的hash,在进行一次移位,等于1的就在原有的地方加上扩展之前的系数(比如从16扩展到32,那么hash,下标1的桶,这个桶会分裂成2个桶,一个还在1下表,一个在16+1下标。
synchronized(f){
if(tabAt(tab,i)==f){
Nodeln,hn;
if(fh>=0){
intrunBit=fh&n;
NodelastRun=f;
//寻分裂出来的lastRun
for(Nodep=f.next;p!=null;p=p.next){
intb=p.hash&n;
if(b!=runBit){
runBit=b;
lastRun=p;
}
}
if(runBit==0){
ln=lastRun;
hn=null;
}
else{
hn=lastRun;
ln=null;
}
//把一个链表,分裂成两个列表
for(Nodep=f;p!=lastRun;p=p.next){
intph=p.hash;Kpk=p.key;Vpv=p.val;
if((ph&n)==0)
ln=newNode(ph,pk,pv,ln);
else
hn=newNode(ph,pk,pv,hn);
}
//把两个链表,加入到下个tab
setTabAt(nextTab,i,ln);
setTabAt(nextTab,i+n,hn);
//把分裂的桶,替换成fwd
setTabAt(tab,i,fwd);
advance=true;
}
elseif(finstanceofTreeBin){
TreeBint=(TreeBin)f;
TreeNodelo=null,loTail=null;
TreeNodehi=null,hiTail=null;
intlc=0,hc=0;
for(Nodee=t.first;e!=null;e=e.next){
inth=e.hash;
TreeNodep=newTreeNode
(h,e.key,e.val,null,null);
if((h&n)==0){
if((p.prev=loTail)==null)
lo=p;
else
loTail.next=p;
loTail=p;
++lc;
}
else{
if((p.prev=hiTail)==null)
hi=p;
else
hiTail.next=p;
hiTail=p;
++hc;
}
}
ln=(lc<=UNTREEIFY_THRESHOLD)?untreeify(lo):
(hc!=0)?newTreeBin(lo):t;
hn=(hc<=UNTREEIFY_THRESHOLD)?untreeify(hi):
(lc!=0)?newTreeBin(hi):t;
setTabAt(nextTab,i,ln);
setTabAt(nextTab,i+n,hn);
setTabAt(tab,i,fwd);
advance=true;
}
}
}
}
}
}
看过 jdk8文档的伙计都知道,jdk8,不再是双桶。而进入了 二叉树结构。请看上面的添加操作,与散列操作就知道。TreeBin对象与TreeNode对象就清除了。至于性能怎么样,可以用jdk8文档的标准,性能刚刚的。
使用ForwardingNode对象,保证在散列的时候读写操作是在nextbat里面。非常优秀的设计。
使用Unsafe 对象在性能上带来疯狂的性能提升,但是也给程序设计带来了,超大的复杂性。
相关文章
最新文章
热点推荐