Kotlin Flow 是 Kotlin 協程庫中的一個組件,它提供了處理異步數據流的能力。Kotlin Flow 類似于 RxJava 中的 Observable,但它完全基于 Kotlin 協程設計,使得異步流的操作變得更加簡單和直觀。
Flow 是冷流(cold stream),意味著它并不會在有收集器開始收集之前開始發射數據。這與 RxJava 中的熱流(hot stream)相反,后者在沒有觀察者的情況下也會開始發射數據。
使用 Flow 的關鍵好處包括:
- 簡化異步編程:通過 Flow,可以用順序的方式編寫異步代碼。
- 背壓支持:Flow 自然支持背壓(back-pressure),可以應對快速發射元素的場景。
- 靈活的操作符:Flow 提供了豐富的操作符(如
map
、filter
、zip
、combine
等)來轉換和組合數據流。 - 協程友好:Flow 完美融入協程的上下文管理,使得取消和異常處理變得更加容易。
示例代碼
創建一個簡單的 Flow:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*fun simpleFlow(): Flow<Int> = flow {for (i in 1..3) {delay(100) // 假設這是計算一個值的過程emit(i) // 發射值}
}fun main() = runBlocking<Unit> {simpleFlow().collect { value -> // 用 collect 方法收集流println(value)}
}
上面的例子中,simpleFlow
函數返回了一個 Flow<Int>
,當收集器開始收集時,它將逐個發射整數值。emit
函數用于發射值,collect
函數用來收集流。
操作符
Flow 提供了一系列操作符來轉換和處理數據流:
fun main() = runBlocking<Unit> {simpleFlow().filter { it % 2 == 0 } // 只接收偶數.map { it * it } // 將每個值平方.collect { println(it) }
}
異常處理
Flow 的異常處理可通過 catch
操作符來完成:
fun main() = runBlocking<Unit> {simpleFlow().catch { e -> println("Caught exception: $e") } // 捕獲異常.collect { println(it) }
}
回壓策略
Flow 可以通過各種構建器和操作符來處理回壓問題,例如 buffer
、conflate
和 collectLatest
。
組合多個流
Flow 提供了 zip
和 combine
等操作符來組合多個流:
fun main() = runBlocking<Unit> {val flowA = flowOf("A", "B", "C")val flowB = flowOf(1, 2, 3)flowA.zip(flowB) { a, b -> "$a$b" }.collect { println(it) } // 輸出 "A1", "B2", "C3"
}
SharedFlow 和 StateFlow
Flow 還有兩個特殊的子類型,SharedFlow
和 StateFlow
,分別用于更高級的用例:
SharedFlow
:一種熱流,它允許將數據多次廣播到多個收集器。StateFlow
:一個特殊的SharedFlow
,它總是保持當前狀態的值,并且只廣播最新的值給新的收集器。
Kotlin Flow 通過這些功能,提供了一種聲明式的方式來處理異步數據流,使得協程中的異步編程更加靈活和強大。