目錄
- 前言
- 基礎
- 基本用法
- 概念與核心特點
- Android中使用
- 與LiveData對比
- 熱流StateFlow、SharedFlow
- 搜索輸入流實現實時搜索
前言
? Flow是kotlin協程庫中的一個重要組成部分,它可以按順序發送多個值,用于對異步數據流進行處理。所謂異步數據流就是連續的異步事件,如連續的網絡請求、查詢數據庫等。
? 了解LiveData的同學可能知道,使用LiveData也可以大體實現這樣的處理連續的異步事件的效果,比如說每發一次網絡請求執行一次postValue()
方法,那么我們就能通過Observer
來監聽到這個值的改變,接著去進行相應的操作,但與此同時這樣會遇到許多相應的問題,這里先按下不表,后面會介紹為什么使用Flow而不使用LiveData來對連續的異步事件進行處理。
? 同時為了方便大家理解啥場景用Flow,下面會另外實現一個搜索輸入流來實現實時搜索的Demo。
基礎
基本用法
// 生產者:發送數據
val numbersFlow = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)
}// 收集者:接收數據
lifecycleScope.launch {numbersFlow.collect { value ->Log.d("Flow", "收到:$value")}
}
打印:
收到:1
收到:2
收到:3
這是一個最簡單的Flow例子,可以看到使用生產者使用emit來發送數據,接著接收者使用collect來接收數據
概念與核心特點
上面講了這些,好像還沒具體說下Flow是個啥。
Flow是一種可以異步地發送多個值的冷流。所謂冷流就是說Flow是冷的,只有收集的時候(collect時)才會開始執行,也就是調用collect之后Flow再來生產數據再發送數據。與之對應的還有熱流,典型的就是StateFlow,它不管有沒有收集者,數據都會產生,下面再具體介紹。
Flow的核心特點:
特征 | 描述 |
---|---|
冷流 | Flow是冷的,直到被收集的時候(collect時)才會開始執行,有助于節省資源 |
支持協程 | Flow是基于協程構建的,支持協程,能在emit和collect中調用掛起函數 |
支持自動取消 | Flow可以結合協程作用域實現自動取消 Flow的收集過程是掛起的,需要運行在一個協程中。只要這個協程被取消了,Flow的收集就會自動終止 |
背壓處理 | 背壓控制:消費者來不及處理生產者發送的數據時的處理機制 內置背壓控制,不會造成UI卡頓或內存泄露 |
鏈式操作 | 可以進行鏈式操作(map、filter等) |
上面說到Flow支持鏈式操作,介紹一下Flow的常用的操作符map、filter的用法,其他感興趣的可自行Chatgpt:
fun processedFlow(): Flow<String> = flow {for (i in 1..5) {delay(500)emit(i)}
}.map { // 中間操作:轉換為字符串"Item $it"
}.filter {// 中間操作:過濾偶數it.last().toString().toInt() % 2 != 0
}
收集到的結果:
Item 1
Item 3
Item 5
Android中使用
這里拿一個網絡請求的Demo來舉例,在ViewModel進行網絡請求,在Activity中對請求結果進行收集:
// ViewModel
fun fetchUserData(): Flow<String> = flow {delay(2000)// 模擬網絡請求獲取數據emit("User Data")delay(1500)// 模擬更新數據emit("Updated Data")
}.flowOn(Dispatchers.IO) // 指定在IO線程執行// Activity
lifecycleScope.launchWhenStarted {viewModel.fetchUserData().onEach { data ->println(data)}.catch { e ->// 異常處理Toast.makeText(context, "Error: ${e.message}", Toast.LENGTH_SHORT).show()}.collect()
}
打印:
User Data
Updated Data
與LiveData對比
上面的在Android中的使用Flow來對網絡請求結果收集如果用LiveData來寫的話:
- 不使用LiveData{}
// ViewModel
private val _userData = MutableLiveData<String>()
val userData: LiveData<String> = _userDatafun fetchUserDataManual() {viewModelScope.launch {delay(2000)_userData.postValue("User Data")delay(1500)_userData.postValue("Updated Data")}
}// Activity
viewModel.userData.observe(this) { data ->println(data)
}
- 使用LiveData{}
// ViewModel
fun fetchUserData(): LiveData<String> = liveData(Dispatchers.IO) {try {delay(2000)emit("User Data")delay(1500)emit("Updated Data")} catch (e: Exception) {emit("Error: ${e.message}")}
}// Activity
viewModel.fetchUserData().observe(this) { data ->println(data)
}
可以發現好像使用LiveData也可以實現連續發送網絡請求接著進行處理,但使用LiveData時會存在以下問題與局限性:
- 感知生命周期但不支持掛起函數:無法在LiveData中使用suspend關鍵字
因為LiveData是基于觀察者模式和生命周期感知構建的,不是基于suspend的掛起函數的異步機制,所以LiveData中不支持掛起函數調用,可以使用LiveData{}構建器來讓LiveData支持調用掛起函數:
fun fetchData(): LiveData<String> = liveData(Dispatchers.IO) {val result = fetchFromNetwork() // 可以調用suspend函數emit(result)
}
- 不支持鏈式操作
Flow鏈式操作:
textFlow.debounce(500)// 停止輸入500ms才觸發一次.map { it.trim() }// 去掉空格.distinctUntilChanged()// 只有當輸入發生變化時才會接著往下執行.onEach { vm.updateKeyword(it) }// 更新.launchIn(lifecycleScope)// 啟動Flow
可以看到將一連串的處理像鏈條一樣串聯起來,LiveData并沒有這些鏈式操作符。
- 缺乏背壓控制:無法響應高頻數據流(如搜索輸入流)
- 只能有限支持異步數據流
- 組合操作符少:沒有map、filter等操作符
- 邏輯冗余:要手動處理線程、去重、防抖、錯誤捕獲等問題
使用建議:
- 如果只需要簡單的、生命周期安全的 UI 數據綁定,LiveData 是輕量選擇。
- 如果需要復雜的異步流、響應式變換、防抖、取消處理等,Flow 更合適。
熱流StateFlow、SharedFlow
常見的熱流有StateFlow和SharedFlow。(LiveData也是一個熱流)
知道了冷流是只有接收者接受數據時,發送者才會去產生數據再發送數據給接收者。并且每個接收者都會觸發完整的數據流從頭開始接收完整的數據源。
而熱流就是不管有沒有接受者來接收數據,發送者都會生產數據,多個接受者時共享同一份數據源的,同時接受者并不會接收完整的數據源,發送者數據生產到哪了接受者就接收到哪的數據。
說的通俗一點就是:
- 冷流就像刷視頻,我們開始刷這個視頻這個視頻才會開始播放,并且是從頭開始播放
- 熱流就像直播,我們不看直播這個直播也在播放,點進直播間觀看也并不是從頭開始看,而是只能從當前的內容開始看
冷流Demo:
//每次collect都會重新發射數據
val coldFlow = flow { println("開始生產數據")emit(1) emit(2)
} // 觀察者1
coldFlow.collect { println("觀察者1: $it") } // 輸出:1,2
// 觀察者2
coldFlow.collect { println("觀察者2: $it") } // 再次輸出:1,2
適用場景:
- 網絡請求、數據庫查詢等需要獨立數據源的場景
- 每個訂閱者需要從頭消費完整的數據
熱流Demo:
// 創建熱流(SharedFlow)
val hotFlow = MutableSharedFlow<Int>()// 啟動協程持續發射數據(即使沒有訂閱者)
CoroutineScope(Dispatchers.Default).launch { repeat(4) { delay(1000) // 發射 0 1 2 3 4hotFlow.emit(it)}
}// 觀察者1(延遲1秒訂閱)
CoroutineScope(Dispatchers.Main).launch {delay(1000) hotFlow.collect { println("觀察者1: $it") } // 只能收到 1,2,3,4
} // 觀察者2(延遲5秒訂閱)
CoroutineScope(Dispatchers.Main).launch { delay(5000) hotFlow.collect { println("觀察者2: $it") } // 收不到任何數據(發射已結束)
}
適用場景:
- 需要共享實時數據的場景(如IM消息、用戶定位更新)
- 數據生產是連續且獨立的
總結:
冷流(Flow、asFlow) | 熱流(StateFlow、SharedFlow) | |
---|---|---|
數據產生發送時機 | 接受者收集數據時(collect) | 直接產生數據,不管有沒有接受者收集 |
數據獨立性 | 每個接受者收到的數據時獨立的 | 所有接受者共享數據 |
數據歷史 | 每個接受者從頭開始獲取完整數據 | 只能獲取訂閱后產生的數據 |
搜索輸入流實現實時搜索
場景:有一個搜索框,沒有搜索圖標我們無法手動點擊進行搜索,而是對文本進行監聽實現實時自動搜索。
在這使用的Flow是callbackFlow,介紹一下它與Flow的區別:
- flow{}用于掛起函數式的順序執行
- callbackFlow{}用于將異步的、回調式的數據源封裝成Flow
簡單來說,callbackFlow是需要依賴異步回調拿數據的場景,沒辦法直接emit(),比如說監聽文本變化這種。而不是像flow那樣,發送完網絡請求直接emit()即可。
所以需要對文本進行監聽的話,需要使用callbackFlow將TextWatcher轉成流。
val et = vBinding.etSearchval textFlow = callbackFlow {val watcher = object : TextWatcher {override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {}override fun afterTextChanged(s: Editable?) {val str = s?.toString() ?: ""// 將變化的字符串交給FlowtrySend(str)}}et.addTextChangedListener(watcher)awaitClose {// 當Flow被取消時,移除監聽器避免內存泄露et.removeTextChangedListener(watcher)}}textFlow.debounce(500).map { it.trim() }.distinctUntilChanged().onEach {vm.updateKeyword(it)}.launchIn(lifecycleScope)
這里textFlow做的處理有:
- 用戶通知輸入500ms時才做處理,減少頻繁觸發
- distinctUntilChanged()表示輸入內容不變時不觸發搜索
- 收集trySend發送的字符串來進行搜索
- launchIn(lifecycleScope)在當前生命周期范圍內啟動協程收集Flow