OkHttp 之任務調度模塊源碼分析

一、引言

在現代網絡應用開發中,高效的任務調度機制對于提升系統性能和用戶體驗至關重要。OkHttp 作為一款廣泛使用的高性能 HTTP 客戶端庫,其任務調度模塊在處理網絡請求的并發、排隊和執行等方面發揮著關鍵作用。本文將深入 OkHttp 源碼,詳細剖析其任務調度模塊的實現原理和工作流程。

二、任務調度模塊概述

2.1 核心功能

OkHttp 的任務調度模塊主要負責管理和調度 HTTP 請求任務,其核心功能包括:

  • 并發控制:限制同時執行的請求數量,避免過多請求耗盡系統資源。
  • 任務排隊:當并發請求數量達到上限時,將新請求放入隊列等待執行。
  • 異步執行:支持異步請求,通過線程池管理請求的執行,避免阻塞主線程。

2.2 主要組件

任務調度模塊的主要組件包括:

  • Dispatcher:調度器,負責任務的分發、排隊和執行管理。
  • RealCall:表示一個實際的 HTTP 請求調用。
  • AsyncCall:RealCall 的異步執行包裝類。
  • 線程池:用于執行異步請求任務。

2.3 整體架構

OkHttp 任務調度模塊的整體架構可以分為以下幾個層次:

  1. 客戶端層:應用代碼通過 OkHttpClient 發起 HTTP 請求。

  2. 調度層Dispatcher 類負責接收請求任務,對任務進行調度和管理。

  3. 執行層RealCallAsyncCall 類負責具體的請求執行。

三、Dispatcher 類源碼分析

3.1 類定義及屬性

java

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;// Dispatcher 類,負責 HTTP 請求任務的調度和管理
public final class Dispatcher {// 最大并發請求數,默認值為 64private int maxRequests = 64;// 每個主機的最大并發請求數,默認值為 5private int maxRequestsPerHost = 5;// 空閑線程的存活時間,默認值為 5 分鐘private long keepAliveDurationNs = TimeUnit.MINUTES.toNanos(5);// 線程池,用于執行異步請求任務private ExecutorService executorService;// 準備執行的異步請求隊列private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();// 正在執行的異步請求隊列private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();// 正在執行的同步請求隊列private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();// 空閑回調函數,當沒有正在執行的請求時會被調用private Runnable idleCallback;
}

Dispatcher 類中,定義了一些重要的屬性:

  • maxRequestsmaxRequestsPerHost 用于控制并發請求數量,防止資源過度占用。
  • keepAliveDurationNs 用于設置空閑線程的存活時間,避免頻繁創建和銷毀線程。
  • executorService 是線程池,用于執行異步請求任務。
  • readyAsyncCallsrunningAsyncCallsrunningSyncCalls 分別用于存儲準備執行的異步請求、正在執行的異步請求和正在執行的同步請求。
  • idleCallback 是一個回調函數,當沒有正在執行的請求時會被調用。

3.2 構造函數

java

/*** 默認構造函數,使用默認參數初始化 Dispatcher*/
public Dispatcher() {this(64, 5, 5, TimeUnit.MINUTES);
}/*** 自定義參數的構造函數,允許用戶指定最大并發請求數、每個主機的最大并發請求數、* 空閑線程的存活時間和時間單位* * @param maxRequests 最大并發請求數* @param maxRequestsPerHost 每個主機的最大并發請求數* @param keepAliveDuration 空閑線程的存活時間* @param timeUnit 時間單位*/
public Dispatcher(int maxRequests, int maxRequestsPerHost, long keepAliveDuration, TimeUnit timeUnit) {this.maxRequests = maxRequests;this.maxRequestsPerHost = maxRequestsPerHost;this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
}

構造函數提供了兩種初始化方式,一種是使用默認參數,另一種是允許用戶自定義參數。

3.3 線程池的獲取與創建

java

/*** 獲取線程池,如果線程池未初始化,則進行初始化* * @return 線程池實例*/
public synchronized ExecutorService executorService() {// 檢查線程池是否已經初始化if (executorService == null) {// 創建一個線程池,核心線程數為 0,最大線程數為 Integer.MAX_VALUE// 使用 SynchronousQueue 作為任務隊列,空閑線程存活時間為 60 秒executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}

executorService() 方法用于獲取線程池。如果線程池未初始化,會創建一個新的線程池。這里使用 SynchronousQueue 作為任務隊列,意味著線程池會立即為新任務創建線程,直到達到最大線程數。

3.4 異步請求的調度

3.4.1 enqueue 方法

java

/*** 異步執行請求的方法* * @param call 異步請求對象*/
synchronized void enqueue(AsyncCall call) {// 檢查當前正在執行的異步請求數量和每個主機的并發請求數量是否滿足條件if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {// 如果滿足條件,將請求添加到正在執行的異步請求隊列中runningAsyncCalls.add(call);// 使用線程池執行請求executorService().execute(call);} else {// 如果不滿足條件,將請求添加到準備執行的異步請求隊列中readyAsyncCalls.add(call);}
}/*** 計算指定異步請求對應的主機的正在執行的請求數量* * @param call 異步請求* @return 該主機的正在執行的請求數量*/
private int runningCallsForHost(AsyncCall call) {int result = 0;// 遍歷正在執行的異步請求隊列for (AsyncCall c : runningAsyncCalls) {if (c.host().equals(call.host())) {// 如果主機相同,則計數器加 1result++;}}return result;
}

enqueue 方法用于異步執行請求。它會檢查當前正在執行的異步請求數量和每個主機的并發請求數量是否滿足條件。如果滿足條件,將請求添加到 runningAsyncCalls 隊列中,并使用線程池執行該請求;如果不滿足條件,將請求添加到 readyAsyncCalls 隊列中等待執行。runningCallsForHost 方法用于計算指定異步請求對應的主機的正在執行的請求數量。

3.4.2 finished 方法

java

/*** 異步請求執行完成的回調方法* * @param call 完成的異步請求對象*/
void finished(AsyncCall call) {// 調用內部的 finished 方法處理請求完成邏輯finished(runningAsyncCalls, call, true);
}/*** 處理請求完成的通用方法* * @param calls 正在執行的請求隊列* @param call 完成的請求* @param promoteCalls 是否嘗試從準備隊列中取出請求執行*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {int runningCallsCount;Runnable idleCallback;// 同步操作,確保線程安全synchronized (this) {// 從正在執行的請求隊列中移除完成的請求if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");// 計算當前正在執行的請求數量runningCallsCount = runningCallsCount();idleCallback = this.idleCallback;}// 如果需要嘗試從準備隊列中取出請求執行if (promoteCalls) {// 調用 promoteCalls 方法嘗試從準備隊列中取出請求執行promoteCalls();}// 如果沒有正在執行的請求,且設置了空閑回調,則執行回調if (runningCallsCount == 0 && idleCallback != null) {idleCallback.run();}
}/*** 嘗試從準備隊列中取出請求執行*/
private void promoteCalls() {// 如果正在執行的異步請求數量已經達到最大并發請求數,直接返回if (runningAsyncCalls.size() >= maxRequests) return; // 如果準備執行的異步請求隊列為空,直接返回if (readyAsyncCalls.isEmpty()) return; // 遍歷準備執行的異步請求隊列for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {AsyncCall call = i.next();// 檢查該請求對應的主機的并發請求數量是否滿足條件if (runningCallsForHost(call) < maxRequestsPerHost) {// 從準備隊列中移除該請求i.remove();// 將請求添加到正在執行的異步請求隊列中runningAsyncCalls.add(call);// 使用線程池執行該請求executorService().execute(call);}// 如果正在執行的異步請求數量已經達到最大并發請求數,停止遍歷if (runningAsyncCalls.size() >= maxRequests) return; }
}/*** 計算當前正在執行的請求數量* * @return 正在執行的請求數量*/
private int runningCallsCount() {return runningAsyncCalls.size() + runningSyncCalls.size();
}

finished 方法用于處理異步請求執行完成的情況。它會從 runningAsyncCalls 隊列中移除完成的請求,并調用 promoteCalls 方法嘗試從 readyAsyncCalls 隊列中取出請求執行。promoteCalls 方法會遍歷 readyAsyncCalls 隊列,檢查每個請求對應的主機的并發請求數量是否滿足條件,如果滿足條件,則將請求從 readyAsyncCalls 隊列中移除,添加到 runningAsyncCalls 隊列中,并使用線程池執行該請求。

3.5 同步請求的調度

3.5.1 executed 方法

java

/*** 同步執行請求的方法* * @param call 同步請求對象*/
synchronized void executed(RealCall call) {// 將請求添加到正在執行的同步請求隊列中runningSyncCalls.add(call);
}/*** 同步請求執行完成的回調方法* * @param call 完成的同步請求對象*/
void finished(RealCall call) {// 調用內部的 finished 方法處理請求完成邏輯finished(runningSyncCalls, call, false);
}

executed 方法用于同步執行請求,它會將請求添加到 runningSyncCalls 隊列中。finished 方法用于處理同步請求執行完成的情況,它會從 runningSyncCalls 隊列中移除完成的請求。

3.6 其他方法

java

/*** 設置空閑回調函數* * @param idleCallback 空閑回調函數*/
public void setIdleCallback(Runnable idleCallback) {this.idleCallback = idleCallback;
}/*** 獲取最大并發請求數* * @return 最大并發請求數*/
public int getMaxRequests() {return maxRequests;
}/*** 設置最大并發請求數* * @param maxRequests 最大并發請求數*/
public void setMaxRequests(int maxRequests) {if (maxRequests < 1) {throw new IllegalArgumentException("maxRequests < 1: " + maxRequests);}synchronized (this) {this.maxRequests = maxRequests;}promoteCalls();
}/*** 獲取每個主機的最大并發請求數* * @return 每個主機的最大并發請求數*/
public int getMaxRequestsPerHost() {return maxRequestsPerHost;
}/*** 設置每個主機的最大并發請求數* * @param maxRequestsPerHost 每個主機的最大并發請求數*/
public void setMaxRequestsPerHost(int maxRequestsPerHost) {if (maxRequestsPerHost < 1) {throw new IllegalArgumentException("maxRequestsPerHost < 1: " + maxRequestsPerHost);}synchronized (this) {this.maxRequestsPerHost = maxRequestsPerHost;}promoteCalls();
}/*** 獲取準備執行的異步請求隊列* * @return 準備執行的異步請求隊列*/
public synchronized List<Call> readyCalls() {List<Call> result = new ArrayList<>();for (AsyncCall asyncCall : readyAsyncCalls) {result.add(asyncCall.get());}return result;
}/*** 獲取正在執行的異步請求隊列* * @return 正在執行的異步請求隊列*/
public synchronized List<Call> runningCalls() {List<Call> result = new ArrayList<>();result.addAll(runningSyncCalls);for (AsyncCall asyncCall : runningAsyncCalls) {result.add(asyncCall.get());}return result;
}/*** 獲取準備執行的異步請求數量* * @return 準備執行的異步請求數量*/
public synchronized int readyCallsCount() {return readyAsyncCalls.size();
}/*** 獲取正在執行的請求數量* * @return 正在執行的請求數量*/
public synchronized int runningCallsCount() {return runningAsyncCalls.size() + runningSyncCalls.size();
}

這些方法提供了對 Dispatcher 類的一些屬性的訪問和修改功能,例如設置最大并發請求數、每個主機的最大并發請求數,獲取準備執行的請求隊列、正在執行的請求隊列等。

四、RealCall 類源碼分析

4.1 類定義及屬性

java

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.cache.CacheInterceptor;
import okhttp3.internal.connection.ConnectInterceptor;
import okhttp3.internal.connection.StreamAllocation;
import okhttp3.internal.http.BridgeInterceptor;
import okhttp3.internal.http.CallServerInterceptor;
import okhttp3.internal.http.RealInterceptorChain;
import okhttp3.internal.http.RetryAndFollowUpInterceptor;
import okhttp3.internal.platform.Platform;import java.io.IOException;
import java.util.List;// RealCall 類,表示一個實際的 HTTP 請求調用
final class RealCall implements Call {// OkHttpClient 實例,用于獲取配置信息final OkHttpClient client;// 重試和重定向攔截器final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;// 原始請求對象final Request originalRequest;// 標記該請求是否已經被執行boolean executed;// 用于 WebSocket 請求的標記final boolean forWebSocket;// 事件監聽器,用于監聽請求的各個階段事件private EventListener eventListener;
}

RealCall 類表示一個實際的 HTTP 請求調用。它包含了 OkHttpClient 實例、原始請求對象、重試和重定向攔截器等重要屬性。executed 標記用于確保一個請求只能被執行一次。forWebSocket 標記用于區分是否是 WebSocket 請求。eventListener 用于監聽請求的各個階段事件。

4.2 構造函數

java

/*** 構造函數,初始化 RealCall 實例* * @param client OkHttpClient 實例* @param originalRequest 原始請求對象* @param forWebSocket 是否為 WebSocket 請求*/
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {this.client = client;this.originalRequest = originalRequest;this.forWebSocket = forWebSocket;// 創建重試和重定向攔截器this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

構造函數用于初始化 RealCall 實例,同時創建重試和重定向攔截器。

4.3 同步請求執行方法

java

@Override
public Response execute() throws IOException {// 同步操作,確保該請求只被執行一次synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}// 標記請求開始captureCallStackTrace();// 記錄請求開始時間eventListener.callStart(this);try {// 調用 Dispatcher 的 executed 方法,將該請求添加到正在執行的同步請求隊列中client.dispatcher().executed(this);// 調用 getResponseWithInterceptorChain 方法獲取響應Response result = getResponseWithInterceptorChain();if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {// 記錄請求失敗事件eventListener.callFailed(this, e);throw e;} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}
}/*** 捕獲請求調用的堆棧信息*/
private void captureCallStackTrace() {Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}/*** 通過攔截器鏈獲取響應* * @return 響應對象* @throws IOException 如果請求過程中出現 I/O 異常*/
Response getResponseWithInterceptorChain() throws IOException {// 創建攔截器列表List<Interceptor> interceptors = new ArrayList<>();// 添加用戶自定義的攔截器interceptors.addAll(client.interceptors());// 添加重試和重定向攔截器interceptors.add(retryAndFollowUpInterceptor);// 添加橋接攔截器,將用戶請求轉換為符合 HTTP 協議的請求interceptors.add(new BridgeInterceptor(client.cookieJar()));// 添加緩存攔截器,處理緩存邏輯interceptors.add(new CacheInterceptor(client.internalCache()));// 添加連接攔截器,建立與服務器的連接interceptors.add(new ConnectInterceptor(client));if (!forWebSocket) {// 如果不是 WebSocket 請求,添加用戶自定義的網絡攔截器interceptors.addAll(client.networkInterceptors());}// 添加調用服務器攔截器,負責與服務器進行實際的交互interceptors.add(new CallServerInterceptor(forWebSocket));// 創建攔截器鏈RealInterceptorChain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());// 調用攔截器鏈的 proceed 方法開始處理請求return chain.proceed(originalRequest);
}

execute 方法用于同步執行請求。首先,它會檢查該請求是否已經被執行,如果已經執行則拋出異常。然后,調用 captureCallStackTrace 方法捕獲請求調用的堆棧信息,記錄請求開始時間。接著,將該請求添加到正在執行的同步請求隊列中,并調用 getResponseWithInterceptorChain 方法通過攔截器鏈獲取響應。如果請求過程中出現異常,會記錄請求失敗事件。最后,調用 finished 方法處理請求完成邏輯。

getResponseWithInterceptorChain 方法會創建一個攔截器列表,依次添加用戶自定義的攔截器、重試和重定向攔截器、橋接攔截器、緩存攔截器、連接攔截器、網絡攔截器和調用服務器攔截器。然后創建一個 RealInterceptorChain 實例,并調用其 proceed 方法開始處理請求。

4.4 異步請求執行方法

java

@Override
public void enqueue(Callback responseCallback) {// 同步操作,確保該請求只被執行一次synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}// 標記請求開始captureCallStackTrace();// 記錄請求開始時間eventListener.callStart(this);// 創建 AsyncCall 實例,用于異步執行請求AsyncCall asyncCall = new AsyncCall(responseCallback);// 調用 Dispatcher 的 enqueue 方法,將異步請求添加到調度隊列中client.dispatcher().enqueue(asyncCall);
}// AsyncCall 類,繼承自 NamedRunnable,用于異步執行請求
final class AsyncCall extends NamedRunnable {private final Callback responseCallback;/*** 構造函數,初始化 AsyncCall 實例* * @param responseCallback 響應回調函數*/AsyncCall(Callback responseCallback) {super("OkHttp %s", redactedUrl());this.responseCallback = responseCallback;}/*** 獲取請求的主機地址* * @return 請求的主機地址*/String host() {return originalRequest.url().host();}/*** 獲取原始請求對象* * @return 原始請求對象*/Request request() {return originalRequest;}/*** 獲取 RealCall 實例* * @return RealCall 實例*/RealCall get() {return RealCall.this;}@Overrideprotected void execute() {boolean signalledCallback = false;try {// 調用 getResponseWithInterceptorChain 方法獲取響應Response response = getResponseWithInterceptorChain();// 檢查請求是否被取消if (retryAndFollowUpInterceptor.isCanceled()) {signalledCallback = true;// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {signalledCallback = true;// 調用回調函數的 onResponse 方法responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {// 記錄請求失敗事件eventListener.callFailed(RealCall.this, e);// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, e);}} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}}
}

enqueue 方法用于異步執行請求。首先,它會檢查該請求是否已經被執行,如果已經執行則拋出異常。然后,調用 captureCallStackTrace 方法捕獲請求調用的堆棧信息,記錄請求開始時間。接著,創建一個 AsyncCall 實例,并將其添加到調度隊列中。

AsyncCall 類繼承自 NamedRunnable,用于異步執行請求。在 execute 方法中,它會調用 getResponseWithInterceptorChain 方法獲取響應,并根據請求是否被取消調用回調函數的 onFailureonResponse 方法。如果請求過程中出現異常,會記錄請求失敗事件。最后,調用 finished 方法處理請求完成邏輯。

五、線程池的使用和管理

5.1 線程池的創建和配置

java

// 在 Dispatcher 類中
public synchronized ExecutorService executorService() {if (executorService == null) {executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));}return executorService;
}

這里使用了 ThreadPoolExecutor 來創建線程池,具體配置如下:

  • 核心線程數:0,表示線程池在沒有任務時不會保留任何線程。
  • 最大線程數Integer.MAX_VALUE,表示線程池可以創建的最大線程數沒有限制。
  • 空閑線程存活時間:60 秒,當線程空閑超過 60 秒時,會被銷毀。
  • 任務隊列SynchronousQueue,這是一個沒有容量的阻塞隊列,每個插入操作必須等待另一個線程的移除操作,反之亦然。這意味著線程池會立即為新任務創建線程,直到達到最大線程數。
  • 線程工廠Util.threadFactory("OkHttp Dispatcher", false),用于創建線程,線程名稱為 OkHttp Dispatcher

5.2 線程池的任務執行和管理

Dispatcher 類的 enqueue 方法中,當滿足條件時,會將異步請求添加到正在

接著繼續分析

六、任務調度的并發控制原理

6.1 并發控制的整體邏輯

OkHttp 的任務調度模塊通過 Dispatcher 類實現了精細的并發控制,主要體現在對最大并發請求數(maxRequests)和每個主機的最大并發請求數(maxRequestsPerHost)的限制上。這兩個參數的設置可以避免過多的請求同時發起,從而防止系統資源被過度占用,確保應用的穩定性和性能。

6.2 最大并發請求數的控制

Dispatcher 類的 enqueue 方法中,會檢查當前正在執行的異步請求數量是否超過 maxRequests

java

synchronized void enqueue(AsyncCall call) {// 檢查當前正在執行的異步請求數量和每個主機的并發請求數量是否滿足條件if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {// 如果滿足條件,將請求添加到正在執行的異步請求隊列中runningAsyncCalls.add(call);// 使用線程池執行請求executorService().execute(call);} else {// 如果不滿足條件,將請求添加到準備執行的異步請求隊列中readyAsyncCalls.add(call);}
}

runningAsyncCalls 的大小小于 maxRequests 時,說明當前并發請求數量未達到上限,新的請求可以被立即執行;否則,新請求會被放入 readyAsyncCalls 隊列中等待。

6.3 每個主機的最大并發請求數的控制

runningCallsForHost 方法用于計算指定異步請求對應的主機的正在執行的請求數量:

java

private int runningCallsForHost(AsyncCall call) {int result = 0;// 遍歷正在執行的異步請求隊列for (AsyncCall c : runningAsyncCalls) {if (c.host().equals(call.host())) {// 如果主機相同,則計數器加 1result++;}}return result;
}

enqueue 方法中,除了檢查最大并發請求數,還會調用 runningCallsForHost 方法檢查當前主機的并發請求數量是否超過 maxRequestsPerHost。如果超過,則新請求同樣會被放入 readyAsyncCalls 隊列中等待。

6.4 并發控制的動態調整

Dispatcher 類提供了 setMaxRequestssetMaxRequestsPerHost 方法,允許開發者動態調整最大并發請求數和每個主機的最大并發請求數:

java

public void setMaxRequests(int maxRequests) {if (maxRequests < 1) {throw new IllegalArgumentException("maxRequests < 1: " + maxRequests);}synchronized (this) {this.maxRequests = maxRequests;}promoteCalls();
}public void setMaxRequestsPerHost(int maxRequestsPerHost) {if (maxRequestsPerHost < 1) {throw new IllegalArgumentException("maxRequestsPerHost < 1: " + maxRequestsPerHost);}synchronized (this) {this.maxRequestsPerHost = maxRequestsPerHost;}promoteCalls();
}

在調整這些參數后,會調用 promoteCalls 方法,嘗試從 readyAsyncCalls 隊列中取出滿足條件的請求執行,以確保并發控制的動態調整能夠及時生效。

七、任務排隊和優先級處理

7.1 任務排隊機制

當新的請求不滿足并發條件時,會被添加到 readyAsyncCalls 隊列中等待執行。readyAsyncCalls 是一個 ArrayDeque,它是一個雙端隊列,具有先進先出(FIFO)的特性。這意味著先進入隊列的請求會先被處理,保證了請求的公平性。

7.2 任務優先級處理

OkHttp 本身并沒有直接提供任務優先級處理的機制,所有請求默認按照 FIFO 的順序執行。不過,開發者可以通過自定義攔截器或者對請求進行包裝,來實現簡單的優先級處理。例如,可以為每個請求添加一個優先級標記,在 Dispatcher 類的 promoteCalls 方法中,根據優先級從 readyAsyncCalls 隊列中選擇請求執行。

以下是一個簡單的示例,展示如何實現基于優先級的任務調度:

java

// 自定義一個帶有優先級的 AsyncCall 類
class PriorityAsyncCall extends AsyncCall {private final int priority;public PriorityAsyncCall(Callback responseCallback, int priority) {super(responseCallback);this.priority = priority;}public int getPriority() {return priority;}
}// 修改 Dispatcher 類的 promoteCalls 方法
private void promoteCalls() {if (runningAsyncCalls.size() >= maxRequests) return; if (readyAsyncCalls.isEmpty()) return; // 對準備執行的隊列按照優先級排序List<PriorityAsyncCall> sortedCalls = new ArrayList<>();for (AsyncCall call : readyAsyncCalls) {if (call instanceof PriorityAsyncCall) {sortedCalls.add((PriorityAsyncCall) call);}}sortedCalls.sort((c1, c2) -> Integer.compare(c2.getPriority(), c1.getPriority()));for (PriorityAsyncCall call : sortedCalls) {if (runningCallsForHost(call) < maxRequestsPerHost) {readyAsyncCalls.remove(call);runningAsyncCalls.add(call);executorService().execute(call);}if (runningAsyncCalls.size() >= maxRequests) return; }
}

在這個示例中,我們自定義了一個 PriorityAsyncCall 類,為每個請求添加了一個優先級標記。然后修改了 Dispatcher 類的 promoteCalls 方法,在從 readyAsyncCalls 隊列中選擇請求時,按照優先級進行排序,優先執行優先級高的請求。

八、任務調度與攔截器鏈的協同工作

8.1 攔截器鏈的作用

OkHttp 的攔截器鏈是一個強大的機制,它允許開發者在請求發送和響應返回的過程中插入自定義的邏輯。攔截器鏈由多個攔截器組成,每個攔截器都可以對請求進行修改、記錄日志、處理緩存等操作。

8.2 任務調度與攔截器鏈的結合點

RealCall 類的 executeenqueue 方法中,都會調用 getResponseWithInterceptorChain 方法來獲取響應。這個方法會創建一個攔截器鏈,并依次調用每個攔截器的 intercept 方法:

java

Response getResponseWithInterceptorChain() throws IOException {// 創建攔截器列表List<Interceptor> interceptors = new ArrayList<>();// 添加用戶自定義的攔截器interceptors.addAll(client.interceptors());// 添加重試和重定向攔截器interceptors.add(retryAndFollowUpInterceptor);// 添加橋接攔截器,將用戶請求轉換為符合 HTTP 協議的請求interceptors.add(new BridgeInterceptor(client.cookieJar()));// 添加緩存攔截器,處理緩存邏輯interceptors.add(new CacheInterceptor(client.internalCache()));// 添加連接攔截器,建立與服務器的連接interceptors.add(new ConnectInterceptor(client));if (!forWebSocket) {// 如果不是 WebSocket 請求,添加用戶自定義的網絡攔截器interceptors.addAll(client.networkInterceptors());}// 添加調用服務器攔截器,負責與服務器進行實際的交互interceptors.add(new CallServerInterceptor(forWebSocket));// 創建攔截器鏈RealInterceptorChain chain = new RealInterceptorChain(interceptors, null, null, null, 0,originalRequest, this, eventListener, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());// 調用攔截器鏈的 proceed 方法開始處理請求return chain.proceed(originalRequest);
}

在任務調度的過程中,攔截器鏈會在請求執行的各個階段發揮作用。例如,緩存攔截器可以在請求發送之前檢查緩存,如果緩存命中,則直接返回緩存的響應,避免了不必要的網絡請求;重試和重定向攔截器可以在請求失敗或者需要重定向時,自動進行重試或者重定向操作。

8.3 攔截器對任務調度的影響

攔截器的執行時間和邏輯會影響任務調度的性能。如果某個攔截器的執行時間過長,會導致整個請求的處理時間增加,從而影響后續請求的執行。因此,在編寫攔截器時,需要注意優化攔截器的邏輯,避免在攔截器中進行耗時的操作。

九、異常處理和重試機制

9.1 異常處理

RealCall 類的 executeenqueue 方法中,都會對請求過程中可能出現的異常進行捕獲和處理。例如,在 execute 方法中:

java

@Override
public Response execute() throws IOException {// 同步操作,確保該請求只被執行一次synchronized (this) {if (executed) throw new IllegalStateException("Already Executed");executed = true;}// 標記請求開始captureCallStackTrace();// 記錄請求開始時間eventListener.callStart(this);try {// 調用 Dispatcher 的 executed 方法,將該請求添加到正在執行的同步請求隊列中client.dispatcher().executed(this);// 調用 getResponseWithInterceptorChain 方法獲取響應Response result = getResponseWithInterceptorChain();if (result == null) throw new IOException("Canceled");return result;} catch (IOException e) {// 記錄請求失敗事件eventListener.callFailed(this, e);throw e;} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}
}

如果在請求過程中出現 IOException,會記錄請求失敗事件,并將異常拋出。在 AsyncCall 類的 execute 方法中,也會對異常進行類似的處理:

java

@Override
protected void execute() {boolean signalledCallback = false;try {// 調用 getResponseWithInterceptorChain 方法獲取響應Response response = getResponseWithInterceptorChain();// 檢查請求是否被取消if (retryAndFollowUpInterceptor.isCanceled()) {signalledCallback = true;// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, new IOException("Canceled"));} else {signalledCallback = true;// 調用回調函數的 onResponse 方法responseCallback.onResponse(RealCall.this, response);}} catch (IOException e) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);} else {// 記錄請求失敗事件eventListener.callFailed(RealCall.this, e);// 調用回調函數的 onFailure 方法responseCallback.onFailure(RealCall.this, e);}} finally {// 調用 Dispatcher 的 finished 方法,處理請求完成邏輯client.dispatcher().finished(this);}
}

9.2 重試機制

OkHttp 的重試機制主要由 RetryAndFollowUpInterceptor 類實現。該攔截器會在請求失敗或者需要重定向時,自動進行重試或者重定向操作。在 RetryAndFollowUpInterceptor 類的 intercept 方法中,會對響應的狀態碼進行檢查,如果需要重試或者重定向,會重新發起請求:

java

@Override public Response intercept(Chain chain) throws IOException {Request request = chain.request();StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),createAddress(request.url()), call, eventListener, callStackTrace);this.streamAllocation = streamAllocation;int followUpCount = 0;Response priorResponse = null;while (true) {if (canceled) {streamAllocation.release();throw new IOException("Canceled");}Response response;boolean releaseConnection = true;try {response = realChain.proceed(request, streamAllocation, null, null);releaseConnection = false;} catch (RouteException e) {// 處理路由異常,嘗試重試if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {throw e.getLastConnectException();}releaseConnection = false;continue;} catch (IOException e) {// 處理其他 I/O 異常,嘗試重試boolean requestSendStarted = !(e instanceof ConnectionShutdownException);if (!recover(e, streamAllocation, requestSendStarted, request)) {throw e;}releaseConnection = false;continue;} finally {if (releaseConnection) {streamAllocation.streamFailed(null);streamAllocation.release();}}// 處理重定向Request followUp = followUpRequest(response, streamAllocation.route());if (followUp == null) {if (!forWebSocket) {streamAllocation.release();}return response;}closeQuietly(response.body());if (++followUpCount > MAX_FOLLOW_UPS) {streamAllocation.release();throw new ProtocolException("Too many follow-up requests: " + followUpCount);}if (followUp.body() instanceof UnrepeatableRequestBody) {streamAllocation.release();throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());}if (!sameConnection(response, followUp.url())) {streamAllocation.release();streamAllocation = new StreamAllocation(client.connectionPool(),createAddress(followUp.url()), call, eventListener, callStackTrace);this.streamAllocation = streamAllocation;} else if (streamAllocation.codec() != null) {throw new IllegalStateException("Closing the body of " + response+ " didn't close its backing stream. Bad interceptor?");}request = followUp;priorResponse = response;}
}

在這個方法中,會捕獲 RouteException 和其他 IOException,并調用 recover 方法嘗試重試。同時,會檢查響應的狀態碼,如果需要重定向,會生成一個新的請求并重新發起。

十、任務調度模塊的性能優化和注意事項

10.1 性能優化建議

  • 合理配置線程池:線程池的配置對任務調度的性能有很大影響。可以根據應用的實際情況,調整線程池的核心線程數、最大線程數和空閑線程存活時間,避免過多的線程創建和銷毀帶來的性能開銷。
  • 優化攔截器邏輯:攔截器的執行時間會影響請求的處理速度。在編寫攔截器時,需要注意優化攔截器的邏輯,避免在攔截器中進行耗時的操作。
  • 控制并發請求數量:合理設置 maxRequestsmaxRequestsPerHost 參數,避免過多的并發請求對服務器和系統資源造成過大的壓力。

10.2 注意事項

  • 線程安全問題Dispatcher 類中的部分方法使用了 synchronized 關鍵字進行同步,以確保線程安全。在使用和擴展任務調度模塊時,需要注意線程安全問題,避免出現數據不一致的情況。
  • 異常處理:在請求過程中可能會出現各種異常,需要在代碼中進行充分的異常處理,避免應用崩潰。同時,要注意對異常信息的記錄和分析,以便及時發現和解決問題。
  • 資源管理:在請求完成后,需要及時釋放相關的資源,如網絡連接、緩存等,避免資源泄漏。

十一、總結

OkHttp 的任務調度模塊通過 Dispatcher 類實現了高效的任務調度和并發控制。它利用線程池來執行異步請求,通過隊列來管理請求的排隊和執行順序,同時結合攔截器鏈實現了請求的預處理、緩存處理和重試重定向等功能。在使用 OkHttp 的任務調度模塊時,開發者需要深入理解其原理和機制,合理配置參數,優化代碼邏輯,以確保應用的性能和穩定性。隨著網絡技術的不斷發展,OkHttp 的任務調度模塊也可能會不斷優化和改進,為開發者提供更強大、更靈活的功能。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/72945.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/72945.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/72945.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

復現無人機的項目,項目名稱為Evidential Detection and Tracking Collaboration

項目名稱為Evidential Detection and Tracking Collaboration&#xff0c;主要用于強大的反無人機系統&#xff0c;涉及新問題、基準和算法研究。下面介紹項目的復現步驟&#xff1a; 安裝環境&#xff1a;使用Anaconda創建并激活名為edtc的虛擬環境&#xff0c;Python版本為3…

QwQ-32B 開源!本地部署+微調教程來了

今天&#xff0c;通義千問開源了推理模型QwQ-32B QwQ-32B 在一系列基準測試中進行了評估&#xff0c;測試了數學推理、編程能力和通用能力。以下結果展示了 QwQ-32B 與其他領先模型的性能對比&#xff0c;包括 DeepSeek-R1-Distilled-Qwen-32B、DeepSeek-R1-Distilled-Llama-7…

如何利用 Excel 表格實現精準文件批量重命名教程

在處理大量文件時&#xff0c;有時需要根據特定規則對文件名進行調整。如果您的文件名和新名稱之間存在一對多的關系&#xff0c;并且這種關系可以通過 Excel 表格來管理&#xff0c;那么使用“簡鹿文件批量重命名”軟件中的“匹配對應名稱命名”功能將是一個高效的選擇。接下來…

開關模式電源轉換器 EMI/EMC 的集成仿真

介紹 在電力電子領域&#xff0c;電磁干擾 &#xff08;EMI&#xff09; 和電磁兼容性 &#xff08;EMC&#xff09; 問題可以決定設計的成敗。開關模式電源轉換器雖然高效且緊湊&#xff0c;但卻是電磁噪聲的常見來源&#xff0c;可能會對附近的組件和系統造成嚴重破壞。隨著…

Android 藍牙工具類封裝:支持經典藍牙與 BLE,兼容高版本權限

為了優化經典藍牙&#xff08;Classic Bluetooth&#xff09;和低功耗藍牙&#xff08;Bluetooth Low Energy, BLE&#xff09;的操作&#xff0c;我們可以將功能封裝到一個工具類中&#xff0c;支持掃描、連接、通信&#xff0c;并兼容高版本 Android 的動態權限申請。以下是完…

STM32 CAN模塊原理與應用詳解

目錄 概述 一、CAN模塊核心原理 1. CAN協議基礎 2. STM32 CAN控制器結構 3. 波特率配置 二、CAN模塊配置步驟&#xff08;基于HAL庫&#xff09; 1. 初始化CAN外設 2. 配置過濾器 3. 啟動CAN通信 三、數據收發實現 1. 發送數據幀 2. 接收數據幀&#xff08;中斷方式…

PostgreSQL_安裝部署

一、Windows系統下安裝 1.下載安裝包 登錄PostgreSQL: Downloads官網&#xff1a; 選擇14.12版本&#xff0c;點擊下載&#xff1a; 2.安裝PostgrSQL14.12 雙擊exe安裝包程序&#xff0c;準備安裝&#xff1a; 選擇安裝路徑&#xff1a; 選擇想安裝的工具&#xff1a; 選擇數…

init arry的作用,可以沒有init arry嘛?(面試題)

https://bbs.kanxue.com/thread-282657.htm 對init_array段調用的方法進行Hook https://bbs.kanxue.com/thread-191092.htm init_array原理簡單說明 https://bbs.kanxue.com/thread-280135.htm frida hook init_array自吐新解 init_array 的作用&#xff0c;以及是否可以沒有 i…

藍橋杯真題0團建dfs+哈希表/鄰接表

dfs鄰接表儲存或者哈希表的運用&#xff0c;考察我們對數據的存儲 本題核心就是在求從根節點開始的兩棵樹相同的最長序列&#xff0c;首先確定用dfs進行深搜&#xff0c;對于節點的形式可以用鄰接表&#xff0c;鄰接矩陣&#xff0c;哈希表來進行存儲數據。下面看代碼 鄰接表 …

使用 AIStor、MLflow 和 KServe 將模型部署到 Kubernetes

在之前幾篇關于 MLOps 工具的文章中&#xff0c;我展示了有多少流行的 MLOps 工具跟蹤與模型訓練實驗相關的指標。我還展示了他們如何使用 MinIO 來存儲作為模型訓練管道一部分的非結構化數據。但是&#xff0c;一個好的 MLOps 工具應該做的不僅僅是管理您的實驗、數據集和模型…

kali linux web掃描工具

Kali Linux是一款專為網絡安全領域而打造的操作系統&#xff0c;提供了眾多優秀的安全工具&#xff0c;其中就包括了強大的web掃描工具。Web掃描是網絡安全檢測的一個重要環節&#xff0c;它可以幫助安全專家檢測網站的漏洞&#xff0c;提升網站的安全性。 Kali Linux中集成了…

Linux losetup循環設備

好的&#xff0c;以下是命令的中文解釋和使用步驟&#xff1a; 命令解釋&#xff1a; losetup -r /dev/loop0 /system/app.bin&#xff1a; losetup 是一個用于將文件與循環設備&#xff08;loop device&#xff09;關聯的命令。-r 選項表示將循環設備設置為只讀模式。/dev/lo…

【js逆向】

地址&#xff1a;aHR0cHM6Ly93d3cud2VpYm90b3AuY24vMi4wLw f12進入 debugger&#xff0c;過debugger 查看預覽數據 全局搜索 請求網址中的 api.weibotop.cn 在下方疑似找到了加密和解密的函數 斷點調試 控制臺輸出 那個n就是 常見的 cryptoJs庫 const cryptoJs require(cry…

1.Intel BIOS 開發指南詳細介紹

1. 引言 目的: Intel BIOS 開發指南旨在為開發者提供詳細的指導,幫助他們理解和實現 Intel 平臺上的 BIOS 功能。 適用對象: 適用于希望開發、調試和優化 BIOS 的硬件工程師、軟件工程師和系統集成商。 版本信息: 確保你使用的是最新版本的指南,以獲取最新的信息和最佳實…

deepseek在pycharm中的配置和簡單應用

對于最常用的調試python腳本開發環境pycharm&#xff0c;如何接入deepseek是我們窺探ai代碼編寫的第一步&#xff0c;熟悉起來總沒壞處。 1、官網安裝pycharm社區版&#xff08;免費&#xff09;&#xff0c;如果需要安裝專業版&#xff0c;需要另外找破解碼。 2、安裝Ollama…

【論文閱讀】多模態——LSeg

文獻基本信息 標題&#xff1a;Language-Driven Semantic Segmentation作者&#xff1a;Boyi Li、Kilian Q. Weinberger、Serge Belongie、Vladlen Koltun、Ren Ranftl單位&#xff1a;Cornell University、University of Copenhagen、Apple、Intel Labs會議/期刊&#xff1a;…

【MySQL基礎-1】MySQL 用戶管理指南:創建用戶、修改密碼與權限分配

MySQL 作為廣泛使用的關系型數據庫管理系統&#xff0c;用戶管理和權限分配是其核心功能之一。合理創建用戶、修改密碼以及分配權限&#xff0c;不僅能保障數據庫的安全性&#xff0c;還能有效控制用戶的操作范圍。本文將詳細介紹如何在 MySQL 中創建用戶、修改用戶密碼以及分配…

影刀RPA編碼版與流程版解析

影刀RPA編碼版是影刀RPA的一個高級版本&#xff0c;它結合了流程版的可視化操作和編碼版的強大靈活性&#xff0c;以下是對影刀RPA編碼版的詳細介紹&#xff1a; 1. 功能對比 流程版&#xff1a; 可視化操作&#xff1a;通過拖拽式流程設計器&#xff0c;用戶可以像搭積木一樣…

20天 - TCP 和 UDP 有什么區別?說說 TCP 的三次握手?TCP 是用來解決什么問題?

TCP 和 UDP 有什么區別&#xff1f; TCP&#xff08;傳輸控制協議&#xff09;和 UDP&#xff08;用戶數據報協議&#xff09;都是傳輸層的網絡協議&#xff0c;它們的主要區別如下&#xff1a; 連接方式 TCP&#xff1a;面向連接的協議&#xff0c;類似于打電話&#xff0c…

【MySQL_05】語法簡述(是語法,不詳細介紹各種語句)

文章目錄 一、基本規則二、標識符規則三、數據類型四、運算符五、關鍵字六、SQL 語句的通用語法結構 歷史文章點擊&#x1f449;&#xff1a;SQL &#x1f408;??github&#xff1a;https://github.com/mysql &#x1f4bb;官網&#xff1a; https://www.mysql.com &#…