FutureTask是一種可以取消的異步的計算任務。它的計算是通過Callable實現的,它等價于可以攜帶結果的Runnable,并且有三個狀態:等待、運行和完成。完成包括所有計算以任意的方式結束,包括正常結束、取消和異常。
Future的get方法用于獲取結果:只有在計算完成后才能拿到結果,否則調用會一直阻塞,直到任務轉入完成狀態,然后返回結果或者拋出異常。
FutureTask有下面幾個重要的方法:
1.get()
一直阻塞等待,直到執行完成并拿到結果
2.get(long timeout, TimeUnit unit)
阻塞等待執行完成并拿到結果;如果在超時時間內沒有拿到結果,則拋出TimeoutException
3.isCancelled()
是否被取消
4.isDone()
是否已經完成
5.cancel(boolean mayInterruptIfRunning)
試圖取消任務:尚未開始的任務不會再被執行;對于正在執行的任務,只有mayInterruptIfRunning為true時才會中斷執行它的線程
用ThreadPoolExecutor和FutureTask實現可取消任務線程池
private?Map> tasks =?new?HashMap>();
// 構造一個線程
private?ThreadPoolExecutor executor =?new?ThreadPoolExecutor(1,?1, 0L, TimeUnit.MILLISECONDS,?newLinkedBlockingQueue());
/**
* @param tasksList
*/
public?void?addTaskList(List> tasksList) {
for?(Callable t : tasksList) {
FutureTask futureTask =?new?FutureTask(t);
executor.execute(futureTask);
String key = Long.toHexString(System.nanoTime());
tasks.put(key, futureTask);
}
}
/**
* @param task
* @return
*/
public?String addTask(Callable task) {
FutureTask futureTask =?new?FutureTask(task);
executor.execute(futureTask);
String key = Long.toHexString(System.nanoTime());
tasks.put(key, futureTask);
return?key;
}
/**
* @param task
* @return
*/
public?String addDBTask(Callable task) {
FutureTask futureTask =?new?FutureTask(task) {
public?boolean?cancel(boolean?mayInterruptIfRunning) {
System.out.println("Roll Back and Closs Session");
return?super.cancel(mayInterruptIfRunning);
}
};
executor.execute(futureTask);
String key = Long.toHexString(System.nanoTime());
tasks.put(key, futureTask);
return?key;
}
/**
* @param key
* @return
*/
public?boolean?taskIsDone(String key) {
FutureTask futureTask = tasks.get(key);
if?(futureTask !=?null) {
return?futureTask.isDone();
}
return?false;
}
/**
* @param key
* @return
*/
public?boolean?taskIsCancelled(String key) {
FutureTask futureTask = tasks.get(key);
if?(futureTask !=?null) {
return?futureTask.isCancelled();
}
return?false;
}
/**
* @param key
* @return
*/
public?String getTaskResult(String key) {
FutureTask futureTask = tasks.get(key);
if?(futureTask.isDone()) {
try?{
String result = futureTask.get();
tasks.remove(key);
return?result;
}?catch?(InterruptedException e) {
e.printStackTrace();
return?null;
}?catch?(ExecutionException e) {
e.printStackTrace();
return?null;
}
}?else?{
return?null;
}
}
/**
* @param task
* @return
*/
public?String addTaskAndWaitResult(Callable task) {
FutureTask futureTask =?new?FutureTask(task);
executor.execute(futureTask);
String key = Long.toHexString(System.nanoTime());
tasks.put(key, futureTask);
try?{
return?futureTask.get();
}?catch?(InterruptedException e) {
e.printStackTrace();
return?null;
}?catch?(ExecutionException e) {
e.printStackTrace();
return?null;
}
}
/**
*
*/
public?void?removeAllTask() {
for?(String key : tasks.keySet()) {
executor.remove((Runnable) tasks.get(key));
tasks.remove(key);
}
}
/**
* @param key
*/
public?void?removeQueryTask(String key) {
executor.remove((Runnable) tasks.get(key));
}
/**
* @param key
*/
public?void?removeTask(String key) {
tasks.remove(key);
}
/**
*
*/
public?void?clearTaskList() {
tasks.clear();
}
public?synchronized?void?stop(){
try?{
executor.shutdownNow();
executor.awaitTermination(1L, TimeUnit.MILLISECONDS);
}?catch?(InterruptedException e) {
e.printStackTrace();
}?finally?{
executor =?null;
tasks.clear();
tasks =?null;
}
}
/**
* @param key
*/
public?void?cancelTask(String key) {
FutureTask futureTask = tasks.get(key);
if?(futureTask !=?null) {
if?(!futureTask.isDone()) {
futureTask.cancel(true);
}
}
}
public?void?purgeCancelTask() {
executor.purge();
executor.getQueue();
}
/**
* @param args
*/
public?static?void?main(String[] args) {
ImageSearchThreadPool exec =?new?ImageSearchThreadPool();
ArrayList keyList =?new?ArrayList();
ArrayList removeKeyList =?new?ArrayList();
ArrayList cancelKeyList =?new?ArrayList();
for?(int?i =?0; i
// 產生一個任務,并將其加入到線程池
String task =?"task@ "?+ (i +?1);
System.out.println("put "?+ task);
keyList.add(exec.addDBTask(new?DBTask(task)));
}
try?{
Thread.sleep(1L);
}?catch?(InterruptedException e) {
e.printStackTrace();
}
for?(int?i =?0; i
if?(exec.taskIsDone(keyList.get(i))) {
System.out.println(exec.getTaskResult(keyList.get(i)));
exec.removeTask(keyList.get(i));
removeKeyList.add(keyList.get(i));
}?else?{
exec.cancelTask(keyList.get(i));
System.out.println("Cancel task: "?+ (i +?1));
exec.removeTask(keyList.get(i));
cancelKeyList.add(keyList.get(i));
}
}
exec.purgeCancelTask();
exec.stop();
try?{
Thread.sleep(6000L);
}?catch?(InterruptedException e) {
e.printStackTrace();
}
for?(String key : cancelKeyList) {
if?(exec.taskIsCancelled(key)) {
System.out.println("Cancel: "?+ key);
}
}
for?(int?i =?0; i
keyList.get(i);
}
}