private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit //高階函數回調) {val deferredList = mutableListOf<Deferred<MyType?>>()// 設定任務超時時間為12秒,并使用 async 并發執行請求withTimeoutOrNull(12_000L) {Log.d(TAG, "request size:${list.size}")for ((index, item) in list.withIndex()) {val deferred = async { //對每個item都發起一次異步請求, 這里是并發的請求//通過callbackChannel來傳遞結果,參數UNLIMITED為無限緩沖,具體的在下面擴展有講val callbackChannel = Channel<MyType?>(Channel.UNLIMITED)SDKInstance(item.id,object : SDKCallback() {override fun onSuccess(children: List<SDKType>,) {super.onSuccess(children)Log.d(TAG, "success name: ${item.name}")val item = MyType(item.name, item.id)item.list = childrencallbackChannel.trySend(item).isSuccess}override fun onError() {super.onError()callbackChannel.trySend(null).isFailureLog.d(TAG, "error name: ${item.name}")}})callbackChannel.receive()}deferredList.add(deferred)}}// 等待所有請求完成val resultData = mutableListOf<MyType>()var requestSituation: Int = REQUEST_TIME_DEFAULT //超時情況記錄for (deferred in deferredList) {try {val result = deferred.await() //這里的await就是等待異步任務完成result?.let {resultData.add(it)requestSituation = REQUEST_TIME_NORMAL}} catch (e: Exception) {// 處理任務異常Log.d(TAG, "error: ${e}, isTimeOut = $requestSituation")if (requestSituation != REQUEST_TIME_NORMAL) { //如果有數據返回成功就無需記錄超時requestSituation = REQUEST_TIME_TIMEOUT //如果所有數據獲取超時,需要反饋異常}}}Log.d(TAG, "response size: ${resultData.size}")if(requestSituation == REQUEST_TIME_TIMEOUT) {onRequestResult(REQ_ERROR, null)} else if (resultData.isEmpty()) {onRequestResult(REQ_NO_DATA, null)} else {if (list.size - resultData.size > Math.max((list.size - 1) / 2, 1) && resultData.size < 5) {onRequestResult(REQ_ERROR, null)} else {onRequestResult(REQ_SUCCESS, resultData)myData.applyPut { cache ->cache[MyKey] = resultData //這里使用了LruCache,以后再講}}}}
擴展:
Channel在這段代碼中的作用
- 橋接api與協程:將傳統的回調式API(SDK的回調)轉換為協程友好的異步操作
- 同步時序:確保在SDK回調后,協程能夠繼續執行
- 結果傳遞:將回調結果傳遞回主協程流程
潛在問題
- 使用無限緩沖可能不必要,因為channel開啟在for循環中,一次只需要接收一個結果
- channel沒有被顯式關閉,可能導致資源泄漏
try{
//回調處理
...
} finally {callbackChannel.close() //確保關閉
}
Channel是什么?
它是Kotlin協程中的一個并發通信原語,用于在不同協程之間安全的傳遞數據。類似阻塞隊列,但完全基于協程的非阻塞特性實現。
它是協程間通信的強大工具,特別適合將回調式API轉換為掛起函數,使異步代碼更線性易讀。
Channel的基本特點
生產者-消費者模式:一個協程發送數據,另一個協程接收數據
線程安全:內部已處理好線程同步的問題
可掛起:當Channel滿或空時,發生和接收操作會掛起協程而非阻塞線程
Channel在以上代碼中的時序關系
-
創建channel:在每次async任務中創建一個channel
-
SDK回調:當收到SDK回調,成功獲取數據時,使用trySend發送數據,失敗時使用trySend發送null
-
接收結果:通過callbackChannel.receive()等待SDK回調
關鍵時序點:receive會掛起協程,直到SDK回調觸發并發送數據到Channel
Channel的常見用法
- 創建Channel
//創建有緩沖的Channel
val channel = Channel<T>(capacity)//capacity
//RENDEZVOUS(默認,無緩沖)
//UNLIMITED(無限緩沖,MAX_VALUE)
//CONFLATED(只保留最小值)
//具體數字(固定緩沖大小)
- 發送數據
//常規發送(可能掛起)
channel.send(data)//嘗試發送(不掛起)
channel.trySend(data).isSuccess
- 接收數據
//常規接收(可能掛起)
val data = channel.receive()//嘗試接收(不掛起)
val data = channel.tryReceive().getOrNull()
- 關閉Channel
channel.close() //發送結束信號, 防止資源泄漏
關于這段代碼的優化寫法
private suspend fun fetchDataConcurrently(list: MutableList<MyType>,onRequestResult: (Int, List<MyType>?) -> Unit
) {val resultData = mutableListOf<MyType>()var requestSituation = REQUEST_TIME_DEFAULTtry {withTimeout(12_000L) {val deferredResults = list.map { item ->async {try {val result = suspendCancellableCoroutine<MyType?> { continuation ->val callback = object : SDKCallback() {override fun onSuccess(children: List<SDKType>,) {val item= MyType(item.name, item.id).apply {list = children}continuation.resume(item)}override fun onError() {continuation.resume(null)}}continuation.invokeOnCancellation {// 如果協程被取消,可以在這里取消SDK請求// 需要SDK支持取消操作}SDKInstance(item.id, callback)}result} catch (e: Exception) {null}}}deferredResults.forEach { deferred ->deferred.await()?.let {resultData.add(it)requestSituation = REQUEST_TIME_NORMAL}}}} catch (e: TimeoutCancellationException) {if (requestSituation != REQUEST_TIME_NORMAL) {requestSituation = REQUEST_TIME_TIMEOUT}Log.w(TAG, "Request timeout: ${e.message}")} catch (e: Exception) {Log.e(TAG, "Unexpected error: ${e.message}", e)}// 結果處理邏輯保持不變when {requestSituation == REQUEST_TIME_TIMEOUT -> {onRequestResult(REQ_ERROR, null)}resultData.isEmpty() -> {onRequestResult(REQ_NO_DATA, null)}list.size - resultData.size > maxOf(list.size / 2, 1) && resultData.size < 5 -> {onRequestResult(REQ_ERROR, null)}else -> {onRequestResult(REQ_SUCCESS, resultData)myData.applyPut { cache ->cache[myKey] = resultData}}}
}
優化點說明
-
替換Channel為suspendCancellableCoroutine:
更直接地將回調API轉換為掛起函數
避免了Channel資源管理問題
-
改進資源管理:
使用invokeOnCancellation處理協程取消
確保所有可能的異常都被捕獲
-
緩沖策略優化:
完全移除了不必要的Channel緩沖
使用更直接的協程控制流
-
錯誤處理增強:
明確區分超時和其他異常
更好的日志記錄
核心知識點
-
協程與回調的轉換:
suspendCancellableCoroutine將回調API轉換為掛起函數協程取消處理機制
-
結構化并發:
withTimeout創建有時間限制的作用域async/await并發模式
-
資源管理:
協程取消時的清理工作異常處理邊界
-
并發控制:
多個請求的并行執行結果的聚合處理
-
狀態管理:
請求狀態的跟蹤(REQUEST_TIME_NORMAL/TIMEOUT)結果的成功/失敗判定邏輯