同步類容器?
1,這些復合操作在多線程并發地修改容器時,可能會表現出意外的行為,最經典的便是ConcurrentModificationException,原因是當容器迭代的過程中,被并發的修改了內容,這是由于早期迭代器設計的時候并沒有考慮并發修改的問題? ?增強for循環和iterator的形式不容許遍歷的時候修改元素
- 出現java.util.ConcurrentModificationException
package com.example.core.collection;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;public class UseSyncCollection {// 出現java.util.ConcurrentModificationExceptionpublic Collection<String> m1(Vector<String> list) {for (String temp : list) {if ("3".equals(temp)) {list.remove(temp);}}return list;}public static void main(String[] args) {Vector v = new Vector<>();v.add("1");v.add("2");v.add("3");UseSyncCollection test = new UseSyncCollection();Collection<String> ret1 = test.m1(v);System.err.println(ret1.toString());}
}
- 出現java.util.ConcurrentModificationException?
package com.example.core.collection;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;public class UseSyncCollection {// 出現java.util.ConcurrentModificationExceptionpublic Collection<String> m2(Vector<String> list) {Iterator<String> iterator = list.iterator();while (iterator.hasNext()) {String temp = iterator.next();if ("3".equals(temp)) {list.remove(temp);}}return list;}public static void main(String[] args) {Vector v = new Vector<>();v.add("1");v.add("2");v.add("3");UseSyncCollection test = new UseSyncCollection();Collection<String> ret2 = test.m2(v);System.err.println(ret2.toString());}
}
- success
package com.example.core.collection;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;public class UseSyncCollection {//successful!普通for循環,單線程,先刪除,再返回public Collection<String> m3(Vector<String> list) {for (int i = 0; i < list.size(); i++) {if ("3".equals(list.get(i))) {list.remove(i);}}return list;}public static void main(String[] args) {Vector v = new Vector<>();v.add("1");v.add("2");v.add("3");UseSyncCollection test = new UseSyncCollection();Collection<String> ret3 = test.m3(v);System.err.println(ret3.toString());}
}
2,同步類容器: 如Vector、HashTable。 這些容器的同步功能其實都是有JDK的Collections.synchronized***等工廠方法去創建實現的。 其底層的機制無非就是用synchronized關鍵字對每個公用的方法都進行同步,或者使用Object mutex對象鎖的機制使得每次只能有一個線程訪問容器的狀態。?不滿足如今既要保證線程安全,又要追求高并發的目的
List<String> list = new ArrayList<>();
Collections.synchronizedCollection(list);
并發類容器的概念
- jdk5.0以后提供了多種并發類容器來替代同步類容器從而改善性能。
- 同步類容器的狀態都是串行化的。 (鎖競爭)
- 他們雖然實現了線程安全,但是嚴重降低了并發性,造成了cpu的使用率激增,在多線程環境時,嚴重降低了應用程序的吞吐量?
ConcurrentMap
- 接口下有倆個重要的實現: ConcurrentHashMap ConcurrentSkipListMap(支持并發排序功能)
- ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,它們有自己的鎖。
- 只要多個修改操作發生在不同的段上,它們就可以并發進行。把一個整體分成了16個段(Segment),也就是最高支持16個線程的并發修改操作。
- 這也是在多線程場景時減小鎖的粒度從而降低鎖競爭的一種方案。并且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能非常好。
package com.example.core.collection;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class UseConcurrentMap {public static void main(String[] args) {ConcurrentHashMap<String, Object>map = new ConcurrentHashMap<>();map.put("k1","v1");map.put("k2","v1");map.put("k1","vv1");//如果輸入key已經存在,就會覆蓋掉原值map.putIfAbsent("k1","vvv1");//如果輸入key已經存在,不會進行任何操作for(Map.Entry<String,Object>me : map.entrySet()){System.err.println("key: "+ me.getKey()+",value:"+me.getValue());}}
}
Copy-On-Write
- Copy-On-Write簡稱COW,是一種用于程序設計中的優化策略。
- JDK里的COW容器有兩種: CopyOnWriteArrayList CopyOnWriteArraySet
- COW容器非常有用,可以在非常多的并發場景中使用到。
- CopyOnWrite容器即寫時復制的容器。 通俗的理解是當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。
- 這樣做的好處是我們可以對CopyOnWrite容器進行并發的讀,而不需要加鎖,因為當前容器不會添加任何元素。 所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器
- A線程執行寫操作,會基于原本容器的副本(黃色的)進行操作,如果C線程此時也執行寫操作,會等待A線程的鎖釋放,再進行寫入操作。B線程執行讀操作,直接在原本的容器上面操作即可。等寫入完成之后,OrderList會將指針從原本的容器(黃色的)指向容器的副本(藍色的),而原本的容器(黃色的)會被gcc刪除
- 如果原容器容量很大的話,就不要使用copy-on-write,因為要執行對于容器的復制,會占據內存很大的空間
- 如果頻繁的寫入操縱,也不適合
- 適用于讀多寫少的情況,且容器的容量也不要很大
并發Queue
- 在并發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高性能隊列,一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都繼承自Queue接口
ConcurrentLinkedQueue
- ConcurrentLinkedQueue:是一個適用于高并發場景下的隊列,通過無鎖的方式,實現了高并發狀態下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。
- 它是一個基于鏈接節點的無界線程安全隊列。 該隊列的元素遵循先進先出的原則,頭是最先加入的,尾是最近加入的,該隊列不允許null元素。
- ConcurrentLinkedQueue重要方法: add() 和 offer() 都是加入元素的方法 (在ConcurrentLinkedQueue中,這倆個方法沒有任何區別) poll() 和 peek() 都是取頭元素節點,區別在于前者會刪除元素,后者不會
BlockingQueue
- offer(anObject): 表示如果可能的話, 將anObject加到BlockingQueue里,即如果BlockingQueue可以容納, 則返回true, 否則返回false.(本方法不阻塞當前執行方法的線程) 不等待
- offer(E o, long timeout, TimeUnit unit), 可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。 設置等待超時時間
- put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue沒有空間, 則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續。堵塞等待
- poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗,設置等待時間
- ?take(): 取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入,阻塞等待
- drainTo(): 一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖
阻塞隊列的模擬
- 擁有固定長度承裝元素的容器
- 計數器統計容器的容量大小
- 當隊列里面沒有元素的時候需執行線程要等待
- 當隊列元素已滿的時候執行線程也需要等待?? ?
package com.example.core.collection;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class MyQueue {// 創建隊列容器private final LinkedList<Object> list = new LinkedList<>();// 創建計數器countprivate final AtomicInteger count = new AtomicInteger(0);//單個服務的原子性,保證數據的一致性private final int maxSize;//最大容量的限制private final int minSize = 0;//最小容量的限制private final Object lock = new Object();//鎖public MyQueue(int maxSize){this.maxSize = maxSize;}public void put(Object obj){synchronized (lock){while(count.get() == maxSize){try{lock.wait();}catch(InterruptedException e){e.printStackTrace();}}// 添加新的元素進入容器中list.add(obj);count.getAndIncrement();//i++System.err.println("元素"+obj+"已經添加到容器中");//進行喚醒可能正在等待的take方法操作中的線程,當take來取數值時,容器為空,take進行等待,當數據放入,通知take取數lock.notify();}}public Object take(){Object temp = null;synchronized (lock){while(count.get() == minSize){try{lock.wait();}catch (InterruptedException e){e.printStackTrace();}}temp = list.removeFirst();//移除第一個元素count.getAndDecrement();//i--System.err.println("元素"+temp+"已經從容器中取走");//進行喚醒可能正在等待的put方法操作線程,當put方法往容器里面放數值,但是容器已滿,put進入等待,當take取走數據,喚醒put來存入數據lock.notify();}return temp;}public int size(){return count.get();}public List<Object> getQueueList(){return list;}
}
編寫MyQueueTest.java的測試代碼
package com.example.core.collection;public class MyQueueTest {public static void main(String[] args) throws Exception{MyQueue mq = new MyQueue(5);mq.put("a");mq.put("b");mq.put("c");mq.put("d");mq.put("e");System.out.println("當前元素的個數:"+mq.size());Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {mq.put("f");mq.put("g");}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {try{Thread.sleep(1000);Object o1 = mq.take();Thread.sleep(1000);Object o2 = mq.take();}catch(InterruptedException e){e.printStackTrace();}}},"t2");t1.start();Thread.sleep(1000);t2.start();Thread.sleep(5000);System.out.println(mq.getQueueList().toString());}
}
/*
輸出結果如下
元素a已經添加到容器中
元素b已經添加到容器中
元素c已經添加到容器中
元素d已經添加到容器中
元素e已經添加到容器中
當前元素的個數:5
元素a已經從容器中取走
元素f已經添加到容器中
元素b已經從容器中取走
元素g已經添加到容器中
[c, d, e, f, g]
*/
ArrayBlockingQueue
- ArrayBlockingQueue:基于數組的阻塞隊列實現
- 在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,其內部沒實現讀寫分離,也就意味著生產和消費不能完全并行,長度是需要定義的,可以指定先進先出或者先進后出,也叫有界隊列,在很多場合非常適合使用。
高性能無阻塞無界隊列
package com.example.core.collection;import java.util.concurrent.ConcurrentLinkedQueue;public class UseBlockingQueue {public static void main(String[] args) throws Exception{//高性能的無阻塞的無界限隊列ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();clq.offer("a");clq.add("b");clq.add("c");clq.add("d");System.out.println("從容器頭部取出元素"+clq.poll());//從頭部取出元素,并且從容器本身移除System.out.println("容器的長度"+clq.size());System.out.println("從容器的頭部取出元素"+clq.peek());//從頭部取出元素,并且不會從容器本身移除System.out.println("容器的長度"+clq.size());}
}
基于阻塞-有界隊列
package com.example.core.collection;import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;public class UseBlockingQueue {public static void main(String[] args) throws Exception{//基于阻塞的有界隊列ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(5);abq.put("a");abq.add("b");abq.add("c");abq.add("d");abq.add("e");System.out.println(abq.offer("f",2, TimeUnit.SECONDS));ArrayBlockingQueue<String> abq2 = new ArrayBlockingQueue<>(5);abq.drainTo(abq2,3);for(Iterator iterator = abq2.iterator();iterator.hasNext();){String string = (String)iterator.next();System.out.println("元素"+string);}}
}
LinkedBlockingQueue
- 基于鏈表的阻塞隊列 同ArrayBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成)
- LinkedBlockingQueue之所以能夠高效的處理并發數據,是因為其內部實現采用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全并行運行。他是一個無界隊列
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
SynchronousQueue?
- 一種沒有緩沖的隊列
- 生產者產生的數據直接會被消費者獲取并消費
- A線程一直等待B線程的輸入,B產生的數據被A消費,SynchronousQueue只是做一個中轉,不負責存儲
package com.example.core.collection;import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.*;public class UseBlockingQueue {public static void main(String[] args) throws Exception{//不能存放任何元素的 阻塞隊列SynchronousQueue<String>sq = new SynchronousQueue<>();new Thread(new Runnable() {@Overridepublic void run() {try{System.out.println("元素內容:"+sq.take());}catch (InterruptedException e){e.printStackTrace();}}},"t1").start();new Thread(new Runnable() {@Overridepublic void run() {sq.add("a");}},"t2").start();}
}
PriorityBlockingQueue
- 基于優先級的阻塞隊列
- 優先級的判斷通過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口,在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖,他也是一個無界的隊列
package com.example.core.collection;import java.util.concurrent.PriorityBlockingQueue;public class UsePriorityBlockingQueue {public static void main(String[] args) throws InterruptedException{PriorityBlockingQueue<Node> pdq = new PriorityBlockingQueue<Node>();Node n3 = new Node(3,"node3");Node n4 = new Node(4,"node4");Node n2 = new Node(2,"node2");Node n1 = new Node(1,"node1");pdq.add(n4);pdq.add(n3);pdq.add(n1);pdq.add(n2);System.out.println("0 容器為:"+pdq);System.out.println("1 獲取元素:"+pdq.take().getId());System.out.println("1 容器為 :"+pdq);System.out.println("2 獲取元素:"+pdq.take().getId());System.out.println("2 容器為 :"+pdq);System.out.println("3 獲取元素:"+pdq.take().getId());System.out.println("3 容器為 :"+pdq);System.out.println("4 獲取元素為:"+pdq.take().getId());}
}
package com.example.core.collection;public class Node implements Comparable<Node>{private int id;private String name;public Node(){}public Node(int id,String name){super();this.id = id;this.name = name;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic int compareTo(Node node){return this.id > node.id ? 1 : (this.id < node.id ? -1 : 0);}public String toString(){return this.id + ":" + this.name;}
}
DelayQueue
- 帶有延遲時間的Queue
- 其中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景很多,比如對緩存超時的數據進行移除、 任務超時處理、空閑連接的關閉等等
package com.example.core.collection;import java.util.concurrent.DelayQueue;public class Wangba implements Runnable{private DelayQueue<WangMin> delayQueue = new DelayQueue<WangMin>();public boolean start = true;//表示網吧營業public void startMachine(String id,String name,int money){WangMin wm = new WangMin(id,name,System.currentTimeMillis()+money * 1000);System.out.println("網名:"+ name +",身份證: "+id+",繳費:"+money+"元,開始上網");delayQueue.add(wm);}public void overMachine(WangMin wm){System.out.println("網名:"+wm.getName()+",身份證:"+wm.getId()+",已經到了下機時間了");}@Overridepublic void run(){while (start){try{WangMin wm = delayQueue.take();overMachine(wm);}catch(InterruptedException e){e.printStackTrace();}}}public static void main(String[] args) {Wangba wangba = new Wangba();System.out.println("網吧正常營業");Thread yingye = new Thread(wangba);yingye.start();wangba.startMachine("001","張三",2);wangba.startMachine("001","李四",4);wangba.startMachine("001","王五",7);}
}
package com.example.core.collection;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class WangMin implements Delayed {private String id;private String name;private long endTime;//上網截止日期private final TimeUnit timeUnit = TimeUnit.SECONDS;@Overridepublic String toString() {return "WangMin{" +"id='" + id + '\'' +", name='" + name + '\'' +", endTime=" + endTime +'}';}public WangMin(){}public WangMin(String id,String name,long endTime){this.id = id;this.name = name;this.endTime = endTime;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public long getEndTime() {return endTime;}public void setEndTime(long endTime) {this.endTime = endTime;}public TimeUnit getTimeUnit() {return timeUnit;}//用來判斷是否到達下機時間@Overridepublic long getDelay(TimeUnit unit){return endTime - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed delayed){WangMin w = (WangMin)delayed;return this.getDelay(timeUnit) - w.getDelay(timeUnit) > 0 ? 1 : -1;}
}
?