先回憶下flow吧!
flow是啥
Flow 是 Kotlin 協程框架中的一個異步數據流處理組件,專為響應式編程設計,適用于需要連續或異步返回多個值的場景,如網絡請求、數據庫查詢、傳感器數據等
- 1 ?異步流(Asynchronous Streams)
- Flow 允許以非阻塞方式處理一系列值或事件,適用于大量數據或涉及 IO 操作的場景
- 與 suspend 函數不同,Flow 可以返回多個值,而 suspend 函數僅能返回單個計算結果
- 2 冷流(Cold Flow)與熱流(Hot Flow)
- 冷流?:僅在收集器(collect)訂閱后才會開始發射數據
? - 熱流?(如 SharedFlow):創建后立即發射數據,無論是否有收集器訂閱
- 冷流?:僅在收集器(collect)訂閱后才會開始發射數據
- 3 聲明式 API?
- 提供豐富的操作符(如 map、filter、reduce),支持鏈式調用
- 4 協程集成?
- Flow 基于協程,支持結構化并發、取消操作和背壓管理
flow的創建
- 使用 flow{} 構建器
val numberFlow = flow {emit(1)delay(100) // 模擬耗時操作emit(2)
}
- 使用 flowOf 快速創建
val fixedFlow = flowOf(1, 2, 3)
- 集合轉 Flow
val listFlow = listOf("A", "B", "C").asFlow()
高級創建方式
(1) 基于回調的 API 轉換
將回調式 API(如網絡請求)封裝為 Flow?
fun fetchDataFlow() = callbackFlow {val callback = object : DataCallback {override fun onData(value: String) {trySend(value) // 發送數據到 Flow}override fun onComplete() {close() // 關閉 Flow}}registerCallback(callback)awaitClose { unregisterCallback(callback) } // 確保資源釋放
}
(2) 從掛起函數生成
通過 channelFlow 或 flow 結合掛起函數實現復雜邏輯?
fun pollUpdates() = flow {while (true) {val updates = fetchUpdates() // 掛起函數emit(updates)delay(5000) // 間隔輪詢}
}
Flow 中處理背壓(Backpressure)的核心策略
- 緩沖機制?
通過 buffer() 設置緩沖區容量,允許生產者和消費者異步執行,緩解數據積壓壓力?
flow { repeat(100) { emit(it) }
}.buffer(50) // 設置50容量的緩沖區.collect { /* 處理數據 */ }
?2. 合并策略
使用 conflate() 跳過中間值,僅保留最新數據?
flow { emit(1); delay(10); emit(2); emit(3)
}.conflate().collect { println(it) } // 輸出:1 → 3(跳過2)
?3. 最新值優先
collectLatest 取消未完成的任務,立即處理最新發射值?
flow { emit("A"); delay(100); emit("B")
}.collectLatest { value ->delay(200) // 處理"A"時被"B"中斷println(value) // 僅輸出"B"
}
- 調度優化
利用 flowOn 切換協程上下文,分散計算負載?
flow { /* 密集計算 */ }.flowOn(Dispatchers.Default) // 在后臺線程生產.collect { /* 主線程消費 */ }
創建操作符?
flow{}?
基礎構建器,通過 emit 發射數據,支持掛起操作?:
flow { emit(1); delay(100); emit(2) }
flowOf?
快速創建固定值序列的 Flow?:
flowOf("A", "B", "C")
asFlow?
將集合(如 List)轉換為 Flow?:
listOf(1, 2, 3).asFlow()
轉換操作符?
map?
對每個元素進行轉換?:
flowOf(1, 2).map { it * 10 } // 輸出 10, 20
filter?
按條件過濾元素?:
flowOf(1, 2, 3).filter { it % 2 == 0 } // 輸出 2
transform?
靈活轉換,可多次發射值?:
flowOf("Hi").transform { emit(it.uppercase()); emit(it.length) }
組合操作符?
zip?
合并兩個 Flow 的對應元素?:
flowOf(1, 2).zip(flowOf("A", "B")) { num, str -> "$num$str" } // 輸出 1A, 2B
flatMapConcat?
順序展開嵌套 Flow?:
flowOf(1, 2).flatMapConcat { flowOf(it, it * 2) } // 輸出 1, 2, 2, 4
終端操作符?
collect?
觸發流執行并處理數據?:
flowOf(1).collect { println(it) }
first/last?
獲取首個或末尾元素?:
flowOf(1, 2).first() // 返回 1
reduce?
累積計算(如求和)?:
flowOf(1, 2, 3).reduce { acc, v -> acc + v } // 輸出 6
背壓處理操作符?
buffer?
設置緩沖區緩解生產消費速度差異?:
flow { emit(1) }.buffer(10)
conflate?
跳過中間值,保留最新數據?:
flow { emit(1); emit(2) }.conflate() // 僅處理 2
其他實用操作符?
take?
限制收集的元素數量?:
flowOf(1, 2, 3).take(2) // 輸出 1, 2
onEach?
在每次發射時執行副作用(如日志)?:
flowOf(1).onEach { println("發射: $it") }
開整
調用retrofit+okhttp
導包
// 網絡請求api("com.google.code.gson:gson:2.8.6")api("com.squareup.retrofit2:retrofit:2.9.0")api("com.squareup.retrofit2:converter-gson:2.9.0")api("com.squareup.retrofit2:converter-scalars:2.0.0")api("com.squareup.okhttp3:okhttp:3.14.9")api("com.squareup.okhttp3:logging-interceptor:3.12.2")api("com.squareup.okio:okio:1.17.4")
open class HttpCreater private constructor() {val timeOut: Long = 60 //30秒超時val baseUrlInterceptor: BaseUrlInterceptor = BaseUrlInterceptor()var okhttpClient: OkHttpClientvar downOkHttpClient: OkHttpClient//日志攔截var loggingInterceptor = HttpLoggingInterceptor(object : HttpLoggingInterceptor.Logger {override fun log(message: String) {//打印retrofit日志Log.i("log_http", "retrofitBack = $message")}})init {loggingInterceptor.level = HttpLoggingInterceptor.Level.BODY//用于請求okhttpClient = OkHttpClient.Builder().connectTimeout(timeOut, TimeUnit.SECONDS).readTimeout(timeOut, TimeUnit.SECONDS).writeTimeout(timeOut, TimeUnit.SECONDS).addInterceptor(baseUrlInterceptor).addInterceptor(loggingInterceptor).build()//用于下載 因為請求的okhttpClient 有日志攔截器 會先攔截請求結果 所以專門創建個用于下載的downOkHttpClient = OkHttpClient.Builder().connectTimeout(timeOut, TimeUnit.SECONDS).readTimeout(timeOut, TimeUnit.SECONDS).writeTimeout(timeOut, TimeUnit.SECONDS).build()}companion object {open val instance by lazy (LazyThreadSafetyMode.SYNCHRONIZED){HttpCreater()}}/*** 設置baseUrl 對應的token*/open fun setToken(baseUrl: String,token:String){baseUrlInterceptor.setToken(baseUrl,token)}/*** 獲取請求的retrofit 調用的時候創建 傳入baseUrl 因為我們項目連了好幾個服務器*/open fun getRetrofit(baseUrl:String) : Retrofit{val gson: Gson = GsonBuilder().setLenient().create();return Retrofit.Builder().client(okhttpClient).baseUrl(baseUrl)
// .addConverterFactory(GsonConverterFactory.create(gson)) //配置轉化庫 Gson解析失敗,不報錯崩潰.addConverterFactory(ScalarsConverterFactory.create()) //返回字符串.build()}/*** 下載的tokenretrofit*/open fun getDownRetrofit(baseUrl:String) : Retrofit{return Retrofit.Builder().client(downOkHttpClient).baseUrl(baseUrl).addConverterFactory(ScalarsConverterFactory.create()) //配置轉化庫 Gson解析失敗,不報錯崩潰.build()}
}
請求攔截器 用于獲取token后設置后,自動將token設置到請求頭
class BaseUrlInterceptor:Interceptor {val tokenMap = mutableMapOf<String,String>()override fun intercept(chain: Interceptor.Chain): Response {// 獲取requestval request = chain.request()val builder = request.newBuilder()builder.addHeader("Content-Type", "application/json; charset=UTF-8")builder.addHeader("Accept", "application/json;versions=1")val httpUrl = request.url().url().hostLog.i("log_http","httpUrl>>${httpUrl}")if(!tokenMap.get(httpUrl).isNullOrEmpty()){builder.addHeader("Authorization", "Bearer ${tokenMap.get(httpUrl)}")}return chain.proceed(builder.build())}fun setToken(baseUrl: String, token: String) {tokenMap.put(baseUrl,token)}
}
添加請求接口
interface NetApi {@GET("/article/list/{path}/json")suspend fun getList(@Path("path") page:Int):String}
NetRepository flow調用
class NetRepository private constructor(){val service by lazy { HttpCreater.instance.getRetrofit("https://www.wanandroid.com").create(NetApi::class.java) }companion object{val instance by lazy { NetRepository() }}fun getList():Flow<String> = flow {val result = service.getList(0)Log.i("zq_demo","flow result>>${result}")emit(result)}
}
測試
測試代碼
findViewById<TextView>(R.id.tv_hello).setOnClickListener {Log.i("zqq_demo","tv_hello")lifecycleScope.launch {Log.i("zqq_demo","lifecycleScope")netRepository.getList().onEach {Log.i("zqq_demo","onEach>>${it}")}.collect{Log.i("zqq_demo","collect>>${it}")}}}
結果
其他
1 我們可以使用配置文件配置baseurl
創建config.gradle.kts
添加測試代碼
val myProperty: String by extra("Hello, World!")
使用 app下
添加
// 加載 config.gradle.kts
apply(from = "${rootDir}/config.gradle.kts")
// 使用 myProperty
println(extra["myProperty"]) // 輸出: Hello, World!
運行
接下來創建baseUrl
config.gradle.kts:
val baseUrl: String by extra("https://www.wanandroid.com")
app下 build.gradle.kts
plugins {alias(libs.plugins.android.application)alias(libs.plugins.jetbrains.kotlin.android)
}
// 加載 config.gradle.kts
apply(from = "${rootDir}/config.gradle.kts")
android {defaultConfig {buildConfigField("String",name = "baseUrl", value = "\"${extra["baseUrl"]}\"")}//這個別忘記添加buildFeatures {buildConfig = true // 啟用 BuildConfig 功能}
}
然后打印
Log.i("zqq_demo","baseUrl>>${BuildConfig.baseUrl}")
這樣上邊NetRepository 就可以使用
class NetRepository private constructor(){val service by lazy { HttpCreater.instance.getRetrofit(BuildConfig.baseUrl).create(NetApi::class.java) }
當然你可以定義版本號 applicationId 其他的key versionCode versionName 簽名文件路徑 等等配置都可以
使用舊的config.gradle
config.gradle如下
ext {test = "hello"android = [hello = "hello"]
}
項目的build.gradle.kts
plugins {alias(libs.plugins.android.application) apply falsealias(libs.plugins.jetbrains.kotlin.android) apply falsealias(libs.plugins.android.library) apply false
}apply(from = "${rootDir}/config.gradle")
app的build.gradle.kts
println("${(rootProject.extra["android"] as Map<String,Any>).get("hello")}") defaultConfig {applicationId = "com.zqq.demo"minSdk = 24targetSdk = 34versionCode = 1versionName = "1.0"testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"buildConfigField("String",name = "hello", value = "\"${rootProject.extra["test"]}\"")}buildFeatures {buildConfig = true // 啟用 BuildConfig 功能}