Kotlin 之 協程(三)Flow非同步流
開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第6天,點選檢視活動詳情
flow介紹
掛起函式可以非同步返回單個值,那如何非同步多次返回多個值呢? 使用flow,flow的特點: - flow{...}塊中的程式碼可以掛起 - 使用flow,suspend修飾符可以省略 - 流使用emit函式發射值 - 流使用collect的函式收集值 - flow類似冷流,flow中程式碼直到流被收集(呼叫collect)的時候才執行,類似lazy,什麼時候用,什麼時候執行。 - 流的連續性:流收集都是按順序收集的 - flowOn可更改流發射的上下文,即可以指定在主執行緒或子執行緒中執行 - 與之相對的是熱流,我們即將介紹的 StateFlow 和 SharedFlow 是熱流,在垃圾回收之前,都是存在記憶體之中,並且處於活躍狀態的。
c
//使用flow,suspend修飾符可以省略
fun doflow() = flow<Int> {
for (i in 1..5) {
//這裡是掛起,不是阻塞
delay(500)
emit(i)
}
}.flowOn(Dispatchers.IO)
//呼叫
runBlocking {
doflow().collect {
log("value=$it")
}
}
列印(多次返回多個值)
com.z.zjetpack V/zx: value=1
com.z.zjetpack V/zx: value=2
com.z.zjetpack V/zx: value=3
com.z.zjetpack V/zx: value=4
com.z.zjetpack V/zx: value=5
flow的應用場景
檔案下載場景
```c
//正在下載(檔案總大小為5)
fun doflow() = flow
列印: com.z.zjetpack V/zx: 當前下載=20.0% com.z.zjetpack V/zx: 當前下載=40.0% com.z.zjetpack V/zx: 當前下載=60.0% com.z.zjetpack V/zx: 當前下載=80.0% com.z.zjetpack V/zx: 當前下載=100.0% ```
流構建器
flowof 和asflow
```c runBlocking { flowOf(1, 2, 3) .onEach { delay(500) } .collect { log("value = $it") }
(5..8).asFlow()
.onEach { delay(500) }
.collect {
log("value = $it")
}
}
```
使用launchin替換collect在單獨的協程中啟動收集流。
```c fun event() = (1..3) .asFlow() .onEach { delay(500) }.flowOn(Dispatchers.IO)
//呼叫
runBlocking {
val job = event().onEach {
log("value = $it")
}.launchIn(CoroutineScope(Dispatchers.IO))
//主執行緒可用this
//.launchIn(this)
job.join()
}
```
流的取消
超時的時候取消
```c
fun cancelFlow() = flow
//呼叫 runBlocking { //超時的時候取消流 withTimeoutOrNull(2500) { cancelFlow().collect { log("value = $it") } } }
列印:在2.5秒的時候超時了,取消了 com.z.zjetpack V/zx: value = 1 com.z.zjetpack V/zx: value = 2 ```
直接取消
```c runBlocking { cancelFlow().collect { log("value = $it") if(it == 3){ cancel() }
}
}
```
繁忙的任務是不能直接取消的,需要檢測取消(cancellable)
c
runBlocking {
(1..5).asFlow().cancellable().collect {
if(it == 3) {
cancel()
}
}
}
背壓:生產者效率 > 消費者效率
使用緩衝和flowon來處理背壓
buffer():併發執行流中發射元素的程式碼 conflate():合併發射項,不對每個值處理 collectLatest():取消並重新發送最後一個值
模擬背壓程式碼:
```c
fun preFlow() = flow
//呼叫 //100ms傳送一次,300ms接收一次就產生了背壓 runBlocking { val time = measureTimeMillis { preFlow() //buffer可以增加緩衝,提高效率 //.buffer(100) //flowOn自帶緩衝功能 //.flowOn(Dispatchers.IO) //conflate不對每個值處理 //.conflate() //.collect //取消並重新發送最後一個值 .collectLatest { delay(300) log("接收到:$it") } } log("總耗時 $time")
}
列印: com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 總耗時 2033
使用buffer後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 1634
使用flowOn後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 1639
使用conflate後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 1034
使用collectLatest後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 843 ```
操作符
轉換操作符:map ,transform 限長操作符:取指定數量,take 末端操作符:末端操作符用於啟動流收集的掛起函式,collect,tolist,toset,reduce,fold 組合操作符:zip 展平操作符:flatMapConcat(連線),flatMapMerge(合併),flatMapLatest(最新)
map
```c suspend fun perRequest(req: Int): String { delay(1000) return "轉換 $req" }
runBlocking {
(1..3).asFlow().map {
perRequest(it)
}.collect {
log(it)
}
}
列印: com.z.zjetpack V/zx: 轉換 1 com.z.zjetpack V/zx: 轉換 2 com.z.zjetpack V/zx: 轉換 3 ```
transform
c
runBlocking {
(5..6).asFlow().transform {
emit("s $it")
emit(perRequest(it))
emit("e $it")
}
//.take(4)
.collect {
log(it)
}
}
列印:
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 轉換 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
com.z.zjetpack V/zx: 轉換 6
com.z.zjetpack V/zx: e 6
take
c
加上take之後
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 轉換 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
末端操作符:collect,tolist,toset,reduce,fold
c
runBlocking {
val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
log("sum = $sum")
val nList = (1..5).asFlow().toList()
log("nList = $nList")
val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet()
log("nSet = $nSet")
}
列印:
com.z.zjetpack V/zx: sum = 55
com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]
com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]
展平操作符
只使用map的時候
```c
//返回值是一個flow
fun reqFlow(i: Int) = flow
runBlocking {
(0..1).asFlow().map {
reqFlow(it)
}.collect {
log("首次collect = $it")
it.collect {
log("二次 = $it")
}
}
}
列印:由於返回是flow所以需要collect 兩次才能拿到值,Flow
flatMapConcat
c
runBlocking {
(0..1).asFlow().flatMapConcat {
reqFlow(it)
}.collect {
log("首次collect = $it")
}
}
列印:直接展開了
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = end 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 1
c
runBlocking {
(0..1).asFlow().flatMapMerge {
reqFlow(it)
}.collect {
log("首次collect = $it")
}
}
列印:
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 0
com.z.zjetpack V/zx: 首次collect = end 1
flatMapLatest
```c runBlocking { (0..1).asFlow().flatMapLatest { reqFlow(it) }.collect { log("首次collect = $it") } }
列印: com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 1 ```
流的異常處理
catch函式 和 try catch
c
flow {
emit(1)
throw NullPointerException()
//catch函式只捕獲上游的異常
}.catch {
log("exception $it")
//在異常後恢復
emit(20)
}.flowOn(Dispatchers.IO)
.collect {
log("msg $it")
}
列印:
com.z.zjetpack V/zx: exception java.lang.NullPointerException
com.z.zjetpack V/zx: msg 1
com.z.zjetpack V/zx: msg 20
c
//不建議通過這種方式捕獲上游的異常,違反了flow原則,這種適合捕獲下游的異常
try {
(1..3).asFlow().collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("異常 $e")
}
列印:
com.z.zjetpack V/zx: 異常 java.lang.IllegalStateException: ex 1
流的完成
finally 和 onCompletion
```c try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("異常 $e") } finally { log("流已完成") }
//發生異常onCompletion可以拿到異常資訊,但不會捕獲
try {
(1..3).asFlow().onCompletion {
log("onCompletion $it")
}.collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("異常 $e")
}
列印: com.z.zjetpack V/zx: 異常 java.lang.IllegalStateException: ex 1 com.z.zjetpack V/zx: 流已完成 com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1 com.z.zjetpack V/zx: 異常 java.lang.IllegalStateException: ex 1 ```
StateFlow
StateFlow 是一個狀態容器式可觀察資料流,可以向其收集器發出當前狀態更新和新狀態更新。 1. StateFlow使用 第一步:建立 MutableStateFlow 並設定初始化的值。
c
class MainViewModel : ViewModel() {
val selected = MutableStateFlow<Boolean>(false)
}
第二步:同 Flow 一樣,使用 collect 方法:
c
lifecycleScope.launch {
viewModel.selected.collect {
// ... 引起UI發生的變化
// 比如 某個按鈕是否選中狀態
}
}
第三步:可以給 selected設定值,從而引起 Ui 層的變化:
c
class MainViewModel : ViewModel() {
val selected = MutableStateFlow<Boolean>(false)
fun doSomeThing(value: Boolean) {
selected.value = value
}
}
普通的 Flow,是不具備 selected.value = value 這種能力的
StateFlow 和 LiveData 有什麼區別? 有兩點區別:
第一點,StateFlow 必須有初始值,LiveData 不需要。 第二點,當 View 變為 STOPPED 狀態時,LiveData.observe() 會自動取消註冊使用方,而從 StateFlow 或任何其他資料流收集資料則不會取消註冊使用方。 對於 StateFlow 在介面銷燬的時仍處於活躍狀態,有兩種解決方法:
使用 ktx 將 Flow 轉換為 LiveData。 在介面銷燬的時候,手動取消(這很容易被遺忘)。
```c class LatestNewsActivity : AppCompatActivity() { ... // Coroutine listening for UI states private var uiStateJob: Job? = null
override fun onStart() {
super.onStart()
// Start collecting when the View is visible
uiStateJob = lifecycleScope.launch {
latestNewsViewModel.uiState.collect { uiState -> ... }
}
}
override fun onStop() {
// Stop collecting when the View goes to the background
uiStateJob?.cancel()
super.onStop()
}
} ```
SharedFlow
SharedFlow:資料共享,有點類似廣播 和 StateFlow 一樣,SharedFlow 也是熱流,它可以將已傳送過的資料傳送給新的訂閱者,並且具有高的配置性。
- SharedFlow使用場景 總的來說,SharedFlow 和 StateFlow 類似,他們都是熱流,都可以用來儲存狀態,但 SharedFlow 配置靈活。
當你有如下場景時,需要使用 SharedFlow:
發生訂閱時,需要將過去已經更新的n個值,同步給新的訂閱者。 配置快取策略。 2. SharedFlow的使用 簡單寫一個 Demo吧。
第一步:建立一個 MutableSharedFlow,對應的引數解釋在註釋中
c
class MainViewModel : ViewModel() {
val sharedFlow = MutableSharedFlow<Int>(
5 // 引數一:當新的訂閱者Collect時,傳送幾個已經發送過的資料給它
, 3 // 引數二:減去replay,MutableSharedFlow還快取多少資料
, BufferOverflow.DROP_OLDEST // 引數三:快取策略,三種 丟掉最新值、丟掉最舊值和掛起
)
}
第二步:使用emit或者tryEmit方法
```c
class MainViewModel : ViewModel() {
val sharedFlow = MutableSharedFlow
// 初始化時呼叫
init {
for (i in 0..10) {
sharedFlow.tryEmit(i)
}
}
// 在按鈕中呼叫
fun doAsClick() {
for (i in 11..20) {
sharedFlow.tryEmit(i)
}
}
} ```
當 MutableSharedFlow 中快取資料量超過閾值時,emit 方法和 tryEmit 方法的處理方式會有不同:
emit 方法:當快取策略為 BufferOverflow.SUSPEND 時,emit 方法會掛起,直到有新的快取空間。 tryEmit 方法:tryEmit 會返回一個 Boolean 值,true 代表傳遞成功,false 代表會產生一個回撥,讓這次資料發射掛起,直到有新的快取空間。 第三步:接收資料 接收資料的方式,跟普通的 Flow 沒什麼區別。
下面是我的全部程式碼:
```c class MainActivity : AppCompatActivity() {
private lateinit var viewModel: MainViewModel
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
viewModel = ViewModelProvider(this).get(com.example.coroutinedemo.viewmodel.MainViewModel::class.java)
val tvContent = findViewById<TextView>(R.id.tv_content)
// 啟動第一個協程,接收初始化的資料
lifecycleScope.launch {
val sb = StringBuffer()
viewModel.sharedFlow.collect {
sb.append("<<${it}")
tvContent.text = sb
}
}
val btnGo = findViewById<Button>(R.id.btn_go)
val tvTwo = findViewById<TextView>(R.id.tv_2)
btnGo.setOnClickListener {
// 傳送新的資料
viewModel.doAsClick()
// 傳送新的資料以後,啟動第二個協程
lifecycleScope.launch {
val sb = StringBuffer()
viewModel.sharedFlow.collect {
sb.append("<<${it}")
tvTwo.text = sb.toString()
}
}
}
}
} ```
- 將冷流轉化為SharedFlow 直接使用官網的程式碼,方法是使用 Flow 的擴充套件方法 shareIn:
c
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed() // 啟動政策
)
}
重點是引數三,分別提供了三個啟動策略:
SharingStarted.WhileSubscribed():存在訂閱者時,將使上游提供方保持活躍狀態。 SharingStarted.Eagerly:立即啟動提供方。 SharingStarted.Lazily:在第一個訂閱者出現後開始共享資料,並使資料流永遠保持活躍狀態。 總結 Flow 給我的感覺就像古老的印刷術,版面定了就不可更改,不過,該版面可印刷多張內容;StateFlow 給我的感覺就像活字印刷,可以不停的更改版面,也可以使用同一個版面印刷很多內容。
如果你要使用 Flow 記錄資料的狀態,StateFlow 和 SharedFlow 會是一個不錯的選擇。StateFlow 和 SharedFlow 提供了在 Flow 中使用 LiveData 式更新資料的能力,但是如果要在 UI 層使用,需要注意生命週期的問題。
StateFlow 和 SharedFlow 相比,StateFlow 需要提供初始值,SharedFlow 配置靈活,可提供舊資料同步和快取配置的功能。 協程進階技巧 - StateFlow和SharedFlow