Kotlin 協程之Channel 的高階應用

在這里插入圖片描述

前言

了解了 Channel 的基礎概念和基本使用 后,我們再來看一看 Channel 的特性以及高階應用。


Channel 是"熱流"

熱流概念

Channel 是熱流(Hot Stream),具備以下特性:

  • 數據的生產和消費是兩套獨立的流程

  • 即使沒有消費者,生產者也會繼續生產數據

而在日常開發中常用到的 Flow 則是 冷流(Cold Stream) ,冷流只有在有訂閱者時才開始產生數據。關于冷流的詳細內容,在后續 Flow 相關的文章中再進行深入介紹。

代碼示例:

suspend fun channelHot() {val channel = Channel<String>(Channel.BUFFERED)val sender = GlobalScope.launch {for (i in 1..5) {delay(100)channel.send(i.toString())println("\u001B[32m[生產者] 數據發送完畢: $i\u001B[0m")}}joinAll(sender)println("\u001B[36m ----結束---- \u001B[0m")
}

可以看到,雖然沒有消費者,生產者也能繼續發送數據。

在這里插入圖片描述


Channel 的一對多、多對多應用模式

在上一篇 Kotlin Channel基礎使用 中,我們的示例都是一對一的,即單個生產者向單個消費者發送數據。實際上,Channel 還支持實現一對多,多對一,多對多等模式

扇出模式(一對多)- 負載均衡

概念: 單個生產者發送數據,多個消費者競爭接收數據,每個數據只會被一個消費者處理。

特點:

  • 每個數據只被一個消費者處理

  • 消費者之間形成競爭關系

  • 實現了自然的負載均衡

應用場景: 任務分發、負載均衡、并行處理

代碼示例:

suspend fun singleProducerMultipleConsumers() {val channel = Channel<Int>(Channel.BUFFERED)// 單個生產者val producer = GlobalScope.launch {println("\u001B[32m[生產者] 開始生產任務...\u001B[0m")for (i in 1..10) {println("\u001B[32m[生產者] 發送任務: $i\u001B[0m")channel.send(i)delay(30) // 模擬生產時間}channel.close()println("\u001B[32m[生產者] 所有任務已發送完畢\u001B[0m")}// 多個消費者 - 每個消費者使用不同顏色val consumerColors = listOf("\u001B[34m", "\u001B[36m", "\u001B[31m") // 藍色、青色、紅色val consumers = List(3) { consumerId ->GlobalScope.launch {val color = consumerColors[consumerId]println("${color}[消費者-$consumerId] 準備接收任務...\u001B[0m")for (task in channel) {println("${color}[消費者-$consumerId] 處理任務: $task\u001B[0m")delay(300) // 模擬處理時間println("${color}[消費者-$consumerId] 任務 $task 處理完成\u001B[0m")}println("${color}[消費者-$consumerId] 沒有更多任務,退出\u001B[0m")}}// 等待所有協程完成joinAll(producer, *consumers.toTypedArray())println("\u001B[36m ----結束---- \u001B[0m")
}

在這里插入圖片描述

注意,是多個消費者 “瓜分” 數據

扇入模式(多對一)- 數據聚合

概念: 多個生產者向同一個 Channel 發送數據,單個消費者接收所有數據。

特點:

  • 多個數據源匯聚到一個處理點

  • 消費者按接收順序處理所有數據

應用場景: 日志收集、數據匯總、多源數據聚合、監控指標收集

suspend fun multipleProducersSingleConsumer() {// 創建一個共享的Channel,用于接收來自多個生產者的數據val sharedChannel = Channel<String>(capacity = 10)// 啟動多個生產者協程 val producerColors = listOf("\u001B[32m", "\u001B[33m", "\u001B[35m") // 綠色、黃色、洋紅色val producers = List(3) { producerId ->GlobalScope.launch {val color = producerColors[producerId]repeat(5) { messageId ->val message = "Producer-$producerId: Message-$messageId"println("${color}[生產者$producerId] 發送: $message\u001B[0m")sharedChannel.send(message)delay(Random.nextLong(100, 500)) // 隨機延遲模擬不同的生產速度}println("${color}[生產者$producerId] 完成發送\u001B[0m")}}// 啟動單個消費者協程val consumer = GlobalScope.launch {var receivedCount = 0for (message in sharedChannel) {delay(200)println("\u001B[34m[消費者] 處理: $message 完畢\u001B[0m")receivedCount++}println("\u001B[34m[消費者] 完成接收,共處理 $receivedCount 條消息\u001B[0m")}// 等待所有生產者完成producers.joinAll()println("\u001B[36m所有生產者已完成\u001B[0m")// 關閉Channel,這樣消費者的for循環才能正常結束sharedChannel.close()// 等待消費者完成consumer.join()println("\u001B[36m ----結束---- \u001B[0m")
}

在這里插入圖片描述

多對多模式 - 高并發處理

概念: 多個生產者和多個消費者同時工作,結合了扇出和扇入的特點。

特點:

  • 具備負載均衡能力(多消費者競爭)

  • 支持高并發數據生產(多生產者)

  • 通過緩沖區優化資源使用

  • 適合處理大量并發任務

應用場景: 消息隊列系統、任務分發系統、高并發數據處理

suspend fun multipleProducersMultipleConsumers() {// 創建一個共享的Channelval sharedChannel = Channel<String>(capacity = 5)val producerColors = listOf("\u001B[32m", "\u001B[33m") // 綠色、黃色val producers = List(2) { producerId ->GlobalScope.launch {val color = producerColors[producerId]repeat(8) { messageId ->val message = "Producer-$producerId: Task-$messageId"println("${color}[生產者$producerId] 發送任務: $message\u001B[0m")sharedChannel.send(message)delay(Random.nextLong(200, 600))}println("${color}[生產者$producerId] 完成任務發送\u001B[0m")}}// 啟動多個消費者協程 val consumerColors = listOf("\u001B[34m", "\u001B[36m", "\u001B[31m") // 藍色、青色、紅色val consumers = List(3) { consumerId ->GlobalScope.launch {val color = consumerColors[consumerId]var processedCount = 0try {for (message in sharedChannel) {println("${color}[消費者$consumerId] 處理任務: $message\u001B[0m")delay(Random.nextLong(300, 800)) // 模擬任務處理時間processedCount++println("${color}[消費者$consumerId] 完成任務: $message\u001B[0m")}} catch (e: ClosedReceiveChannelException) {println("${color}[消費者$consumerId] Channel已關閉,共處理 $processedCount 個任務\u001B[0m")}}}// 等待所有生產者完成producers.joinAll()println("\u001B[36m所有生產者已完成,正在關閉Channel...\u001B[0m")// 關閉ChannelsharedChannel.close()// 等待所有消費者完成consumers.joinAll()println("\u001B[36m ----結束---- \u001B[0m")
}

在這里插入圖片描述

對比

應用模式生產者消費者數據流向特點應用場景
扇出模式1個多個一對多自動負載均衡,任務并行處理工作隊列、任務分發、并行計算
扇入模式多個1個多對一數據聚合,統一處理點日志收集、數據匯總、監控指標聚合
多對多模式多個多個多對多最大化并發能力,結合扇出扇入優勢消息隊列、高并發數據處理、分布式系統

Select 表達式

Select 概念與特性

Select 可以同時等待多個掛起操作,并處理最先完成的那個操作,在處理多個不確定耗時的異步任務時非常有用。

特性:

  • 非阻塞選擇:同時監聽多個 Channel

  • 競爭機制:處理最先到達的結果

Select 基本使用

suspend fun selectFastestResponse() {// 創建多個Channel模擬不同的數據源val apiChannel = Channel<String>()val databaseChannel = Channel<String>()val cacheChannel = Channel<String>()println("\u001B[36m[系統] 啟動多數據源查詢競爭...\u001B[0m")// 模擬API調用GlobalScope.launch {delay(Random.nextLong(500, 1500))val result = "API響應: 用戶數據已獲取"println("\u001B[33m[API任務] 完成查詢: $result\u001B[0m")apiChannel.send(result)}// 模擬數據庫查詢GlobalScope.launch {delay(Random.nextLong(800, 2000))val result = "數據庫響應: 查詢結果已返回"println("\u001B[31m[數據庫任務] 完成查詢: $result\u001B[0m")databaseChannel.send(result)}// 模擬緩存查詢GlobalScope.launch {delay(Random.nextLong(100, 800))val result = "緩存響應: 緩存命中,數據已返回"println("\u001B[32m[緩存任務] 完成查詢: $result\u001B[0m")cacheChannel.send(result)}// 使用select選擇最快的響應val startTime = System.currentTimeMillis()val fastestResult = select<String> {apiChannel.onReceive { result ->println("\u001B[33m[獲勝者] API最快響應!\u001B[0m")result}databaseChannel.onReceive { result ->println("\u001B[31m[獲勝者] 數據庫最快響應!\u001B[0m")result}cacheChannel.onReceive { result ->println("\u001B[32m[獲勝者] 緩存最快響應!\u001B[0m")result}}val endTime = System.currentTimeMillis()println("\u001B[36m[結果] 最快響應: $fastestResult\u001B[0m")println("\u001B[36m[性能] 響應時間: ${endTime - startTime}ms\u001B[0m")// 關閉所有ChannelapiChannel.close()databaseChannel.close()cacheChannel.close()println("\u001B[36m ----結束---- \u001B[0m")
}

在這里插入圖片描述

Select 取消機制優化

在實際生產環境中,我們通常希望在獲得最快響應后立即取消其他任務,以節省系統資源:

suspend fun selectFastestResponseWithCancel() {// 創建三個不同任務的channelval apiChannel = Channel<String>()val databaseChannel = Channel<String>()val cacheChannel = Channel<String>()println("\u001B[36m[系統] 開始執行多個異步任務,選擇最快響應并取消其他任務...\u001B[0m")// 模擬API調用任務val apiTask = GlobalScope.launch {try {val responseTime = Random.nextLong(300, 1200) // 隨機響應時間println("\u001B[33m[API任務] 開始調用遠程API,預計耗時: ${responseTime}ms\u001B[0m")delay(responseTime)val result = "API響應數據: {userId: 456, name: 'Alice'}"apiChannel.send(result)println("\u001B[33m[API任務] 調用完成,耗時: ${responseTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[33m[API任務] 任務被取消\u001B[0m")throw e}}// 模擬數據庫查詢任務val databaseTask = GlobalScope.launch {try {val queryTime = Random.nextLong(400, 1000) // 隨機查詢時間println("\u001B[31m[數據庫任務] 開始查詢數據庫,預計耗時: ${queryTime}ms\u001B[0m")delay(queryTime)val result = "數據庫查詢結果: {userId: 456, name: 'Alice', email: 'alice@example.com'}"databaseChannel.send(result)println("\u001B[31m[數據庫任務] 查詢完成,耗時: ${queryTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[31m[數據庫任務] 任務被取消\u001B[0m")throw e}}// 模擬緩存查詢任務val cacheTask = GlobalScope.launch {try {val cacheTime = Random.nextLong(100, 400) // 隨機緩存訪問時間println("\u001B[32m[緩存任務] 開始查詢緩存,預計耗時: ${cacheTime}ms\u001B[0m")delay(cacheTime)val result = "緩存數據: {userId: 888, name: '喻志強', cached: true}"cacheChannel.send(result)println("\u001B[32m[緩存任務] 查詢完成,耗時: ${cacheTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[32m[緩存任務] 任務被取消\u001B[0m")throw e}}// 使用select表達式選擇最快響應的任務val startTime = System.currentTimeMillis()val result = select<String> {apiChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] API任務最先響應!耗時: ${elapsedTime}ms\u001B[0m")println("\u001B[36m[系統] 獲取到數據: $data\u001B[0m")data}databaseChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 數據庫任務最先響應!耗時: ${elapsedTime}ms\u001B[0m")println("\u001B[36m[系統] 獲取到數據: $data\u001B[0m")data}cacheChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 緩存任務最先響應!耗時: ${elapsedTime}ms\u001B[0m")println("\u001B[36m[系統] 獲取到數據: $data\u001B[0m")data}}println("\u001B[36m[系統] 選擇最快響應完成,使用結果: $result\u001B[0m")// 主動取消其他正在執行的任務println("\u001B[36m[系統] 開始取消其他未完成的任務...\u001B[0m")if (apiTask.isActive) {println("\u001B[36m[系統] 正在取消API任務...\u001B[0m")apiTask.cancel()}if (databaseTask.isActive) {println("\u001B[36m[系統] 正在取消數據庫任務...\u001B[0m")databaseTask.cancel()}if (cacheTask.isActive) {println("\u001B[36m[系統] 正在取消緩存任務...\u001B[0m")cacheTask.cancel()}// 關閉channelsapiChannel.close()databaseChannel.close()cacheChannel.close()println("\u001B[36m ----結束---- \u001B[0m")
}

在這里插入圖片描述

Select 超時處理機制

在某些場景中,我們需要為異步操作設置超時限制,避免無限等待影響系統性能。Select 表達式結合 withTimeoutOrNull可以優雅地實現超時控制機制。

關鍵點:

  1. 時間控制:通過 withTimeoutOrNull 設置最大等待時間
  2. 優雅降級:超時時返回 null,可以實現降級策略
  3. 資源清理:超時后自動取消所有未完成的任務
  4. 用戶體驗:避免用戶長時間等待,提供及時反饋
suspend fun selectWithTimeout() {// 創建三個不同任務的channelval apiChannel = Channel<String>()val databaseChannel = Channel<String>()val cacheChannel = Channel<String>()println("\u001B[36m[系統] 啟動帶超時控制的多任務查詢(超時時間: 100毫秒)...\u001B[0m")println("\u001B[36m[系統] 實際應用場景:Web API響應、數據庫查詢、緩存查詢等\u001B[0m")// 模擬API調用任務(可能較慢)val apiTask = GlobalScope.launch {try {val responseTime = Random.nextLong(800, 2000) // 可能超時的響應時間println("\u001B[33m[API任務] 開始調用遠程API,預計耗時: ${responseTime}ms\u001B[0m")delay(responseTime)val result = "API響應: {userId: 789, name: 'Bob', source: 'remote'}"apiChannel.send(result)println("\u001B[33m[API任務] 調用完成,耗時: ${responseTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[33m[API任務] 任務被取消(超時處理)\u001B[0m")throw e}}// 模擬數據庫查詢任務(可能較慢)val databaseTask = GlobalScope.launch {try {val queryTime = Random.nextLong(600, 1800) // 可能超時的查詢時間println("\u001B[31m[數據庫任務] 開始查詢數據庫,預計耗時: ${queryTime}ms\u001B[0m")delay(queryTime)val result = "數據庫響應: {userId: 789, name: 'Bob', email: 'bob@example.com', source: 'database'}"databaseChannel.send(result)println("\u001B[31m[數據庫任務] 查詢完成,耗時: ${queryTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[31m[數據庫任務] 任務被取消(超時處理)\u001B[0m")throw e}}// 模擬緩存查詢任務(通常較快)val cacheTask = GlobalScope.launch {try {val cacheTime = Random.nextLong(200, 1200) // 緩存訪問時間println("\u001B[32m[緩存任務] 開始查詢緩存,預計耗時: ${cacheTime}ms\u001B[0m")delay(cacheTime)val result = "緩存響應: {userId: 666, name: 'XeonYu', cached: true, source: 'cache'}"cacheChannel.send(result)println("\u001B[32m[緩存任務] 查詢完成,耗時: ${cacheTime}ms\u001B[0m")} catch (e: CancellationException) {println("\u001B[32m[緩存任務] 任務被取消(超時處理)\u001B[0m")throw e}}// 使用withTimeoutOrNull實現超時控制val startTime = System.currentTimeMillis()val result = withTimeoutOrNull(100) { //設置超時時間select<String> {apiChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] API任務響應成功!耗時: ${elapsedTime}ms\u001B[0m")data}databaseChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 數據庫任務響應成功!耗時: ${elapsedTime}ms\u001B[0m")data}cacheChannel.onReceive { data ->val elapsedTime = System.currentTimeMillis() - startTimeprintln("\u001B[36m[系統] 緩存任務響應成功!耗時: ${elapsedTime}ms\u001B[0m")data}}}val totalTime = System.currentTimeMillis() - startTime// 處理超時情況if (result != null) {println("\u001B[36m[系統] 獲取到數據: $result\u001B[0m")println("\u001B[36m[系統] 總響應時間: ${totalTime}ms(在超時限制內)\u001B[0m")} else {println("\u001B[36m[系統] 所有任務均超時,啟用降級策略\u001B[0m")println("\u001B[36m[系統] 返回默認數據: {userId: 000, name: 'Unknown', source: 'default'}\u001B[0m")}// 取消所有未完成的任務println("\u001B[36m[系統] 清理資源,取消未完成的任務...\u001B[0m")apiTask.cancel()databaseTask.cancel()cacheTask.cancel()// 關閉channelsapiChannel.close()databaseChannel.close()cacheChannel.close()println("\u001B[36m ----結束---- \u001B[0m")
}

在這里插入圖片描述

把超時時間設置為 1 秒再看看執行結果

在這里插入圖片描述


CoroutineScope.produce & CoroutineScope.actor

除了直接通過頂層函數 Channel() 創建通道,Kotlin 還提供了 produceactor 兩個構建器函數。

為什么需要 produce 和 actor?

核心原因在于返回值類型的差異

返回值類型對比

1. Channel() 函數
在這里插入圖片描述

Channel() 函數返回 Channel<E> 類型的一個實例

在這里插入圖片描述

Channel 繼承了 SendChannel<E>ReceiveChannel<E> 接口,既可發送又可接收

2. produce 構建器
在這里插入圖片描述

produce 返回 ReceiveChannel<E>只能接收數據

3. actor 構建器
在這里插入圖片描述

actor 返回 SendChannel<E>只能發送數據

Channel 三層接口架構

Channel 接口的設計非常巧妙,采用了三層接口分離結合泛型逆變和協變的設計:

// 發送端接口 - 逆變泛型,只能發送 E 或其子類型
public interface SendChannel<in E>// 接收端接口 - 協變泛型,只能接收 E 或其父類型
public interface ReceiveChannel<out E>// 雙向通道接口 - 不變泛型,既能發送又能接收 E
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

好處:

  • 接口隔離原則(ISP):發送和接收功能完全分離,避免接口污染

  • 類型安全保障:通過協變和逆變確保數據流向的類型安全

  • 職責清晰分離:不同角色只需關注相關接口,降低復雜度

實際應用場景

在架構設計中,當你需要向團隊提供 API 時,往往希望限制調用者的操作權限,如果直接暴露 Channel 類型,調用者可以同時進行發送和接收操作,這可能違背你的設計意圖,因為你沒辦法控制調用者的行為,通過返回受限的接口類型,就可以 精確控制調用者的操作范圍

produce 構建器示例

當你希望調用者只能接收數據時:


// 架構開發者:使用produce構建器封裝數據流處理邏輯
fun createDataStream(): ReceiveChannel<String> {println("\u001B[35m[架構層] 使用produce構建器封裝復雜的數據處理邏輯\u001B[0m")return GlobalScope.produce(capacity = 3) {// 架構層復雜邏輯:數據獲取、轉換、驗證等repeat(5) { batch ->// 模擬復雜的數據處理流程val rawData = "raw_data_$batch"val processedData = "DataBatch-$batch: [validated, transformed, enriched]"println("\u001B[35m[架構層] 處理原始數據: $rawData -> $processedData\u001B[0m")send(processedData) // 只有架構層可以發送數據delay(200)}// produce自動管理資源清理println("\u001B[35m[架構層] produce自動清理內部資源\u001B[0m")}
}suspend fun produceBuilderExample() {// 業務開發者:獲得受限的ReceiveChannel接口val dataStream: ReceiveChannel<String> = createDataStream()println("\u001B[34m[業務開發者] 獲得ReceiveChannel,只能接收數據\u001B[0m")// 業務開發者只能進行接收操作println("\u001B[34m[業務層] 開始處理數據流\u001B[0m")for (data in dataStream) {println("\u001B[34m[業務層] 接收并處理: $data\u001B[0m")delay(100)}
}

在這里插入圖片描述

actor 構建器示例

當你希望調用者只能發送數據

// 使用actor構建器封裝數據處理邏輯
fun createDataProcessor(): SendChannel<String> {println("\u001B[35m[架構層] 使用actor構建器封裝數據處理管道\u001B[0m")return GlobalScope.actor(capacity = 5) {// 架構層復雜邏輯:數據接收、驗證、存儲等var processedCount = 0for (rawData in channel) { // actor內部可以接收數據processedCount++// 模擬復雜的數據處理流程val validatedData = "Validated: $rawData"val transformedData = "Transformed: $validatedData"val storedData = "Stored: $transformedData [ID: $processedCount]"println("\u001B[35m[架構層] 處理數據管道: $rawData -> $storedData\u001B[0m")delay(150) // 模擬處理時間// 這里可以進行數據庫存儲、文件寫入等操作println("\u001B[35m[架構層] 數據處理完成: $storedData\u001B[0m")}// actor自動管理資源清理println("\u001B[35m[架構層] actor自動清理內部資源,共處理 $processedCount 條數據\u001B[0m")}
}suspend fun actorBuilderExample() {// 業務開發者:獲得受限的SendChannel接口val dataProcessor: SendChannel<String> = createDataProcessor()println("\u001B[34m[業務開發者] 獲得SendChannel,只能發送數據\u001B[0m")// 業務開發者只能進行發送操作println("\u001B[34m[業務層] 開始發送數據到處理管道\u001B[0m")repeat(6) { index ->val businessData = "BusinessData-$index: {orderId: ${1000 + index}, amount: ${(index + 1) * 100}}"println("\u001B[34m[業務層] 發送業務數據: $businessData\u001B[0m")dataProcessor.send(businessData)delay(80)}// 業務層完成數據發送后關閉通道dataProcessor.close()println("\u001B[34m[業務層] 數據發送完成,關閉通道\u001B[0m")}

在這里插入圖片描述

produce vs actor 對比

特性produceactor
返回類型ReceiveChannel<E>SendChannel<E>
外部操作只能接收數據只能發送數據
內部邏輯生產數據并發送接收數據并處理
數據流向內部 → 外部外部 → 內部
適用場景數據生成、API 封裝數據處理、消息處理
接口隔離隱藏生產邏輯隱藏處理邏輯
類型安全協變 out E逆變 in E

總結

關于熱流這個特性

Channel 是熱流,這意味著什么?簡單說就是生產者不等人。不管有沒有消費者在那兒等著,生產者該干嘛干嘛,數據照樣往Channel 里塞。

這個特性在實際開發中特別有用。比如你在做日志收集系統,不管有沒有消費日志,日志數據肯定是持續產生的, Channel 的熱流特性正好匹配到這個場景。

幾種應用模式的實際體驗

扇出模式(一對多):這個應用場景是比較多的,特別是在處理任務隊列的時候。一個生產者瘋狂塞任務,多個消費者搶著處理,天然的負載均衡。不用你寫什么復雜的分發邏輯,Channel 自己就搞定了。

扇入模式(多對一):日志收集的時候經常用到。多個服務往一個 Channel 里扔日志,一個消費者統一處理。簡單粗暴,但很有效。

多對多模式:這個就是前兩種的結合體,適合高并發場景。不過說實話,復雜度也上去了,一般也不會用到。

Select 表達式:這個還是非常實用的,可以同時等到多個 Channel,誰先有數據就處理誰。配合超時機制,可以做很多有趣的事情。比如同時查緩存、數據庫、API,誰快用誰的,其他的直接取消。

produce 和 actor 的設計思路

剛開始我也不理解為什么要搞這兩個構建器,直接用 Channel() 不香嗎?后來在實際項目中才體會到接口隔離的重要性。

想象一下,你給團隊提供一個數據流 API,如果直接返回 Channel,別人既能往里發數據,又能從里面取數據。這不就亂套了嗎?

produce 返回 ReceiveChannel,別人只能取數據;用 actor 返回 SendChannel,別人只能發數據。權限控制得明明白白

這種設計在大型項目中特別有用,可以避免很多不必要的 bug。

Channel 到 Flow

學完 Channel 之后,你會發現它解決了很多并發場景的問題。但是 Channel 作為熱流,有個問題就是資源消耗。即使沒有消費者,生產者也在那兒工作,這在某些場景下是浪費的。

這時候就該 Flow(冷流) 出場了。Flow 只有在被訂閱的時候才開始工作,更節省資源。而且 Flow 有更豐富的操作符,可以做各種數據變換。

而且 kotlin 還提供了 StaredFlowStateFlow 這兩個非常實用的數據流,因此,在日常開發中,我們可能更多的是選擇 Flow

掌握了 Channel,再去學 Flow 會輕松很多。因為很多概念是相通的,只是應用場景不同而已。


好了, 本篇文章就是這些,希望能幫到你。

感謝閱讀,如果對你有幫助請三連(點贊、收藏、加關注)支持。有任何疑問或建議,歡迎在評論區留言討論。如需轉載,請注明出處:喻志強的博客

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/94910.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/94910.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/94910.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PostgreSQL表空間(Tablespace)作用(管理數據庫對象的存儲位置)(pg_default、pg_global)

文章目錄**1. 靈活的數據存儲管理**- **邏輯與物理分離**&#xff1a;表空間為數據庫對象&#xff08;如表、索引&#xff09;提供了一個邏輯名稱與物理存儲路徑的映射。用戶無需直接操作底層文件路徑&#xff0c;只需通過表空間名稱管理數據。- **多數據庫共享表空間**&#x…

Ansible 核心運維場景落地:YUM 倉庫、SSH 公鑰、固定 IP 配置技巧

1&#xff1a;如何一次性驗證所有主機能否被 Ansible 訪問&#xff1f; 答&#xff1a;使用臨時命令&#xff1a;ansible all -m ansible.builtin.ping或驗證 sudo 是否正常&#xff1a;ansible all -m ansible.builtin.ping --become -K2&#xff1a;如何用 Ansible 統一配置…

rman導致的報錯ORA-27037: unable to obtain file status

有套3節點的11204集群環境&#xff0c;在db2上配置了rman備份&#xff0c;今天例行檢查時發現db1和db3上不定期有報錯&#xff0c;報錯如下&#xff1a;Control file backup creation failed:failure to open backup target file /u01/app/oracle/product/11.2.0/db_1/dbs/snap…

Kubernetes 與 GitOps 的深度融合實踐指南

前言&#xff1a;在云原生技術飛速發展的今天&#xff0c;Kubernetes&#xff08;簡稱 K8s&#xff09;已成為容器編排領域的事實標準&#xff0c;而 GitOps 作為一種基于 Git 的云原生運維理念&#xff0c;正與 K8s 深度融合&#xff0c;為企業實現自動化、可追溯、可審計的應…

REST-assured 接口測試編寫指南

REST-assured 簡介 REST-assured 是一個基于 Java 的 DSL&#xff08;領域特定語言&#xff09;庫&#xff0c;專門用于簡化 RESTful API 測試的編寫。它提供了流暢的 API 接口&#xff0c;使得測試代碼更加易讀易寫&#xff0c;支持 JSON 和 XML 等多種響應格式的驗證。 基本環…

內網應用如何實現外網訪問?外地通過公網地址訪問內網服務器的設置方法

一、內網應用程序在外網需要連接訪問遇到的問題我們經常需要在內網中部署服務&#xff0c;比如一個 Web 服務器或者數據庫&#xff0c;但由于本地沒有公網IP&#xff0c;這些服務無法直接從外地公網訪問。如自己家里的監控系統&#xff0c;在家時能查看&#xff0c;但出門在外就…

ubuntu24.04 QT中配置opencv4.12

假如生成的opencv路徑是&#xff1a;/usr/local/opencv4.12QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c17# You can make your code fail to compile if it uses deprecated APIs. # In order to do so, uncomment the following line. #DEFINE…

客戶端是否都需要主動發送`FindService`報文來尋找服務

<摘要> 在AUTOSAR SOME/IP-SD的服務發現流程中&#xff0c;客戶端是否需要主動發送FindService報文來尋找服務&#xff0c;是理解服務訂閱邏輯的一個關鍵。這將直接影響到事件組訂閱的觸發時機和網絡行為。下文將結合規范&#xff0c;對這一問題進行深入剖析。 <解析&…

Go語言流式輸出實戰:構建高性能實時應用

什么是流式輸出&#xff1f; 流式輸出&#xff08;Streaming Output&#xff09;是一種服務器推送技術&#xff0c;允許數據在生成過程中逐步發送到客戶端&#xff0c;而不是等待所有數據準備就緒后一次性發送。這種技術顯著改善了用戶體驗&#xff0c;特別是在處理大量數據或長…

操作系統上的Docker安裝指南:解鎖容器化新世界

摘要&#xff1a;本文詳細介紹了Docker在不同操作系統上的安裝方法。主要內容包括&#xff1a;Windows系統通過Docker Desktop安裝&#xff0c;需啟用Hyper-V和WSL2&#xff1b;Mac系統同樣使用Docker Desktop&#xff0c;根據芯片類型選擇版本&#xff1b;Linux系統以Ubuntu為…

【微信小程序】分別解決H5的跨域代理問題 和小程序正常不需要代理問題

——總問&#xff1a;何為跨域和代理&#xff1f; &#x1f539;什么叫跨域&#xff1f; 前端在瀏覽器里發請求時&#xff0c;如果 域名 / 協議 / 端口 三個中有一個不一樣&#xff0c;就會觸發 跨域問題。 例子&#xff1a; 頁面跑在 http://localhost:5173你要請求接口 http:…

數字簽名 digital signature

文章目錄1、嚴謹的定義2、技術原理&#xff1a;如何工作&#xff1f;第一步&#xff1a;發送者 - 簽名過程第二步&#xff1a;接收者 - 簽名驗證過程3、C語言實現示例4、關鍵技術要點5、安全注意事項6、最重要的應用&#xff1a;TLS/SSL 與網站安全1、嚴謹的定義 數字簽名是一…

對于STM32工程模板

工程模板文件下載鏈接 https://download.csdn.net/download/qq_58631644/91809234 重命名 打開這個文件夾 重命名保持一致 雙擊打開

使用 SmartIDE 開發長安鏈 Go 語言智能合約

文章目錄官方文檔Chrome 插件登錄 SmartIDE合約調試合約編譯官方文檔 使用SmartIDE編寫Go智能合約 Chrome 插件 https://git.chainmaker.org.cn/chainmaker/chainmaker-smartplugin/-/releases 登錄 SmartIDE https://ide.chainmaker.org.cn/ 合約調試 合約編譯

MEM課程之物流與供應鏈管理課程經典案例及分析-個人原創內容放在此保存

供應鏈管理課程案例 特殊時期期間,美國出現養豬戶對數百萬頭豬實施安樂死和奶農傾倒牛奶現象。從供應鏈的角度分析該現象并提出應對思路。要求有分析框架和文獻支撐。 供應鏈管理課程案例分析 從供應鏈角度分析特殊時期美國豬安樂死和傾倒牛奶現象 本文描述了特殊時期期間,美…

Transformer:從入門到精通

學習一個深度學習模型&#xff0c;我們首先需要從理論的角度理解它的構架&#xff0c;進而理解代碼。 Transformer背景 首先我們知道&#xff0c;神經網絡有一個巨大的家族&#xff0c;其中的CNN&#xff08;卷積神經網絡&#xff09;源于視覺研究&#xff0c;目標是讓機器自…

FOC開環控制代碼解讀

這段代碼實現了一個開環速度控制系統&#xff0c;用于控制電機轉速。它通過PWM控制器輸出電壓信號&#xff0c;來驅動電機轉動。具體來說&#xff0c;它在指定目標速度下&#xff0c;持續通過電壓信號進行控制。下面是對該代碼詳細流程的逐步解析&#xff1a; 1. 宏定義與變量初…

Ansible Playbook 調試與預演指南:從語法檢查到連通性排查

1&#xff1a;調試 playbook 時&#xff0c;最該先看哪一段輸出&#xff1f; 答&#xff1a;先查看ansible-navigator run的 PLAY RECAP 段落&#xff0c;它能一次性給出每臺受管主機的 ok、changed、unreachable、failed、skipped、rescued、ignored 等計數&#xff0c;快速定…

深入探討可視化技術如何實現安全監測

可視化在安全監測中的作用&#xff0c;遠超越了“美觀的圖表”這一表層概念。它是將抽象、混沌的安全數據轉化為直觀、可理解的視覺信息的過程&#xff0c;其核心價值在于賦能人類直覺&#xff0c;大幅提升認知與決策效率&#xff0c;從而實現對安全態勢的深度感知和快速響應。…

Scikit-learn Python機器學習 - Scikit-learn加載數據集

鋒哥原創的Scikit-learn Python機器學習視頻教程&#xff1a; 2026版 Scikit-learn Python機器學習 視頻教程(無廢話版) 玩命更新中~_嗶哩嗶哩_bilibili 課程介紹 本課程主要講解基于Scikit-learn的Python機器學習知識&#xff0c;包括機器學習概述&#xff0c;特征工程(數據…