14 多線程
操作系統的多任務(multitasking):在同一時刻運行多個程序的能力。
多線程在較低的層次上擴展了多任務的概念:一個程序同時執行多個任務。
通常,每一個任務稱為一個線程(tread),它是線程控制的簡稱。
可以同時運行一個以上線程的程序稱為多線程程序(multithreaded)。
?
多進程與多線程的區別:
線程擁有自己的一整套變量;線程只是共享數據。
共享變量使進程之間的通信比進程之間的通信更有效、更容易。
與進程相比較,線程更輕量級,創建、撤銷一個線程比啟動新進程的開銷要小的多。
?
實際應用中,多線程非常有用。
例如,一個瀏覽器可以同時下載幾幅圖片、一個web服務器需要同時處理幾個并發請求、GUI用一個獨立的線程來收集用戶界面事件。
?
多線程可能相當復雜。本章涵蓋了應用程序可能需要的全部工具。
?
14.1 什么是線程
Thread類的靜態sleep方法將暫停給定的毫秒數。
?
在一個單獨的線程中執行一個任務的簡單過程:
1、將任務代碼移到實現了Runnable接口的類的run方法中;
public interface Runnable
{void run();
}
class MyRunnable implements Runnable
{public void run(){ task code }
}
2、創建一個類對象;
Runnable r = new MyRunnable();
3、由Runnable創建一個Thread對象;
Thread t = new Thread(r);
4、啟動線程;
t.start();
?
?
14.2 中斷線程
當線程的run方法執行方法體中的最后一條語句后,并經由執行return語句返回時,或者出現了在方法中沒有捕獲的異常時,線程將終止。
?
interrupt方法將中斷狀態置位。
每個線程都應該不時的檢查這個標志,以判斷線程是否被中斷。
首先調用靜態的Thread.currentThread方法獲得當前線程,然后調用isInterrupted方法:
Thread.currentThread().isInterrupted()
?
如果線程被阻塞,就無法檢測中斷狀態。
當在一個被阻塞的線程(調用sleep或wait)上調用interrupt方法時,阻塞調用將會被InterruptException異常中斷。
存在不能被中斷的阻塞I/O調用,應該考慮選擇可中斷的調用。
?
沒有任何語言方面的需求要求一個被中斷的線程應該終止。中斷一個線程不過是引起它的注意。被中斷的線程可以決定如何響應中斷。
?
如果在中斷狀態被置位時調用sleep方法,它不會休眠。其清除這一狀態并拋出InterruptedException。
?
靜態的interrupted方法檢測當前線程是否被中斷,并且會清除中斷狀態。
isInterrupted方法是一個實例方法,來檢測是否被中斷,不改變中斷狀態。
void mySubTask()
{try{ sleep(delay); }catch (InterruptedException e) { } //DON’T IGNORE!
}
void mySubTask() throws InterruptedException
{sleep(delay);
}
void mySubTask()
{try{ sleep(delay); }catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
14.3 線程狀態
6種:
New ?
Runnable
Blocked
Waiting
Timed waiting
Terminated
新創建進程:
new Thread(r);
?
可運行狀態:
一旦調用start方法,線程處于runnable狀態。
可運行的線程可能正在運行,也可能沒有運行。
Java的規范說明沒有將它作為一個單獨狀態。
一個正在運行中的線程仍然處于可運行狀態。
?
運行中的線程被中斷,目的是為了讓其他線程獲得運行機會。
搶占式調度系統給每一個可運行線程一個時間片來執行任務。
當時間片用完,操作系統剝奪該線程的運行權,并給另一個線程運行機會。
當選擇下一個線程時,操作系統考慮線程的優先級。
?
現在所有的桌面以及服務器操作系統都使用搶占式調度。
像手機這樣的小型設備可能使用協作式調度。在這樣的設備中,一個線程只有在調用yield方法、或者被阻塞或等待時,線程才會失去控制權。
?
在具有多個處理器的機器上,每一個處理器運行一個線程,可以有多個線程并行運行。
如果線程的數目多于處理器的數目,調度器依然采用時間片機制。
?
在任何給定時刻,一個可運行的線程可能正在運行也可能沒有運行。
?
被阻塞進程和等待進程:
當線程處于被阻塞或等待狀態時,它暫時不活動。
?
被終止的線程:
·run方法正常退出;
·沒有捕獲的異常終止了run方法。
?
14.4 線程屬性
線程優先級 守護線程線程組 處理未捕獲異常的處理器
?
線程優先級:
默認情況下,一個線程繼承它的父線程的優先級。
setPriority方法
1-10之間
MIN_PRIORITY 1
NORM_PRIORITY 5
MAX_PRIORITY 10
?
線程優先級是高度依賴于系統的。
當虛擬機依賴于宿主機平臺的線程實現機制時,Java線程的優先級被映射到宿主機平臺的優先級上,優先級個數也許更多,也許更少。
例如,Windows中有7個優先級別。一些Java優先級將映射到相同的操作系統優先級。
在Sun為Linux提供的Java虛擬機,線程的優先級被忽略——所有線程具有相同的優先級。
不要將程序功能的正確性依賴于優先級。
?
守護線程:
t.setDaemon(true);
這個方法必須在線程啟動之前調用。
將線程轉換為守護線程(daemon thread)。
守護線程的唯一用途:為其他線程服務。如計時線程。
當只剩下守護線程時,虛擬機就退出了。
?
?
未捕獲異常處理器:
線程的run方法不能拋出任何被檢測的異常,但是,不被檢測的異常會導致線程終止。
?
在線程死亡之前,異常被傳遞到一個用于未捕獲異常的處理器。
該處理器必須屬于一個實現Thread.UncaughtExceptionHandler接口的類。
這個接口只有一個方法:void uncaughtException(Thread t, Throwable e)
?
可以用setUncaughtExceptionHandler方法為任何線程安裝一個處理器。
也可以用Thread類的靜態方法setDefaultUncaughtException為所有線程安裝一個默認處理器。
替換處理器可以使用日志API發送未捕獲的報告到日志文件。
?
如果不安裝默認的處理器,默認的處理器為空。
但是,如果不為獨立的線程安裝處理器,此時的處理器就是該線程的ThreadGroup對象。
?
線程組是一個可以統一管理的線程集合。
默認情況下,創建的所有線程屬于相同的線程組。
但是,也可能會建立其他的組。
建議不要在自己的程序中使用線程組。
?
ThreadGroup類實現了Thread.UncaughtExceptionHandler接口。
它的uncaughtException方法如下:
1、如果該線程組有父線程組,那么父線程組的uncaughtException方法被調用;
2、否則,如果Thread.getDefaultExceptionHandler方法返回一個非空的處理器,則調用該處理器;
3、否則,如果Throwable是ThrowDeath的一個實例,什么都不做;
4、否則,線程的名字以及Throwable的棧蹤跡被輸出到System.err。
?
14.5 同步
多個線程對同一數據存取。這樣一個情況通常稱為競爭條件(race condition)。
?
鎖對象:
有兩種機制防止代碼塊受并發訪問的干擾。
synchronized關鍵字
ReentrantLock類
?
public class Bank
{private Lock bankLock = new ReentrantLock(); //ReentrantLock implements the Lock interfacepublic void transfer( int from, int to, int amount){bankLock.lock()try{accounts[from] -= amount;accounts[to] += amount;}finally{bankLock.unlock();}}
}
?
鎖是可重入的,線程可以重復地獲得已經持有的鎖。
鎖保持一個持有計數(hold count)來跟蹤對lock方法的嵌套調用。
線程在每一次調用lock都要調用unlock來釋放鎖。
被一個鎖保護的代碼可以調用另一個使用相同的鎖的方法,此時鎖計數加1。
?
條件對象:
通常被稱為條件變量(conditional variable)。
通常,線程進入臨界區,卻發現在滿足一定條件之后才能執行。
要使用一個條件對象來管理那些已經獲得了一個鎖但是卻不能做有用工作的線程。
?
一個鎖對象可以有一個或多個相關的條件對象。
可以用newCondition方法獲得一個條件對象。習慣上給條件對象命名為可以反映它所表達的條件的名字。
class Bank
{private Condition sufficientFunds;public Bank(){sufficientFunds = banLock.newCondition();}
}
如果不符合條件,調用sufficientFunds.await()方法。
當前線程現在被阻塞了,并放棄了鎖。
?
一旦一個線程調用await方法,它進入該條件的等待集。
直到另一個線程調用同一條件上的signalAll方法時,喚醒等待集的進程。
sufficientFunds.signalAll();//這一調用重新激活因為這一條件而等待的所有線程。
從await調用返回,獲得鎖并從被阻塞的地方繼續執行。
此時,線程應該再次測試該條件。
?
通常,對await的調用應該在如下形式的循環體中
while (!(ok to proceed))
condition.await();
?
至關重要的是最終需要某個其它線程調用signalAll方法。
另一個方法signal,隨機解除等待集中某個線程的阻塞狀態。
?
小結:
·鎖用來保護代碼片段,任何時刻只能有一個線程執行被保護的代碼;
·鎖可以管理試圖進入被保護代碼段的線程;
·鎖可以擁有一個或多個相關的條件對象;
·每個條件對象管理那些已經進入被保護的代碼段但還不能運行的線程。
?
synchronized關鍵字;
Lock和Condition接口為程序設計人員提供了高度的鎖定控制。
然而,在大多數情況下,并不需要那樣的控制,并且可以使用一種嵌入到Java語言內部的機制。
Java中的每一個對象都有一個內部鎖。
如果一個方法用synchronized關鍵字聲明,那么對象鎖將保護整個方法。也就是說,要調用該方法,線程必須獲得內部的對象鎖。
public synchronized void method()
{method body
}
?
等價于
public void method()
{this.intrinsicLock.lock();try{method body}finally { this.intrinsicLock.unlock(); }
}
?
內部對象鎖只有一個相關條件。
wait方法添加一個線程到等待集中,notifyAll/notify方法解除等待線程的阻塞狀態。
換句話說,調用wait或notifyAll等價于
intrinsicCondition.await();
intrinsicCondition.signalAll();
?
wait、notifyAll以及notify方法是Object類中的final方法。
Condition方法必須被命名為await、signalAll和signal以便它們不會發生沖突。
?
class Bank
{private double[] accounts;public synchronized void transfer(int from, int to, int amount) throws InterruptedException{while(accounts[from] < amount)wait();accounts[from] -= amount;accounts[to] += amount;notifyAll();}
}
每一個對象有一個內部鎖,并且該鎖有一個內部條件。
由鎖來管理那些試圖進入synchronized方法的線程,由條件來管理那些調用wait的線程。
?
將靜態方法聲明為synchronized也是合法的。如果調用這種方法,該方法獲得相關類對象的內部鎖。
如果Bank類有一個靜態同步方法,那么當該方法被調用時,Bank.class對象的鎖被鎖住。沒有其他線程可以調用同一個類的這個或其他的同步靜態方法。
?
內部鎖和條件的局限:
·不能中斷一個正在試圖獲得鎖的線程;
·試圖獲得鎖時不能設定超時;
·每個鎖僅有單一的條件,可能是不夠的;
?
在代碼中該使用哪種鎖?Lock和Condition對象還是同步方法?
·最好既不使用Lock/Condition也不使用synchronized關鍵字。在很多情況下可以使用java.util.concurrent包中的一種機制,它會處理所有的加鎖;
·如果synchronized關鍵字適合程序,那么盡量使用它,這樣可以減少編寫的代碼數量,減少出錯幾率;
·如果特別需要Lock/Condition結構提供的獨有特性時,才使用Lock/Condition。
?
同步阻塞:
線程可以通過調用同步方法獲得鎖。
還有另一種機制可以獲得鎖,通過進入一個同步阻塞。
當線程進入如下形式的阻塞:
synchronized (obj) //this is the syntax for a synchronized block
{critical section
}
于是它獲得obj的鎖。
?
public class Bank
{private double[] accounts;private Object lock = new Object();public void transfer( int from, into to, int amount){synchronized (lock) // an ad-hoc lock{accounts[from] -= amount;accounts[to] += amount;}}
}
有時程序員使用一個對象的鎖來實現額外的原子操作,實際上稱為客戶端鎖定(client-side locking)。
public void transfer(Vector<Double> accounts, int from, int to, int amount)
{synchronized (accounts){accounts.set(from, accounts.get(from) - amount);accounts.set(to, accounts.get(to) + amount);}
}
這個方法完全依賴于這樣一個事實,Vector類對自己的所有可修改方法都使用內部鎖。
?
監視器概念:
鎖和條件是線程同步的強大工具,但是,嚴格地講,它們不是面向對象的。
多年來,研究人員努力尋找一種方法,可以在不需要程序員考慮如何加鎖的情況下,就可以保證多線程的安全性。
最成功的的解決方案之一是監視器(monitor)。
?
監視器具有如下特性:
·監視器是只包含私有域的類;
·每個監視器類的對象有一個相關的鎖;
·使用該鎖對所有的方法進行加鎖;
·該鎖可以有任意多個相關條件。
?
每一個條件變量管理一個獨立的線程集。
?
Volatile域:
同步格言:如果向一個變量寫入值,而這個變量接下來可能會被另一個線程讀取,或者,從一個變量讀值,而這個變量可能是之前被另一個線程寫入的,此時必須使用同步。
volatile關鍵字為實例域的同步訪問提供了一種免鎖機制。
如果聲明一個域為volatile,那么編譯器和虛擬機就該知道該域是可能被另一個線程并發更新的。
?
原子性:
java.util.concurrent.atomic包中有很多類使用了高效的機器級指令(而不是使用鎖)來保證其他操作的原子性。
如,AtomicInteger類提供了方法incrementAndGet和decrementAndGet,它們分別以原子的方式將一個整數自增或自減。可安全的使用AtomicInteger作為共享計數器而無需同步。
還有AtomicBoolean、AtomicLong、AtomicReference以及Boolean值、整數、long值和引用的原子數組。
應用程序員不應該使用這些類,僅供開發并發工具的系統程序員使用。
?
死鎖:
deadlock
?
線程局部變量:
使用ThreadLocal輔助類可為各個線程提供各自的實例。
public static final ThreadLocal<SimpleDateFormat> dateFormat =
new ThreadLocal<SimpleDateFormat>()
{protected SimpleDateFormat initialValue(){return new SimpleDateFormat(“yyyy-MM-dd”);}
}
要訪問具體的格式化方法,可以調用String dateStamp = dateFormat.get().format(new Date());
在一個線程中首次調用get時,會調用initialValue方法,此后,get方法會返回屬于當前線程的那個實例。
在多線程中生成隨機數也存在類似問題。
java.util.Random類是線程安全的。
但是如果多個線程需要等待一個共享的隨機數生成器,會很低效。
可以使用ThreadLocal輔助類為各個線程提供一個單獨的生成器。
不過JavaSE7還提供了一個便利類。只需做以下調用:
int random = ThreadLocalRandom.current().nextInt(upperBound);
ThreadLocalRandom.current() 調用會返回特定于當前線程的Random實例。
?
鎖測試與超時:
線程在調用lock方法來獲得另一個線程所持有的鎖的時候,很可能發生阻塞。
應該更加謹慎的申請鎖。
tryLock方法試圖申請一個鎖,在成功獲得鎖后返回true,否則,立即返回false,而且線程可以立即離開去做其他事情。
if (myLock.tryLock())//now the thread owns the locktry { ... }finally { myLock.unlock()}
else//do something else
可以調用tryLock時,使用超時參數:if (myLock.tryLock(100, TimeUnit.MILLISECONDS)) ...
TimeUnit是一個枚舉類型,取值包括SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。
?
lock方法不能被中斷。如果一個線程在等待獲得一個鎖時被中斷,中斷線程在獲得鎖之前一直處于阻塞狀態。如果出現死鎖,那么,lock方法就無法終止。
然而,如果調用帶有超時參數的tryLock,那么如果線程在等待期間被中斷,將拋出InterruptedException。這是一個非常有用的特性,因為允許程序打破死鎖。
?
也可調用lockInterruptibly方法。相當于一個超時設為無限的tryLock方法。
?
在等待一個條件時,也可以提供一個超時:
myCondition.await(100, TimeUnit.MILLISECONDS)
如果一個線程被另一個線程通過調用signalAll或signal激活,或者超時時限已達到,或者線程被中斷,那么await方法將返回。
如果等待的線程被中斷,await方法將拋出一個InterruptedException。
當希望線程中斷后繼續等待,可使用awaitUninterruptibly方法代替await。
?
讀寫鎖:
java.util.concurrent.locks包定義了兩個鎖類:
ReentrantLock類和ReentrantReadWriteLock類
?
很多線程從一個數據結構讀取數據,很少線程修改其中的數據用ReentrantReadWriteLock。
寫線程互斥訪問。
1、構造一個ReentrantReadWriteLock對象;
private ReentrantReadWriteLocck rwl = new ReentrantReadWriteLock();
2、抽取讀鎖和寫鎖;
private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();
3、對所有的獲取方法加讀鎖;
public double getTotalBalance()
{readLock.lock()try{ ... }finally { readLock.lock(); }
}
4、對所有的修改方法加寫鎖;
public void transfer( ... )
{writeLock.lock();try { ... }finally { writeLock.unlock(); }
}
14.6 阻塞隊列
前面介紹了Java并發程序設計的底層構建塊。
對于實際編程來說,應該盡可能遠離底層結構。使用由并發處理的專業人士實現的較高層次的結構要方便的多、安全的多。
?
不需要顯式的線程同步,使用隊列作為一種同步機制。
?
對于許多線程問題,可以通過使用一個或多個隊列以優雅且安全的方式將其形式化。
生產線者程向隊列插入元素,消費者線程則取出它們。
使用隊列,可以安全的從一個線程向另一個線程傳遞數據。
?
當試圖向滿隊列添加元素,或從空隊列移出元素時,阻塞隊列(blocking queue)導致線程阻塞。
在協調多個線程之間的合作時,阻塞隊列是一個有用的工具。
?
阻塞隊列方法分3類:
put | 添加元素 | 滿,阻塞 |
take | 移出并返回頭元素 | 空,阻塞 |
add | 添加元素 | 滿,拋出IllegalStateException |
element | 返回隊列頭元素 | 空,拋出NoSuchElementException |
remove | 移出并返回 | 空,拋出NoSuchElementException |
offer | 添加元素并返回true | 滿,返回false |
peek | 返回隊列頭元素 | 空,返回null |
poll | 移出并返回隊列頭元素 | 空,返回null |
?
還有帶有超時的offer方法和poll方法的變體。
boolean sucess = q.offer(x, 100, TimeUnit.MILLISECONDS);
//在100毫秒尾插,成功返回ture;失敗返回false。
Object head = q.poll(100, TimeUnit.MILLISECONDS)
//嘗試100毫秒內移出隊列的頭元素,成功返回頭元素,失敗返回null。
?
java.util.concurrent包提供了阻塞隊列的幾個變種:
默認情況下,LinkedBlockingQueue的容量是沒有上邊界的,但是也可以選擇指定最大容量。
LinkedBlockingDeque是一個雙端的版本。
ArrayBlockingQueue在構造時需要指定容量,并且有一個可選的參數來指定是否需要公平性。若設置了公平參數,則等待時間最長的線程會優先得到處理。通常,公平性會降低性能,只有在確實非常需要時才使用它。
PriorityBlockingQueue是一個帶優先級的隊列。元素按優先級被移出,沒有容量上限。
DelayQueue包含實現Delayed接口的對象:
interface Delayed extends Comparable<Delayed>
{
? ? long getDelay(TimeUnit unit);
}
getDelay方法返回對象的殘留延遲。負值表示已經結束。
元素只有在延遲用完的情況下才能從DelayQueue移出。
還必須實現compareTo方法。DelayQueue使用該方法對元素進行排序。
?
JavaSE7增加了一個TransferQueue接口,允許生產者線程等待,直到消費者準備就緒可以接受一個元素。如果生產者調用q.transfer(item);這個調用會阻塞,直到另一個線程將元素(item)刪除。
LinkedTransferQueue類實現了這個接口。
?
14.7 線程安全集合
如果多線程要并發地修改一個數據結構,如散列表,那么很容易會破壞這個數據結構。
可以通過提供鎖來保護共享數據結構,但是選擇線程安全的實現作為替代可能更容易。
?
高效的映射表、集合、隊列:
java.util.concurrent包提供了ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue。
size方法不在常量時間內操作,需遍歷。
?
并發的散列映射表,可以高效的支持大量地讀者和一定數量的寫者。
默認16個寫者,超過16個則阻塞。
?
ConcurrentHashMap、ConcurrentSkipListMap有相應的方法用于插入和刪除:
cache.putIfAbsent(key, value);
cache.remove(key, value);
cache.replace(key, oldValue, newValue);
?
寫數組的拷貝:
CopyOnWriteArrayList和CopyOnWriteArraySet是線程安全集合。
所有修改線程對底層數組進行復制。
如果在集合上進行迭代的線程數超過修改線程數,這樣的安排是很有用的。
當構建一個迭代器時,它包含一個對當前數組的引用。
如果數組后來被修改了,迭代器仍然引用舊數組,但是,集合的數組已經被替換了。
舊的迭代器擁有一致的(可能過時的)視圖,訪問它無需任何同步開銷。
?
較早的線程安全集合:
Vector和Hashtable類提供了線程安全的動態數組和散列表。
現在被棄用。
代之的是ArrayList和HashMap類,但不是線程安全的。
可以使用同步包裝器(synchronization wrapper)變成線程安全的:
List<E> synchArrayList = Collections.synchronizedList( new ArrayList<E>());
Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());
?
最好使用java.util.concurrent包中定義的集合,不使用同步包裝器中的。
有一個例外是經常被修改的數組列表,咋那種情況下,同步的ArrayList可以勝過CopyOnWriteArrayList
?
?
14.8 Callable與Future
Runnable封裝一個異步運行的任務,無返回值和參數;
?
Callable與Runnable類似,但是有返回值;
public interface Callable<T>
{
? ? T call() throws Exception;
}
?
Future保存異步計算的結果,可以啟動一個計算,將Future對象交給某個線程,然后忘掉它。
Future對象的所有者在結果計算好之后就可以獲得它。
public interface Future<V>
{
? ? V get() throws ...;
? ? V get(long timeout, TimeUnit unit) throws ...;
? ? void cancel( boolean mayInterrupt);
? ? boolean isCancelled();
? ? boolean isDone();
}
第一個get方法的調用被阻塞,直到計算完成;
如果在計算完成之前,第二個方法的調用超時,拋TimeoutException。
如果運行該計算的線程被中斷,兩個get方法都拋InterruptedException。
如果計算完成,那么get方法立即返回。
如果計算還在進行,isDone方法返回false;如果完成了,返回ture;
可以用cancel方法取消該計算。如果計算未開始,它被取消且不再開始。如果計算處于運行中,那么如果mayInterrupt參數為true,它就被中斷。
?
FutureTask包裝器是一種非常便利的機制,可將Callable裝換成Future和Runnable,它同時實現二者的接口。
Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task); //it’s a Runnable
t.start();
...
Integer result = task.get(); //it’s a Future
?
14.9 執行器
構建一個新的線程有一定代價,因為涉及與操作系統的交互。
如果程序創建了大量的生命周期很短的線程,應該使用線程池(thread pool)。
一個線程池中包含許多準備運行的空閑線程。
將Runnable對象交給線程池,就會有一個線程調用run方法。
當run方法退出時,線程不會死亡,而是在池中準備為下一個請求提供服務。
?
另一個使用線程池的理由是減少并發線程的數目。
?
執行器(Executor)類有許多靜態工廠方法來構建線程池:
newCachedThreadPool ?必要時創建新線程;空閑線程會被保留60秒
newFixedThreadPool 包含固定數量的線程
newSingleThreadExecutor ?只有一個線程,該線程順序執行每一個提交的任務
newScheduledThreadPool 用于預定執行而構建的固定線程池,替代java.util.Timer
newSingleThreadScheduledExecutor 用于預定執行而構建的單線程池
?
前三個方法返回實現了ExecutorService接口的ThreadPoolExcecutor類的對象。
?
將一個Runnable對象或Callable對象提交給ExecutorService:
Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result)
Future<T> submit(Callable<T> task)
?
?
該線程池調用submit時,會得到一個Future對象,可以用來查詢該任務的狀態。
第一個submit方法,get方法在完成的時候只簡單的返回null;
第二個submit方法,get方法在完成的時候返回指定的result對象;
第三個submit方法,同Future用法。
?
當完成一個線程池的時候,調用shutdown。線程池不再接受新任務,所有任務完成后,線程池中的線程死亡。
另一種方法時調用shutdownNow,取消尚未開始的所有任務并中斷正在運行的線程。
?
線程池該做的事:
1、調用Executors類中的靜態的方法newCachedThreadPool或newFixedThreadPool;
2、調用submit提交Runnable或Callable對象;
3、如果想要取消一個任務,或如果提交Callable對象,那就要保存好返回的Future對象;
4、當不再提交任務時,調用shutdown。
?
預定執行:
ScheduledExecutorService接口具有為預定執行(Scheduled Execution)或重復執行任務而設計的方法。它是一種允許使用線程池機制的java.util.Timer的泛化。
Executors類的newScheduledThreadPool和newSingleThreadScheduleExecutor方法將返回實現了ScheduledExecutorService接口的對象。
可以預定Runnable或Callable在初始的延遲之后只運行一次
也可以預定一個Runnable對象周期性地運行。
?
?
控制任務組:
有時,使用執行器的原因是,控制一組相關任務。
例如,可以在執行器中使用shutdownNow方法取消所有的任務。
invokeAny方法提交一個Callable對象集合中的所有對象,并返回一個Future對象的列表,代表所有任務的解決方案。無法知道返回的究竟是那個任務的結果,也許是最先完成的那個任務的結果。對于搜索問題,如果你愿意接受任何一種解決方案的話,就可以使用這個方法。
例如需要一個大整數進行因數分解計算來解碼RSA密碼。可以提交很多任務,每一個任務使用不同范圍內的數來進行分解。只要其中一個任務得到了答案,計算就可以停止了。
?
List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results)
? ? processFuther( result.get());
?
可以將結果按可獲得的順序保存起來。
一個更有效的組織如下:
ExecutorCompletionService service = new ExecutorCompletionService(executor);
for (Callable<T> task : tasks )
service.submit(task);
for ( int i = 0; i < tasks.size(); ++i)
? ? ?processFurther( service.take().get() );
?
Fork-Join框架:
對于一些應用,可能對每個處理器內核分別使用一個線程,來完成計算密集型任務,如圖像或視頻處理。javase7引入了fork-join框架,來支持這種應用。
?
假設有一個處理任務,可以自然地分解為子任務:
if (problemSize < threshold)
? ? ?solve problem directly
else
{
? ? break down into subproblems
? ? recursively solve each subproblem
? ? combine the results
}
?
如果計算會生成一個類型為T的結果,需擴展RecursiveTask<T>類;
如果不產生任何結果,需擴展RecursiveAction類。
再覆蓋compute方法調用子任務,然后合并其結果。
class Counter extends RecursiveTask<Integer>
{
...protected Integer compute(){if (to - from < THRESHOLD)solve problem directly;else{int mid = (from + to) / 2;Counter first = new Counter(values, from, mid, filter);Counter second = new Counter(values, mid, to, filter);invokeAll(first, second);return first.join() + second.join();}}
}
在這里,invokeAll方法接收到很多任務并阻塞,直到所有這些任務都已經完成。
join方法將生成結果。
?
在后臺,fork-join框架使用了工作密取(work stealing)方法來平衡可用線程的工作負載。
?
14.10 同步器
java.util.concurrent包包含了幾個管理線程集的類。
這些機制具有為線程之間的共用集結點模式(common rendezvous patterns)提供的預置功能(canned functionality)。
如果有一個相互合作的線程集滿足這些行為模式之一,那么應該直接重用合適的庫類,而不要試圖提供手工的鎖與條件的集合。
類 | 功能 | 何時使用 |
CyclicBarrier | 允許線程集等待,直至其中預定數目的線程到達一個公共障柵(barrier),然后可以選擇執行一個處理障柵的動作 | 當大量線程需要在它們的結果可用之前完成時。 |
CountDownLatch | 允許線程集等待直到計數器減為0 | 當一個或多個線程需要等待直到指定數目的事件發生 |
Exchanger | 允許兩個線程在要交換的對象準備好時交換對象 | 當兩個線程工作在同一數據結構的兩個實例上的時候,一個向實例添加數據而另一個從實例清除數據。 |
Semaphore | 允許線程集等待直到被允許繼續運行為止 | 限制訪問資源的線程總數。如果許可數是1,常常阻塞線程直到另一個線程給出許可為止 |
SynchronousQueue | 允許一個線程把對象交給另一個線程 | 在沒有顯式同步的情況下,當兩個線程準備好將一個對象從一個線程傳遞到另一個時 |
?
?
CyclicBarrier類:
實現了一個集結點(rendezvous)稱為障柵(barrier)。
考慮大量線程運行在一次計算的不同部分的情形。
當所有部分都準備好時,需要把結果組合在一起。
當一個線程完成了它的那部分任務后,則讓他運行到障柵處。
一旦所有的線程都達到了這個障柵,障柵就撤銷,線程就可以繼續運行。
?
CyclicBarrier barrier = new CyclicBarrier(nthreads);
每一個線程做一些工作,完成后在障柵上調用await:
public void run()
{
? ? doWork();
? ? barrier.await();
}
?
await方法有一個可選的超時參數:
barrier.await(100, TimeUnit.MILLISECONDS);
如果任何一個在障柵上等待的線程離開了障柵,那么障柵就被破壞了(線程可能離開時因為它調用await時設置了超時,或者因為它被中斷了)。這種情況下,所有其他線程的await方法拋出BrokenBarrierException。那些已經在等待的線程立即終止await調用。
?
可以提供一個可選的障柵動作(barrier action),當所有線程到達障柵的時候就會執行這一動作。
Runnable barrierAction = ...
CycliBarrier barrier = new CyclicBarrier(nthreads, barrierAction);
這個動作可以收集那些單個線程的運行結果。
?
障柵是循環的,可以在所有等待線程釋放后被重用。?
CountDownLatch 倒計時門栓
讓一個線程集等待直到計數器變為0。
一次性,一旦為0就不能再重用了。
?
一個有用的特例是計數值為1的門栓。假定A線程需要數據,被啟動且在門外等候;B線程準備數據,當數據準備好時,調用countDown,A線程就可繼續運行了。
?
Exchanger 交換器
一個線程向緩沖區填入數據,另一個線程消耗這些數據。
當它們都完成以后,相互交換緩沖區。
?
Semaphore 信號量
acquire請求許可 ???release釋放許可