前言
了解了 Channel 的基礎概念和基本使用 后,我們再來看一看 Channel 的特性以及高階應用。
Channel 是"熱流"
熱流概念
Channel 是熱流(Hot Stream),具備以下特性:
-
數據的生產和消費是兩套獨立的流程
-
即使沒有消費者,生產者也會繼續生產數據
而在日常開發中常用到的 Flow
則是 冷流(Cold Stream) ,冷流只有在有訂閱者時才開始產生數據。關于冷流的詳細內容,在后續 Flow 相關的文章中再進行深入介紹。
代碼示例:
suspend fun channelHot() {val channel = Channel<String>(Channel.BUFFERED)val sender = GlobalScope.launch {for (i in 1..5) {delay(100)channel.send(i.toString())println("\u001B[32m[生產者] 數據發送完畢: $i\u001B[0m")}}joinAll(sender)println("\u001B[36m ----結束---- \u001B[0m")
}
可以看到,雖然沒有消費者,生產者也能繼續發送數據。
Channel 的一對多、多對多應用模式
在上一篇 Kotlin Channel基礎使用 中,我們的示例都是一對一的,即單個生產者向單個消費者發送數據。實際上,Channel
還支持實現一對多,多對一,多對多等模式
扇出模式(一對多)- 負載均衡
概念: 單個生產者發送數據,多個消費者競爭接收數據,每個數據只會被一個消費者處理。
特點:
-
每個數據只被一個消費者處理
-
消費者之間形成競爭關系
-
實現了自然的負載均衡
應用場景: 任務分發、負載均衡、并行處理
代碼示例:
suspend fun singleProducerMultipleConsumers() {val channel = Channel<Int>(Channel.BUFFERED)// 單個生產者val producer = GlobalScope.launch {println("\u001B[32m[生產者] 開始生產任務...\u001B[0m")for (i in 1..10) {println("\u001B[32m[生產者] 發送任務: $i\u001B[0m")channel.send(i)delay(30) // 模擬生產時間}channel.close()println("\u001B[32m[生產者] 所有任務已發送完畢\u001B[0m")}// 多個消費者 - 每個消費者使用不同顏色val consumerColors = listOf("\u001B[34m", "\u001B[36m", "\u001B[31m") // 藍色、青色、紅色val consumers = List(3) { consumerId ->GlobalScope.launch {val color = consumerColors[consumerId]println("${color}[消費者-$consumerId] 準備接收任務...\u001B[0m")for (task in channel) {println("${color}[消費者-$consumerId] 處理任務: $task\u001B[0m")delay(300) // 模擬處理時間println("${color}[消費者-$consumerId] 任務 $task 處理完成\u001B[0m")}println("${color}[消費者-$consumerId] 沒有更多任務,退出\u001B[0m")}}// 等待所有協程完成joinAll(producer, *consumers.toTypedArray())println("\u001B[36m ----結束---- \u001B[0m")
}
注意,是多個消費者 “瓜分” 數據
扇入模式(多對一)- 數據聚合
概念: 多個生產者向同一個 Channel
發送數據,單個消費者接收所有數據。
特點:
-
多個數據源匯聚到一個處理點
-
消費者按接收順序處理所有數據
應用場景: 日志收集、數據匯總、多源數據聚合、監控指標收集
suspend fun multipleProducersSingleConsumer() {// 創建一個共享的Channel,用于接收來自多個生產者的數據val sharedChannel = Channel<String>(capacity = 10)// 啟動多個生產者協程 val producerColors = listOf("\u001B[32m", "\u001B[33m", "\u001B[35m") // 綠色、黃色、洋紅色val producers = List(3) { producerId ->GlobalScope.launch {val color = producerColors[producerId]repeat(5) { messageId ->val message = "Producer-$producerId: Message-$messageId"println("${color}[生產者$producerId] 發送: $message\u001B[0m")sharedChannel.send(message)delay(Random.nextLong(100, 500)) // 隨機延遲模擬不同的生產速度}println("${color}[生產者$producerId] 完成發送\u001B[0m")}}// 啟動單個消費者協程val consumer = GlobalScope.launch {var receivedCount = 0for (message in sharedChannel) {delay(200)println("\u001B[34m[消費者] 處理: $message 完畢\u001B[0m")receivedCount++}println("\u001B[34m[消費者] 完成接收,共處理 $receivedCount 條消息\u001B[0m")}// 等待所有生產者完成producers.joinAll()println("\u001B[36m所有生產者已完成\u001B[0m")// 關閉Channel,這樣消費者的for循環才能正常結束sharedChannel.close()// 等待消費者完成consumer.join()println("\u001B[36m ----結束---- \u001B[0m")
}
多對多模式 - 高并發處理
概念: 多個生產者和多個消費者同時工作,結合了扇出和扇入的特點。
特點:
-
具備負載均衡能力(多消費者競爭)
-
支持高并發數據生產(多生產者)
-
通過緩沖區優化資源使用
-
適合處理大量并發任務
應用場景: 消息隊列系統、任務分發系統、高并發數據處理
suspend fun multipleProducersMultipleConsumers() {// 創建一個共享的Channelval sharedChannel = Channel<String>(capacity = 5)val producerColors = listOf("\u001B[32m", "\u001B[33m") // 綠色、黃色val producers = List(2) { producerId ->GlobalScope.launch {val color = producerColors[producerId]repeat(8) { messageId ->val message = "Producer-$producerId: Task-$messageId"println("${color}[生產者$producerId] 發送任務: $message\u001B[0m")sharedChannel.send(message)delay(Random.nextLong(200, 600))}println("${color}[生產者$producerId] 完成任務發送\u001B[0m")}}// 啟動多個消費者協程 val consumerColors = listOf("\u001B[34m", "\u001B[36m", "\u001B[31m") // 藍色、青色、紅色val consumers = List(3) { consumerId ->GlobalScope.launch {val color = consumerColors[consumerId]var processedCount = 0try {for (message in sharedChannel) {println("${color}[消費者$consumerId] 處理任務: $message\u001B[0m")delay(Random.nextLong(300, 800)) // 模擬任務處理時間processedCount++println("${color}[消費者$consumerId] 完成任務: $message\u001B[0m")}} catch (e: ClosedReceiveChannelException) {println("${color}[消費者$consumerId] Channel已關閉,共處理 $processedCount 個任務\u001B[0m")}}}// 等待所有生產者完成producers.joinAll()println("\u001B[36m所有生產者已完成,正在關閉Channel...\u001B[0m")// 關閉ChannelsharedChannel.close()// 等待所有消費者完成consumers.joinAll()println("\u001B[36m ----結束---- \u001B[0m")
}
對比
應用模式 | 生產者 | 消費者 | 數據流向 | 特點 | 應用場景 |
---|---|---|---|---|---|
扇出模式 | 1個 | 多個 | 一對多 | 自動負載均衡,任務并行處理 | 工作隊列、任務分發、并行計算 |
扇入模式 | 多個 | 1個 | 多對一 | 數據聚合,統一處理點 | 日志收集、數據匯總、監控指標聚合 |
多對多模式 | 多個 | 多個 | 多對多 | 最大化并發能力,結合扇出扇入優勢 | 消息隊列、高并發數據處理、分布式系統 |
Select 表達式
Select 概念與特性
Select 可以同時等待多個掛起操作,并處理最先完成的那個操作,在處理多個不確定耗時的異步任務時非常有用。
特性:
-
非阻塞選擇:同時監聽多個 Channel
-
競爭機制:處理最先到達的結果
Select 基本使用
suspend fun selectFastestResponse() {// 創建多個Channel模擬不同的數據源val apiChannel = Channel<String>()val databaseChannel = Channel<String>()val cacheChannel = Channel<String>()println("\u001B[36m[系統] 啟動多數據源查詢競爭...\u001B[0m")// 模擬API調用GlobalScope.launch {delay(Random.nextLong(500, 1500))val result = "API響應: 用戶數據已獲取"println("\u001B[33m[API任務] 完成查詢: $result\u001B[0m")apiChannel.send(result)}// 模擬數據庫查詢GlobalScope.launch {delay(Random.nextLong(800, 2000))val result = "數據庫響應: 查詢結果已返回"println("\u001B[31m[數據庫任務] 完成查詢: $result\u001B[0m")databaseChannel.send(result)}// 模擬緩存查詢GlobalScope.launch {delay(Random.nextLong(100, 800))val result = "緩存響應: 緩存命中,數據已返回"println("\u001B[32m[緩存任務] 完成查詢: $result\u001B[0m")cacheChannel.send(result)}// 使用select選擇最快的響應val startTime = System.currentTimeMillis()val fastestResult = select<String> {apiChannel.onReceive { result ->println("\u001B[33m[獲勝者] API最快響應!\u001B[0m")result}databaseChannel.onReceive { result ->println("\u001B[31m[獲勝者] 數據庫最快響應!\u001B[0m")result}cacheChannel.onReceive { result ->println("\u001B[32m[獲勝者] 緩存最快響應!\u001B[0m")result}}val endTime = System.currentTimeMillis()println("\u001B[36m[結果] 最快響應: $fastestResult\u001B[0m")println("\u001B[36m[性能] 響應時間: ${endTime - startTime}ms\u001B[0m")// 關閉所有ChannelapiChannel.close()databaseChannel.close()cacheChannel.close()println("\u001B[36m ----結束---- \u001B[0m")
}
Select 取消機制優化
在實際生產環境中,我們通常希望在獲得最快響應后立即取消其他任務,以節省系統資源:
suspend fun selectFastestResponseWithCancel() {// 創建三個不同任務的channelval apiChannel = Channel<String>()val databaseChannel = Channel<String>()val cacheChannel = Channel<String>()println("\u001B[36m[系統] 開始執行多個異步任務,選擇最快響應并取消其他任務...\u001B[0m")// 模擬API調用任務val apiTask = GlobalScope.launch {try {val responseTime = Random.nextLong(300, 1200) // 隨機響應時間println("\u001B[33m[API任務] 開始調用遠程API,預計耗時: ${responseTime}ms\u001B[0m")delay(responseTime)val result = "API響應數據: {userId: 456, name: 'Alice'}"apiChannel.send(result)println("\u001B[33m[API任務] 調用完成,耗時: ${responseTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[33m[API任務] 任務被取消\u001B[0m")throw e}}// 模擬數據庫查詢任務val databaseTask = GlobalScope.launch {try {val queryTime = Random.nextLong(400, 1000) // 隨機查詢時間println("\u001B[31m[數據庫任務] 開始查詢數據庫,預計耗時: ${queryTime}ms\u001B[0m")delay(queryTime)val result = "數據庫查詢結果: {userId: 456, name: 'Alice', email: 'alice@example.com'}"databaseChannel.send(result)println("\u001B[31m[數據庫任務] 查詢完成,耗時: ${queryTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[31m[數據庫任務] 任務被取消\u001B[0m")throw e}}// 模擬緩存查詢任務val cacheTask = GlobalScope.launch {try {val cacheTime = Random.nextLong(100, 400) // 隨機緩存訪問時間println("\u001B[32m[緩存任務] 開始查詢緩存,預計耗時: ${cacheTime}ms\u001B[0m")delay(cacheTime)val result = "緩存數據: {userId: 888, name: '喻志強', cached: true}"cacheChannel.send(result)println("\u001B[32m[緩存任務] 查詢完成,耗時: ${cacheTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[32m[緩存任務] 任務被取消\u001B[0m")throw e}}// 使用select表達式選擇最快響應的任務val startTime = System.currentTimeMillis()val result = select<String> {apiChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] API任務最先響應!耗時: ${elapsedTime}ms\u001B[0m")println("\u001B[36m[系統] 獲取到數據: $data\u001B[0m")data}databaseChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 數據庫任務最先響應!耗時: ${elapsedTime}ms\u001B[0m")println("\u001B[36m[系統] 獲取到數據: $data\u001B[0m")data}cacheChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 緩存任務最先響應!耗時: ${elapsedTime}ms\u001B[0m")println("\u001B[36m[系統] 獲取到數據: $data\u001B[0m")data}}println("\u001B[36m[系統] 選擇最快響應完成,使用結果: $result\u001B[0m")// 主動取消其他正在執行的任務println("\u001B[36m[系統] 開始取消其他未完成的任務...\u001B[0m")if (apiTask.isActive) {println("\u001B[36m[系統] 正在取消API任務...\u001B[0m")apiTask.cancel()}if (databaseTask.isActive) {println("\u001B[36m[系統] 正在取消數據庫任務...\u001B[0m")databaseTask.cancel()}if (cacheTask.isActive) {println("\u001B[36m[系統] 正在取消緩存任務...\u001B[0m")cacheTask.cancel()}// 關閉channelsapiChannel.close()databaseChannel.close()cacheChannel.close()println("\u001B[36m ----結束---- \u001B[0m")
}
Select 超時處理機制
在某些場景中,我們需要為異步操作設置超時限制,避免無限等待影響系統性能。Select 表達式結合 withTimeoutOrNull
可以優雅地實現超時控制機制。
關鍵點:
- 時間控制:通過
withTimeoutOrNull
設置最大等待時間 - 優雅降級:超時時返回 null,可以實現降級策略
- 資源清理:超時后自動取消所有未完成的任務
- 用戶體驗:避免用戶長時間等待,提供及時反饋
suspend fun selectWithTimeout() {// 創建三個不同任務的channelval apiChannel = Channel<String>()val databaseChannel = Channel<String>()val cacheChannel = Channel<String>()println("\u001B[36m[系統] 啟動帶超時控制的多任務查詢(超時時間: 100毫秒)...\u001B[0m")println("\u001B[36m[系統] 實際應用場景:Web API響應、數據庫查詢、緩存查詢等\u001B[0m")// 模擬API調用任務(可能較慢)val apiTask = GlobalScope.launch {try {val responseTime = Random.nextLong(800, 2000) // 可能超時的響應時間println("\u001B[33m[API任務] 開始調用遠程API,預計耗時: ${responseTime}ms\u001B[0m")delay(responseTime)val result = "API響應: {userId: 789, name: 'Bob', source: 'remote'}"apiChannel.send(result)println("\u001B[33m[API任務] 調用完成,耗時: ${responseTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[33m[API任務] 任務被取消(超時處理)\u001B[0m")throw e}}// 模擬數據庫查詢任務(可能較慢)val databaseTask = GlobalScope.launch {try {val queryTime = Random.nextLong(600, 1800) // 可能超時的查詢時間println("\u001B[31m[數據庫任務] 開始查詢數據庫,預計耗時: ${queryTime}ms\u001B[0m")delay(queryTime)val result = "數據庫響應: {userId: 789, name: 'Bob', email: 'bob@example.com', source: 'database'}"databaseChannel.send(result)println("\u001B[31m[數據庫任務] 查詢完成,耗時: ${queryTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[31m[數據庫任務] 任務被取消(超時處理)\u001B[0m")throw e}}// 模擬緩存查詢任務(通常較快)val cacheTask = GlobalScope.launch {try {val cacheTime = Random.nextLong(200, 1200) // 緩存訪問時間println("\u001B[32m[緩存任務] 開始查詢緩存,預計耗時: ${cacheTime}ms\u001B[0m")delay(cacheTime)val result = "緩存響應: {userId: 666, name: 'XeonYu', cached: true, source: 'cache'}"cacheChannel.send(result)println("\u001B[32m[緩存任務] 查詢完成,耗時: ${cacheTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[32m[緩存任務] 任務被取消(超時處理)\u001B[0m")throw e}}// 使用withTimeoutOrNull實現超時控制val startTime = System.currentTimeMillis()val result = withTimeoutOrNull(100) { //設置超時時間select<String> {apiChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] API任務響應成功!耗時: ${elapsedTime}ms\u001B[0m")data}databaseChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 數據庫任務響應成功!耗時: ${elapsedTime}ms\u001B[0m")data}cacheChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 緩存任務響應成功!耗時: ${elapsedTime}ms\u001B[0m")data}}}val totalTime = System.currentTimeMillis() - startTime// 處理超時情況if (result != null) {println("\u001B[36m[系統] 獲取到數據: $result\u001B[0m")println("\u001B[36m[系統] 總響應時間: ${totalTime}ms(在超時限制內)\u001B[0m")} else {println("\u001B[36m[系統] 所有任務均超時,啟用降級策略\u001B[0m")println("\u001B[36m[系統] 返回默認數據: {userId: 000, name: 'Unknown', source: 'default'}\u001B[0m")}// 取消所有未完成的任務println("\u001B[36m[系統] 清理資源,取消未完成的任務...\u001B[0m")apiTask.cancel()databaseTask.cancel()cacheTask.cancel()// 關閉channelsapiChannel.close()databaseChannel.close()cacheChannel.close()println("\u001B[36m ----結束---- \u001B[0m")
}
把超時時間設置為 1 秒再看看執行結果
CoroutineScope.produce & CoroutineScope.actor
除了直接通過頂層函數 Channel()
創建通道,Kotlin 還提供了 produce
和 actor
兩個構建器函數。
為什么需要 produce 和 actor?
核心原因在于返回值類型的差異。
返回值類型對比
1. Channel() 函數
Channel()
函數返回 Channel<E>
類型的一個實例
Channel
繼承了 SendChannel<E>
和 ReceiveChannel<E>
接口,既可發送又可接收。
2. produce 構建器
produce
返回 ReceiveChannel<E>
,只能接收數據。
3. actor 構建器
actor
返回 SendChannel<E>
,只能發送數據。
Channel 三層接口架構
Channel
接口的設計非常巧妙,采用了三層接口分離結合泛型逆變和協變的設計:
// 發送端接口 - 逆變泛型,只能發送 E 或其子類型
public interface SendChannel<in E>// 接收端接口 - 協變泛型,只能接收 E 或其父類型
public interface ReceiveChannel<out E>// 雙向通道接口 - 不變泛型,既能發送又能接收 E
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
好處:
-
接口隔離原則(ISP):發送和接收功能完全分離,避免接口污染
-
類型安全保障:通過協變和逆變確保數據流向的類型安全
-
職責清晰分離:不同角色只需關注相關接口,降低復雜度
實際應用場景
在架構設計中,當你需要向團隊提供 API 時,往往希望限制調用者的操作權限,如果直接暴露 Channel
類型,調用者可以同時進行發送和接收操作,這可能違背你的設計意圖,因為你沒辦法控制調用者的行為,通過返回受限的接口類型,就可以 精確控制調用者的操作范圍
produce 構建器示例
當你希望調用者只能接收數據時:
// 架構開發者:使用produce構建器封裝數據流處理邏輯
fun createDataStream(): ReceiveChannel<String> {println("\u001B[35m[架構層] 使用produce構建器封裝復雜的數據處理邏輯\u001B[0m")return GlobalScope.produce(capacity = 3) {// 架構層復雜邏輯:數據獲取、轉換、驗證等repeat(5) { batch ->// 模擬復雜的數據處理流程val rawData = "raw_data_$batch"val processedData = "DataBatch-$batch: [validated, transformed, enriched]"println("\u001B[35m[架構層] 處理原始數據: $rawData -> $processedData\u001B[0m")send(processedData) // 只有架構層可以發送數據delay(200)}// produce自動管理資源清理println("\u001B[35m[架構層] produce自動清理內部資源\u001B[0m")}
}suspend fun produceBuilderExample() {// 業務開發者:獲得受限的ReceiveChannel接口val dataStream: ReceiveChannel<String> = createDataStream()println("\u001B[34m[業務開發者] 獲得ReceiveChannel,只能接收數據\u001B[0m")// 業務開發者只能進行接收操作println("\u001B[34m[業務層] 開始處理數據流\u001B[0m")for (data in dataStream) {println("\u001B[34m[業務層] 接收并處理: $data\u001B[0m")delay(100)}
}
actor 構建器示例
當你希望調用者只能發送數據時
// 使用actor構建器封裝數據處理邏輯
fun createDataProcessor(): SendChannel<String> {println("\u001B[35m[架構層] 使用actor構建器封裝數據處理管道\u001B[0m")return GlobalScope.actor(capacity = 5) {// 架構層復雜邏輯:數據接收、驗證、存儲等var processedCount = 0for (rawData in channel) { // actor內部可以接收數據processedCount++// 模擬復雜的數據處理流程val validatedData = "Validated: $rawData"val transformedData = "Transformed: $validatedData"val storedData = "Stored: $transformedData [ID: $processedCount]"println("\u001B[35m[架構層] 處理數據管道: $rawData -> $storedData\u001B[0m")delay(150) // 模擬處理時間// 這里可以進行數據庫存儲、文件寫入等操作println("\u001B[35m[架構層] 數據處理完成: $storedData\u001B[0m")}// actor自動管理資源清理println("\u001B[35m[架構層] actor自動清理內部資源,共處理 $processedCount 條數據\u001B[0m")}
}suspend fun actorBuilderExample() {// 業務開發者:獲得受限的SendChannel接口val dataProcessor: SendChannel<String> = createDataProcessor()println("\u001B[34m[業務開發者] 獲得SendChannel,只能發送數據\u001B[0m")// 業務開發者只能進行發送操作println("\u001B[34m[業務層] 開始發送數據到處理管道\u001B[0m")repeat(6) { index ->val businessData = "BusinessData-$index: {orderId: ${1000 + index}, amount: ${(index + 1) * 100}}"println("\u001B[34m[業務層] 發送業務數據: $businessData\u001B[0m")dataProcessor.send(businessData)delay(80)}// 業務層完成數據發送后關閉通道dataProcessor.close()println("\u001B[34m[業務層] 數據發送完成,關閉通道\u001B[0m")}
produce vs actor 對比
特性 | produce | actor |
---|---|---|
返回類型 | ReceiveChannel<E> | SendChannel<E> |
外部操作 | 只能接收數據 | 只能發送數據 |
內部邏輯 | 生產數據并發送 | 接收數據并處理 |
數據流向 | 內部 → 外部 | 外部 → 內部 |
適用場景 | 數據生成、API 封裝 | 數據處理、消息處理 |
接口隔離 | 隱藏生產邏輯 | 隱藏處理邏輯 |
類型安全 | 協變 out E | 逆變 in E |
總結
關于熱流這個特性
Channel 是熱流,這意味著什么?簡單說就是生產者不等人。不管有沒有消費者在那兒等著,生產者該干嘛干嘛,數據照樣往Channel
里塞。
這個特性在實際開發中特別有用。比如你在做日志收集系統,不管有沒有消費日志,日志數據肯定是持續產生的, Channel
的熱流特性正好匹配到這個場景。
幾種應用模式的實際體驗
扇出模式(一對多):這個應用場景是比較多的,特別是在處理任務隊列的時候。一個生產者瘋狂塞任務,多個消費者搶著處理,天然的負載均衡。不用你寫什么復雜的分發邏輯,Channel
自己就搞定了。
扇入模式(多對一):日志收集的時候經常用到。多個服務往一個 Channel 里扔日志,一個消費者統一處理。簡單粗暴,但很有效。
多對多模式:這個就是前兩種的結合體,適合高并發場景。不過說實話,復雜度也上去了,一般也不會用到。
Select 表達式:這個還是非常實用的,可以同時等到多個 Channel
,誰先有數據就處理誰。配合超時機制,可以做很多有趣的事情。比如同時查緩存、數據庫、API,誰快用誰的,其他的直接取消。
produce 和 actor 的設計思路
剛開始我也不理解為什么要搞這兩個構建器,直接用 Channel()
不香嗎?后來在實際項目中才體會到接口隔離的重要性。
想象一下,你給團隊提供一個數據流 API,如果直接返回 Channel
,別人既能往里發數據,又能從里面取數據。這不就亂套了嗎?
用 produce
返回 ReceiveChannel
,別人只能取數據;用 actor
返回 SendChannel
,別人只能發數據。權限控制得明明白白。
這種設計在大型項目中特別有用,可以避免很多不必要的 bug。
Channel 到 Flow
學完 Channel 之后,你會發現它解決了很多并發場景的問題。但是 Channel 作為熱流,有個問題就是資源消耗。即使沒有消費者,生產者也在那兒工作,這在某些場景下是浪費的。
這時候就該 Flow(冷流) 出場了。Flow
只有在被訂閱的時候才開始工作,更節省資源。而且 Flow 有更豐富的操作符,可以做各種數據變換。
而且 kotlin
還提供了 StaredFlow
和 StateFlow
這兩個非常實用的數據流,因此,在日常開發中,我們可能更多的是選擇 Flow
。
掌握了 Channel
,再去學 Flow
會輕松很多。因為很多概念是相通的,只是應用場景不同而已。
好了, 本篇文章就是這些,希望能幫到你。
感謝閱讀,如果對你有幫助請三連(點贊、收藏、加關注)支持。有任何疑問或建議,歡迎在評論區留言討論。如需轉載,請注明出處:喻志強的博客