一、引言
在現代網絡應用開發中,高效的任務調度機制對于提升系統性能和用戶體驗至關重要。OkHttp 作為一款廣泛使用的高性能 HTTP 客戶端庫,其任務調度模塊在處理網絡請求的并發、排隊和執行等方面發揮著關鍵作用。本文將深入 OkHttp 源碼,詳細剖析其任務調度模塊的實現原理和工作流程。
二、任務調度模塊概述
2.1 核心功能
OkHttp 的任務調度模塊主要負責管理和調度 HTTP 請求任務,其核心功能包括:
- 并發控制:限制同時執行的請求數量,避免過多請求耗盡系統資源。
- 任務排隊:當并發請求數量達到上限時,將新請求放入隊列等待執行。
- 異步執行:支持異步請求,通過線程池管理請求的執行,避免阻塞主線程。
2.2 主要組件
任務調度模塊的主要組件包括:
- Dispatcher:調度器,負責任務的分發、排隊和執行管理。
- RealCall:表示一個實際的 HTTP 請求調用。
- AsyncCall:RealCall 的異步執行包裝類。
- 線程池:用于執行異步請求任務。
2.3 整體架構
OkHttp 任務調度模塊的整體架構可以分為以下幾個層次:
-
客戶端層:應用代碼通過
OkHttpClient
發起 HTTP 請求。 -
調度層:
Dispatcher
類負責接收請求任務,對任務進行調度和管理。 -
執行層:
RealCall
和AsyncCall
類負責具體的請求執行。
三、Dispatcher 類源碼分析
3.1 類定義及屬性
java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;// Dispatcher 類,負責 HTTP 請求任務的調度和管理
public final class Dispatcher {// 最大并發請求數,默認值為 64private int maxRequests = 64;// 每個主機的最大并發請求數,默認值為 5private int maxRequestsPerHost = 5;// 空閑線程的存活時間,默認值為 5 分鐘private long keepAliveDurationNs = TimeUnit.MINUTES.toNanos(5);// 線程池,用于執行異步請求任務private ExecutorService executorService;// 準備執行的異步請求隊列private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();// 正在執行的異步請求隊列private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();// 正在執行的同步請求隊列private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();// 空閑回調函數,當沒有正在執行的請求時會被調用private Runnable idleCallback;
}
在 Dispatcher
類中,定義了一些重要的屬性:
maxRequests
和maxRequestsPerHost
用于控制并發請求數量,防止資源過度占用。keepAliveDurationNs
用于設置空閑線程的存活時間,避免頻繁創建和銷毀線程。executorService
是線程池,用于執行異步請求任務。readyAsyncCalls
、runningAsyncCalls
和runningSyncCalls
分別用于存儲準備執行的異步請求、正在執行的異步請求和正在執行的同步請求。idleCallback
是一個回調函數,當沒有正在執行的請求時會被調用。
3.2 構造函數
java
/*** 默認構造函數,使用默認參數初始化 Dispatcher*/
public Dispatcher() {this(64, 5, 5, TimeUnit.MINUTES);
}/*** 自定義參數的構造函數,允許用戶指定最大并發請求數、每個主機的最大并發請求數、* 空閑線程的存活時間和時間單位* * @param maxRequests 最大并發請求數* @param maxRequestsPerHost 每個主機的最大并發請求數* @param keepAliveDuration 空閑線程的存活時間* @param timeUnit 時間單位*/
public Dispatcher(int maxRequests, int maxRequestsPerHost, long keepAliveDuration, TimeUnit timeUnit) {this.maxRequests = maxRequests;this.maxRequestsPerHost = maxRequestsPerHost;this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
}
構造函數提供了兩種初始化方式,一種是使用默認參數,另一種是允許用戶自定義參數。
3.3 線程池的獲取與創建
java
/*** 獲取線程池,如果線程池未初始化,則進行初始化* * @return 線程池實例*/
public synchronized ExecutorService executorService() {// 檢查線程池是否已經初始化if (executorService == null) {// 創建一個線程池,核心線程數為 0,最大線程數為 Integer.MAX_VALUE// 使用 SynchronousQueue 作為任務隊列,空閑線程存活時間為 60 秒executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}
executorService()
方法用于獲取線程池。如果線程池未初始化,會創建一個新的線程池。這里使用 SynchronousQueue
作為任務隊列,意味著線程池會立即為新任務創建線程,直到達到最大線程數。
3.4 異步請求的調度
3.4.1 enqueue 方法
java
/*** 異步執行請求的方法* * @param call 異步請求對象*/
synchronized void enqueue(AsyncCall call) {// 檢查當前正在執行的異步請求數量和每個主機的并發請求數量是否滿足條件if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {// 如果滿足條件,將請求添加到正在執行的異步請求隊列中runningAsyncCalls.add(call);// 使用線程池執行請求executorService().execute(call);} else {// 如果不滿足條件,將請求添加到準備執行的異步請求隊列中readyAsyncCalls.add(call);}
}/*** 計算指定異步請求對應的主機的正在執行的請求數量* * @param call 異步請求* @return 該主機的正在執行的請求數量*/
private int runningCallsForHost(AsyncCall call) {int result = 0;// 遍歷正在執行的異步請求隊列for (AsyncCall c : runningAsyncCalls) {if (c.host().equals(call.host())) {// 如果主機相同,則計數器加 1result++;}}return result;
}
enqueue
方法用于異步執行請求。它會檢查當前正在執行的異步請求數量和每個主機的并發請求數量是否滿足條件。如果滿足條件,將請求添加到 runningAsyncCalls
隊列中,并使用線程池執行該請求;如果不滿足條件,將請求添加到 readyAsyncCalls
隊列中等待執行。runningCallsForHost
方法用于計算指定異步請求對應的主機的正在執行的請求數量。
3.4.2 finished 方法
java
/*** 異步請求執行完成的回調方法* * @param call 完成的異步請求對象*/
void finished(AsyncCall call) {// 調用內部的 finished 方法處理請求完成邏輯finished(runningAsyncCalls, call, true);
}/*** 處理請求完成的通用方法* * @param calls 正在執行的請求隊列* @param call 完成的請求* @param promoteCalls 是否嘗試從準備隊列中取出請求執行*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {int runningCallsCount;Runnable idleCallback;// 同步操作,確保線程安全synchronized (this) {// 從正在執行的請求隊列中移除完成的請求if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");// 計算當前正在執行的請求數量runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}// 如果需要嘗試從準備隊列中取出請求執行if (promoteCalls) {// 調用 promoteCalls 方法嘗試從準備隊列中取出請求執行promoteCalls();}// 如果沒有正在執行的請求,且設置了空閑回調,則執行回調if (runningCallsCount == 0 && idleCallback != null) {idleCallback.run();}
}/*** 嘗試從準備隊列中取出請求執行*/
private void promoteCalls() {// 如果正在執行的異步請求數量已經達到最大并發請求數,直接返回if (runningAsyncCalls.size() >= maxRequests) return; // 如果準備執行的異步請求隊列為空,直接返回if (readyAsyncCalls.isEmpty()) return; // 遍歷準備執行的異步請求隊列for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall call = i.next();// 檢查該請求對應的主機的并發請求數量是否滿足條件if (runningCallsForHost(call) < maxRequestsPerHost) {// 從準備隊列中移除該請求i.remove();// 將請求添加到正在執行的異步請求隊列中runningAsyncCalls.add(call);// 使用線程池執行該請求executorService().execute(call);}// 如果正在執行的異步請求數量已經達到最大并發請求數,停止遍歷if (runningAsyncCalls.size() >= maxRequests) return; }
}/*** 計算當前正在執行的請求數量* * @return 正在執行的請求數量*/
private int runningCallsCount() {return runningAsyncCalls.size() + runningSyncCalls.size();
}
finished
方法用于處理異步請求執行完成的情況。它會從 runningAsyncCalls
隊列中移除完成的請求,并調用 promoteCalls
方法嘗試從 readyAsyncCalls
隊列中取出請求執行。promoteCalls
方法會遍歷 readyAsyncCalls
隊列,檢查每個請求對應的主機的并發請求數量是否滿足條件,如果滿足條件,則將請求從 readyAsyncCalls
隊列中移除,添加到 runningAsyncCalls
隊列中,并使用線程池執行該請求。
3.5 同步請求的調度
3.5.1 executed 方法
java
/*** 同步執行請求的方法* * @param call 同步請求對象*/
synchronized void executed(RealCall call) {// 將請求添加到正在執行的同步請求隊列中runningSyncCalls.add(call);
}/*** 同步請求執行完成的回調方法* * @param call 完成的同步請求對象*/
void finished(RealCall call) {// 調用內部的 finished 方法處理請求完成邏輯finished(runningSyncCalls, call, false);
}
executed
方法用于同步執行請求,它會將請求添加到 runningSyncCalls
隊列中。finished
方法用于處理同步請求執行完成的情況,它會從 runningSyncCalls
隊列中移除完成的請求。
3.6 其他方法
java
/*** 設置空閑回調函數* * @param idleCallback 空閑回調函數*/
public void setIdleCallback(Runnable idleCallback) {this.idleCallback = idleCallback;
}/*** 獲取最大并發請求數* * @return 最大并發請求數*/
public int getMaxRequests() {return maxRequests;
}/*** 設置最大并發請求數* * @param maxRequests 最大并發請求數*/
public void setMaxRequests(int maxRequests) {if (maxRequests < 1) {throw new IllegalArgumentException("maxRequests < 1: " + maxRequests);}synchronized (this) {this.maxRequests = maxRequests;}promoteCalls();
}/*** 獲取每個主機的最大并發請求數* * @return 每個主機的最大并發請求數*/
public int getMaxRequestsPerHost() {return maxRequestsPerHost;
}/*** 設置每個主機的最大并發請求數* * @param maxRequestsPerHost 每個主機的最大并發請求數*/
public void setMaxRequestsPerHost(int maxRequestsPerHost) {if (maxRequestsPerHost < 1) {throw new IllegalArgumentException("maxRequestsPerHost < 1: " + maxRequestsPerHost);}synchronized (this) {this.maxRequestsPerHost = maxRequestsPerHost;}promoteCalls();
}/*** 獲取準備執行的異步請求隊列* * @return 準備執行的異步請求隊列*/
public synchronized List<Call> readyCalls() {List<Call> result = new ArrayList<>();for (AsyncCall asyncCall : readyAsyncCalls) {result.add(asyncCall.get());}return result;
}/*** 獲取正在執行的異步請求隊列* * @return 正在執行的異步請求隊列*/
public synchronized List<Call> runningCalls() {List<Call> result = new ArrayList<>();result.addAll(runningSyncCalls);for (AsyncCall asyncCall : runningAsyncCalls) {result.add(asyncCall.get());}return result;
}/*** 獲取準備執行的異步請求數量* * @return 準備執行的異步請求數量*/
public synchronized int readyCallsCount() {return readyAsyncCalls.size();
}/*** 獲取正在執行的請求數量* * @return 正在執行的請求數量*/
public synchronized int runningCallsCount() {return runningAsyncCalls.size() + runningSyncCalls.size();
}
這些方法提供了對 Dispatcher
類的一些屬性的訪問和修改功能,例如設置最大并發請求數、每個主機的最大并發請求數,獲取準備執行的請求隊列、正在執行的請求隊列等。
四、RealCall 類源碼分析
4.1 類定義及屬性
java
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.cache.CacheInterceptor;
import okhttp3.internal.connection.ConnectInterceptor;
import okhttp3.internal.connection.StreamAllocation;
import okhttp3.internal.http.BridgeInterceptor;
import okhttp3.internal.http.CallServerInterceptor;
import okhttp3.internal.http.RealInterceptorChain;
import okhttp3.internal.http.RetryAndFollowUpInterceptor;
import okhttp3.internal.platform.Platform;import java.io.IOException;
import java.util.List;// RealCall 類,表示一個實際的 HTTP 請求調用
final class RealCall implements Call {// OkHttpClient 實例,用于獲取配置信息final OkHttpClient client;// 重試和重定向攔截器final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;// 原始請求對象final Request originalRequest;// 標記該請求是否已經被執行boolean executed;// 用于 WebSocket 請求的標記final boolean forWebSocket;// 事件監聽器,用于監聽請求的各個階段事件private EventListener eventListener;
}
RealCall
類表示一個實際的 HTTP 請求調用。它包含了 OkHttpClient
實例、原始請求對象、重試和重定向攔截器等重要屬性。executed
標記用于確保一個請求只能被執行一次。forWebSocket
標記用于區分是否是 WebSocket 請求。eventListener
用于監聽請求的各個階段事件。
4.2 構造函數
java
/*** 構造函數,初始化 RealCall 實例* * @param client OkHttpClient 實例* @param originalRequest 原始請求對象* @param forWebSocket 是否為 WebSocket 請求*/
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {this.client = client;this.originalRequest = originalRequest;this.forWebSocket = forWebSocket;// 創建重試和重定向攔截器this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
構造函數用于初始化 RealCall
實例,同時創建重試和重定向攔截器。
4.3 同步請求執行方法
java
@Override
public Response execute() throws IOException {// 同步操作,確保該請求只被執行一次synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}// 標記請求開始captureCallStackTrace();// 記錄請求開始時間eventListener.callStart(this);try {// 調用 Dispatcher 的 executed 方法,將該請求添加到正在執行的同步請求隊列中client.dispatcher().executed(this);// 調用 getResponseWithInterceptorChain 方法獲取響應Response result = getResponseWithInterceptorChain();if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {// 記錄請求失敗事件eventListener.callFailed(this, e);throw e;} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}
}/*** 捕獲請求調用的堆棧信息*/
private void captureCallStackTrace() {Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}/*** 通過攔截器鏈獲取響應* * @return 響應對象* @throws IOException 如果請求過程中出現 I/O 異常*/
Response getResponseWithInterceptorChain() throws IOException {// 創建攔截器列表List<Interceptor> interceptors = new ArrayList<>();// 添加用戶自定義的攔截器interceptors.addAll(client.interceptors());// 添加重試和重定向攔截器interceptors.add(retryAndFollowUpInterceptor);// 添加橋接攔截器,將用戶請求轉換為符合 HTTP 協議的請求interceptors.add(new BridgeInterceptor(client.cookieJar()));// 添加緩存攔截器,處理緩存邏輯interceptors.add(new CacheInterceptor(client.internalCache()));// 添加連接攔截器,建立與服務器的連接interceptors.add(new ConnectInterceptor(client));if (!forWebSocket) {// 如果不是 WebSocket 請求,添加用戶自定義的網絡攔截器interceptors.addAll(client.networkInterceptors());}// 添加調用服務器攔截器,負責與服務器進行實際的交互interceptors.add(new CallServerInterceptor(forWebSocket));// 創建攔截器鏈RealInterceptorChain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());// 調用攔截器鏈的 proceed 方法開始處理請求return chain.proceed(originalRequest);
}
execute
方法用于同步執行請求。首先,它會檢查該請求是否已經被執行,如果已經執行則拋出異常。然后,調用 captureCallStackTrace
方法捕獲請求調用的堆棧信息,記錄請求開始時間。接著,將該請求添加到正在執行的同步請求隊列中,并調用 getResponseWithInterceptorChain
方法通過攔截器鏈獲取響應。如果請求過程中出現異常,會記錄請求失敗事件。最后,調用 finished
方法處理請求完成邏輯。
getResponseWithInterceptorChain
方法會創建一個攔截器列表,依次添加用戶自定義的攔截器、重試和重定向攔截器、橋接攔截器、緩存攔截器、連接攔截器、網絡攔截器和調用服務器攔截器。然后創建一個 RealInterceptorChain
實例,并調用其 proceed
方法開始處理請求。
4.4 異步請求執行方法
java
@Override
public void enqueue(Callback responseCallback) {// 同步操作,確保該請求只被執行一次synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}// 標記請求開始captureCallStackTrace();// 記錄請求開始時間eventListener.callStart(this);// 創建 AsyncCall 實例,用于異步執行請求AsyncCall asyncCall = new AsyncCall(responseCallback);// 調用 Dispatcher 的 enqueue 方法,將異步請求添加到調度隊列中client.dispatcher().enqueue(asyncCall);
}// AsyncCall 類,繼承自 NamedRunnable,用于異步執行請求
final class AsyncCall extends NamedRunnable {private final Callback responseCallback;/*** 構造函數,初始化 AsyncCall 實例* * @param responseCallback 響應回調函數*/AsyncCall(Callback responseCallback) {super("OkHttp %s", redactedUrl());this.responseCallback = responseCallback;}/*** 獲取請求的主機地址* * @return 請求的主機地址*/String host() {return originalRequest.url().host();}/*** 獲取原始請求對象* * @return 原始請求對象*/Request request() {return originalRequest;}/*** 獲取 RealCall 實例* * @return RealCall 實例*/RealCall get() {return RealCall.this;}@Overrideprotected void execute() {boolean signalledCallback = false;try {// 調用 getResponseWithInterceptorChain 方法獲取響應Response response = getResponseWithInterceptorChain();// 檢查請求是否被取消if (retryAndFollowUpInterceptor.isCanceled()) {signalledCallback = true;// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {signalledCallback = true;// 調用回調函數的 onResponse 方法responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {// 記錄請求失敗事件eventListener.callFailed(RealCall.this, e);// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, e);}} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}}
}
enqueue
方法用于異步執行請求。首先,它會檢查該請求是否已經被執行,如果已經執行則拋出異常。然后,調用 captureCallStackTrace
方法捕獲請求調用的堆棧信息,記錄請求開始時間。接著,創建一個 AsyncCall
實例,并將其添加到調度隊列中。
AsyncCall
類繼承自 NamedRunnable
,用于異步執行請求。在 execute
方法中,它會調用 getResponseWithInterceptorChain
方法獲取響應,并根據請求是否被取消調用回調函數的 onFailure
或 onResponse
方法。如果請求過程中出現異常,會記錄請求失敗事件。最后,調用 finished
方法處理請求完成邏輯。
五、線程池的使用和管理
5.1 線程池的創建和配置
java
// 在 Dispatcher 類中
public synchronized ExecutorService executorService() {if (executorService == null) {executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}
這里使用了 ThreadPoolExecutor
來創建線程池,具體配置如下:
- 核心線程數:0,表示線程池在沒有任務時不會保留任何線程。
- 最大線程數:
Integer.MAX_VALUE
,表示線程池可以創建的最大線程數沒有限制。 - 空閑線程存活時間:60 秒,當線程空閑超過 60 秒時,會被銷毀。
- 任務隊列:
SynchronousQueue
,這是一個沒有容量的阻塞隊列,每個插入操作必須等待另一個線程的移除操作,反之亦然。這意味著線程池會立即為新任務創建線程,直到達到最大線程數。 - 線程工廠:
Util.threadFactory("OkHttp Dispatcher", false)
,用于創建線程,線程名稱為OkHttp Dispatcher
。
5.2 線程池的任務執行和管理
在 Dispatcher
類的 enqueue
方法中,當滿足條件時,會將異步請求添加到正在
接著繼續分析
六、任務調度的并發控制原理
6.1 并發控制的整體邏輯
OkHttp 的任務調度模塊通過 Dispatcher
類實現了精細的并發控制,主要體現在對最大并發請求數(maxRequests
)和每個主機的最大并發請求數(maxRequestsPerHost
)的限制上。這兩個參數的設置可以避免過多的請求同時發起,從而防止系統資源被過度占用,確保應用的穩定性和性能。
6.2 最大并發請求數的控制
在 Dispatcher
類的 enqueue
方法中,會檢查當前正在執行的異步請求數量是否超過 maxRequests
:
java
synchronized void enqueue(AsyncCall call) {// 檢查當前正在執行的異步請求數量和每個主機的并發請求數量是否滿足條件if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {// 如果滿足條件,將請求添加到正在執行的異步請求隊列中runningAsyncCalls.add(call);// 使用線程池執行請求executorService().execute(call);} else {// 如果不滿足條件,將請求添加到準備執行的異步請求隊列中readyAsyncCalls.add(call);}
}
當 runningAsyncCalls
的大小小于 maxRequests
時,說明當前并發請求數量未達到上限,新的請求可以被立即執行;否則,新請求會被放入 readyAsyncCalls
隊列中等待。
6.3 每個主機的最大并發請求數的控制
runningCallsForHost
方法用于計算指定異步請求對應的主機的正在執行的請求數量:
java
private int runningCallsForHost(AsyncCall call) {int result = 0;// 遍歷正在執行的異步請求隊列for (AsyncCall c : runningAsyncCalls) {if (c.host().equals(call.host())) {// 如果主機相同,則計數器加 1result++;}}return result;
}
在 enqueue
方法中,除了檢查最大并發請求數,還會調用 runningCallsForHost
方法檢查當前主機的并發請求數量是否超過 maxRequestsPerHost
。如果超過,則新請求同樣會被放入 readyAsyncCalls
隊列中等待。
6.4 并發控制的動態調整
Dispatcher
類提供了 setMaxRequests
和 setMaxRequestsPerHost
方法,允許開發者動態調整最大并發請求數和每個主機的最大并發請求數:
java
public void setMaxRequests(int maxRequests) {if (maxRequests < 1) {throw new IllegalArgumentException("maxRequests < 1: " + maxRequests);}synchronized (this) {this.maxRequests = maxRequests;}promoteCalls();
}public void setMaxRequestsPerHost(int maxRequestsPerHost) {if (maxRequestsPerHost < 1) {throw new IllegalArgumentException("maxRequestsPerHost < 1: " + maxRequestsPerHost);}synchronized (this) {this.maxRequestsPerHost = maxRequestsPerHost;}promoteCalls();
}
在調整這些參數后,會調用 promoteCalls
方法,嘗試從 readyAsyncCalls
隊列中取出滿足條件的請求執行,以確保并發控制的動態調整能夠及時生效。
七、任務排隊和優先級處理
7.1 任務排隊機制
當新的請求不滿足并發條件時,會被添加到 readyAsyncCalls
隊列中等待執行。readyAsyncCalls
是一個 ArrayDeque
,它是一個雙端隊列,具有先進先出(FIFO)的特性。這意味著先進入隊列的請求會先被處理,保證了請求的公平性。
7.2 任務優先級處理
OkHttp 本身并沒有直接提供任務優先級處理的機制,所有請求默認按照 FIFO 的順序執行。不過,開發者可以通過自定義攔截器或者對請求進行包裝,來實現簡單的優先級處理。例如,可以為每個請求添加一個優先級標記,在 Dispatcher
類的 promoteCalls
方法中,根據優先級從 readyAsyncCalls
隊列中選擇請求執行。
以下是一個簡單的示例,展示如何實現基于優先級的任務調度:
java
// 自定義一個帶有優先級的 AsyncCall 類
class PriorityAsyncCall extends AsyncCall {private final int priority;public PriorityAsyncCall(Callback responseCallback, int priority) {super(responseCallback);this.priority = priority;}public int getPriority() {return priority;}
}// 修改 Dispatcher 類的 promoteCalls 方法
private void promoteCalls() {if (runningAsyncCalls.size() >= maxRequests) return; if (readyAsyncCalls.isEmpty()) return; // 對準備執行的隊列按照優先級排序List<PriorityAsyncCall> sortedCalls = new ArrayList<>();for (AsyncCall call : readyAsyncCalls) {if (call instanceof PriorityAsyncCall) {sortedCalls.add((PriorityAsyncCall) call);}}sortedCalls.sort((c1, c2) -> Integer.compare(c2.getPriority(), c1.getPriority()));for (PriorityAsyncCall call : sortedCalls) {if (runningCallsForHost(call) < maxRequestsPerHost) {readyAsyncCalls.remove(call);runningAsyncCalls.add(call);executorService().execute(call);}if (runningAsyncCalls.size() >= maxRequests) return; }
}
在這個示例中,我們自定義了一個 PriorityAsyncCall
類,為每個請求添加了一個優先級標記。然后修改了 Dispatcher
類的 promoteCalls
方法,在從 readyAsyncCalls
隊列中選擇請求時,按照優先級進行排序,優先執行優先級高的請求。
八、任務調度與攔截器鏈的協同工作
8.1 攔截器鏈的作用
OkHttp 的攔截器鏈是一個強大的機制,它允許開發者在請求發送和響應返回的過程中插入自定義的邏輯。攔截器鏈由多個攔截器組成,每個攔截器都可以對請求進行修改、記錄日志、處理緩存等操作。
8.2 任務調度與攔截器鏈的結合點
在 RealCall
類的 execute
和 enqueue
方法中,都會調用 getResponseWithInterceptorChain
方法來獲取響應。這個方法會創建一個攔截器鏈,并依次調用每個攔截器的 intercept
方法:
java
Response getResponseWithInterceptorChain() throws IOException {// 創建攔截器列表List<Interceptor> interceptors = new ArrayList<>();// 添加用戶自定義的攔截器interceptors.addAll(client.interceptors());// 添加重試和重定向攔截器interceptors.add(retryAndFollowUpInterceptor);// 添加橋接攔截器,將用戶請求轉換為符合 HTTP 協議的請求interceptors.add(new BridgeInterceptor(client.cookieJar()));// 添加緩存攔截器,處理緩存邏輯interceptors.add(new CacheInterceptor(client.internalCache()));// 添加連接攔截器,建立與服務器的連接interceptors.add(new ConnectInterceptor(client));if (!forWebSocket) {// 如果不是 WebSocket 請求,添加用戶自定義的網絡攔截器interceptors.addAll(client.networkInterceptors());}// 添加調用服務器攔截器,負責與服務器進行實際的交互interceptors.add(new CallServerInterceptor(forWebSocket));// 創建攔截器鏈RealInterceptorChain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());// 調用攔截器鏈的 proceed 方法開始處理請求return chain.proceed(originalRequest);
}
在任務調度的過程中,攔截器鏈會在請求執行的各個階段發揮作用。例如,緩存攔截器可以在請求發送之前檢查緩存,如果緩存命中,則直接返回緩存的響應,避免了不必要的網絡請求;重試和重定向攔截器可以在請求失敗或者需要重定向時,自動進行重試或者重定向操作。
8.3 攔截器對任務調度的影響
攔截器的執行時間和邏輯會影響任務調度的性能。如果某個攔截器的執行時間過長,會導致整個請求的處理時間增加,從而影響后續請求的執行。因此,在編寫攔截器時,需要注意優化攔截器的邏輯,避免在攔截器中進行耗時的操作。
九、異常處理和重試機制
9.1 異常處理
在 RealCall
類的 execute
和 enqueue
方法中,都會對請求過程中可能出現的異常進行捕獲和處理。例如,在 execute
方法中:
java
@Override
public Response execute() throws IOException {// 同步操作,確保該請求只被執行一次synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}// 標記請求開始captureCallStackTrace();// 記錄請求開始時間eventListener.callStart(this);try {// 調用 Dispatcher 的 executed 方法,將該請求添加到正在執行的同步請求隊列中client.dispatcher().executed(this);// 調用 getResponseWithInterceptorChain 方法獲取響應Response result = getResponseWithInterceptorChain();if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {// 記錄請求失敗事件eventListener.callFailed(this, e);throw e;} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}
}
如果在請求過程中出現 IOException
,會記錄請求失敗事件,并將異常拋出。在 AsyncCall
類的 execute
方法中,也會對異常進行類似的處理:
java
@Override
protected void execute() {boolean signalledCallback = false;try {// 調用 getResponseWithInterceptorChain 方法獲取響應Response response = getResponseWithInterceptorChain();// 檢查請求是否被取消if (retryAndFollowUpInterceptor.isCanceled()) {signalledCallback = true;// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {signalledCallback = true;// 調用回調函數的 onResponse 方法responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {// 記錄請求失敗事件eventListener.callFailed(RealCall.this, e);// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, e);}} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}
}
9.2 重試機制
OkHttp 的重試機制主要由 RetryAndFollowUpInterceptor
類實現。該攔截器會在請求失敗或者需要重定向時,自動進行重試或者重定向操作。在 RetryAndFollowUpInterceptor
類的 intercept
方法中,會對響應的狀態碼進行檢查,如果需要重試或者重定向,會重新發起請求:
java
@Override public Response intercept(Chain chain) throws IOException {Request request = chain.request();StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),createAddress(request.url()), call, eventListener, callStackTrace);this.streamAllocation = streamAllocation;int followUpCount = 0;Response priorResponse = null;while (true) {if (canceled) {streamAllocation.release();throw new IOException("Canceled");}Response response;boolean releaseConnection = true;try {response = realChain.proceed(request, streamAllocation, null, null);releaseConnection = false;} catch (RouteException e) {// 處理路由異常,嘗試重試if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {throw e.getLastConnectException();}releaseConnection = false;continue;} catch (IOException e) {// 處理其他 I/O 異常,嘗試重試boolean requestSendStarted = !(e instanceof ConnectionShutdownException);if (!recover(e, streamAllocation, requestSendStarted, request)) {throw e;}releaseConnection = false;continue;} finally {if (releaseConnection) {streamAllocation.streamFailed(null);streamAllocation.release();}}// 處理重定向Request followUp = followUpRequest(response, streamAllocation.route());if (followUp == null) {if (!forWebSocket) {streamAllocation.release();}return response;}closeQuietly(response.body());if (++followUpCount > MAX_FOLLOW_UPS) {streamAllocation.release();throw new ProtocolException("Too many follow-up requests: " + followUpCount);}if (followUp.body() instanceof UnrepeatableRequestBody) {streamAllocation.release();throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());}if (!sameConnection(response, followUp.url())) {streamAllocation.release();streamAllocation = new StreamAllocation(client.connectionPool(),createAddress(followUp.url()), call, eventListener, callStackTrace);this.streamAllocation = streamAllocation;} else if (streamAllocation.codec() != null) {throw new IllegalStateException("Closing the body of " + response+ " didn't close its backing stream. Bad interceptor?");}request = followUp;priorResponse = response;}
}
在這個方法中,會捕獲 RouteException
和其他 IOException
,并調用 recover
方法嘗試重試。同時,會檢查響應的狀態碼,如果需要重定向,會生成一個新的請求并重新發起。
十、任務調度模塊的性能優化和注意事項
10.1 性能優化建議
- 合理配置線程池:線程池的配置對任務調度的性能有很大影響。可以根據應用的實際情況,調整線程池的核心線程數、最大線程數和空閑線程存活時間,避免過多的線程創建和銷毀帶來的性能開銷。
- 優化攔截器邏輯:攔截器的執行時間會影響請求的處理速度。在編寫攔截器時,需要注意優化攔截器的邏輯,避免在攔截器中進行耗時的操作。
- 控制并發請求數量:合理設置
maxRequests
和maxRequestsPerHost
參數,避免過多的并發請求對服務器和系統資源造成過大的壓力。
10.2 注意事項
- 線程安全問題:
Dispatcher
類中的部分方法使用了synchronized
關鍵字進行同步,以確保線程安全。在使用和擴展任務調度模塊時,需要注意線程安全問題,避免出現數據不一致的情況。 - 異常處理:在請求過程中可能會出現各種異常,需要在代碼中進行充分的異常處理,避免應用崩潰。同時,要注意對異常信息的記錄和分析,以便及時發現和解決問題。
- 資源管理:在請求完成后,需要及時釋放相關的資源,如網絡連接、緩存等,避免資源泄漏。
十一、總結
OkHttp 的任務調度模塊通過 Dispatcher
類實現了高效的任務調度和并發控制。它利用線程池來執行異步請求,通過隊列來管理請求的排隊和執行順序,同時結合攔截器鏈實現了請求的預處理、緩存處理和重試重定向等功能。在使用 OkHttp 的任務調度模塊時,開發者需要深入理解其原理和機制,合理配置參數,優化代碼邏輯,以確保應用的性能和穩定性。隨著網絡技術的不斷發展,OkHttp 的任務調度模塊也可能會不斷優化和改進,為開發者提供更強大、更靈活的功能。