Kotlin Flow啊,你將流向何方?
前言
協程系列文章:
- 一個小故事講明白程序、執行緒、Kotlin 協程到底啥關係?
- 少年,你可知 Kotlin 協程最初的樣子?
- 講真,Kotlin 協程的掛起/恢復沒那麼神祕(故事篇)
- 講真,Kotlin 協程的掛起/恢復沒那麼神祕(原理篇)
- Kotlin 協程排程切換執行緒是時候解開真相了
- Kotlin 協程之執行緒池探索之旅(與Java執行緒池PK)
- Kotlin 協程之取消與異常處理探索之旅(上)
- Kotlin 協程之取消與異常處理探索之旅(下)
- 來,跟我一起擼Kotlin runBlocking/launch/join/async/delay 原理&使用
- 繼續來,同我一起擼Kotlin Channel 深水區
- Kotlin 協程 Select:看我如何多路複用
- Kotlin Sequence 是時候派上用場了
- Kotlin Flow啊,你將流向何方?
- Kotlin Flow 背壓和執行緒切換竟然如此相似
- Kotlin SharedFlow&StateFlow 熱流到底有多熱?
前邊一系列的協程文章鋪墊了很久,終於要分析Flow了。如果說協程是Kotlin的精華,那麼Flow就是協程的精髓。
通過本篇文章,你將瞭解到:
- 什麼是流?
- 為什麼引進Flow?
- Fow常見的操作
- 為什麼說Flow是冷流?
1. 什麼是流
自然界的流水,從高到低,從上游到下游流動。
而對於計算機世界的流:
資料的傳遞過程構成了資料流,簡稱流
比如想要查詢1~1000內的偶數,可以這麼寫:
kotlin
var i = 0
var list = mutableListOf<Int>()
while (i < 1000) {
if (i % 2 == 0)
list.add(i)
i++
}
此處對資料的處理即為找出其中的偶數。
若想要在偶數中找到>500的數,則繼續篩選:
kotlin
var i = 0
var list = mutableListOf<Int>()
while (i < 1000) {
if (i > 500 && i % 2 == 0)
list.add(i)
i++
}
可以看出,原始資料是1~1000,我們對它進行了一些操作:過濾偶數、過濾>500的數。當然還可以進行其它操作,如對映、變換等。
提取上述過程三要素:
- 原始資料
- 對資料的一系列操作
- 最終的資料
把這一系列的過程當做流:
從流的方向來觀察,我們稱原始資料為上流,對資料進行一系列處理後,最終的資料為下流。
從流的屬性來觀察,我們認為生產者在上流生產資料,消費者在下流消費資料。
2. 為什麼引進Flow?
由前面的文章我們知道,Java8提供了StreamAPI,專用來操作流,而Kotlin也提供了Sequence來處理流。
那為什麼還要引進Flow呢?
在Kotlin的世界裡當然不會想再依賴Java的StreamAPI了,主要來對比Kotlin裡的各種方案選擇。
先看應用場景的演變。
a、集合獲取多個值
想要獲取多個值,很顯而易見的想到了集合。
kotlin
fun testList() {
//構造集合
fun list(): List<Int> = listOf(1, 2, 3)
list().forEach {
//獲取多個值
println("value = $it")
}
}
以上函式功能涉及兩個物件:生產者和消費者。
生產者:負責將1、2、3構造為集合。
消費者:負責從集合裡將1、2、3取出。
若此時想要控制生產者的速度,比如先將1放到集合裡,過1秒後再講2放進集合,在此種場景下該函式顯得不那麼靈活了。
b、Sequence控制生成速度
Sequence可以生產資料,先看看它是怎麼控制生產速度的。
kotlin
fun testSequence() {
fun sequence():Sequence<Int> = sequence {
for (i in 1..3) {
Thread.sleep(1000)
yield(i)
}
}
sequence().forEach {
println("value = $it")
}
}
通過阻塞執行緒控制了生產者的速度。
你可能會說:在協程體裡為啥要用Thread.sleep()阻塞執行緒呢,用delay()不香嗎?
看起來很香,我們來看看實際效果:
直接報編譯錯誤了,提示是:受限制的掛起函式只能呼叫自己協程作用域內的成員和其它掛起函式。
而sequence的作用域是SequenceScope,檢視其定義發現:
究其原因,SequenceScope 被RestrictsSuspension 修飾限制了。
c、集合配合協程使用
sequence 因為協程作用域的限制,不能非同步生產資料,而使用集合卻沒此限制。
kotlin
suspend fun testListDelay() {
suspend fun list():List<Int> {
delay(1000)
return listOf(1, 2, 3)
}
list().forEach {
println("value = $it")
}
}
但也暴露了一個缺陷,只能一次性的返回集合元素。
綜上所述:
不管是集合還是Sequence,都不能完全覆蓋流的需求,此時Flow閃亮登場了
3. Fow常見的操作
最簡單的Flow使用
```kotlin suspend fun testFlow1() { //生產者 var flow = flow { //發射資料 emit(5) }
//消費者
flow.collect {
println("value=$it")
}
}
```
通過flow函式構造一個flow物件,然後通過呼叫flow.collect收集資料。
flow函式的閉包為生產者的生產邏輯,collect函式的閉包為消費者的消費邏輯。
當然,還有更簡單的寫法:
kotlin
suspend fun testFlow2() {
//生產者
flow {
//發射資料
emit(5)
}.collect {
//消費者
println("value=$it")
}
}
執行流程:
Flow操作符
上面只提到了flow資料的傳送以及接收,並沒有提及對flow資料的操作。
flow提供了許多操作符方便我們對資料進行處理(對流進行加工)。
我們以尋找1~1000內大於500的偶數為例:
```kotlin
suspend fun testFlow3() {
//生產者
var flow = flow {
for (i in 1..1000) {
emit(i)
}
}.filter { it > 500 && it % 2 == 0 }
//消費者
flow.collect {
println("value=$it")
}
}
filter函式的作用根據一定的規則過濾資料,一般稱這種函式為flow的操作符。
當然還可以對flow進行對映、變換、異常處理等。
kotlin
suspend fun testFlow3() {
//生產者
var flow = flow {
for (i in 1..1000) {
emit(i)
}
}.filter { it > 500 && it % 2 == 0 }
.map { it - 500 }
.catch {
//異常處理
}
//消費者
flow.collect {
println("value=$it")
}
}
```
中間操作符
前面說過流的三要素:原始資料、對資料的操作、最終資料,對應到Flow上也是一樣的。
flow的閉包裡我們看做是原始資料,而filter、map、catch等看做是對資料的操作,collect閉包裡看做是最終的資料。
filter、map等操作符屬於中間操作符,它們負責對資料進行處理。
中間操作符僅僅只是預先定義一些對流的操作方式,並不會主動觸發動作執行
末端操作符
末端操作符也叫做終端操作符,呼叫末端操作符後,Flow將從上流發出資料,經過一些列中間操作符處理後,最後流到下流形成最終資料。
如上面的collect操作符就是其中一種末端操作符。
怎麼區分中間操作符和末端操作符呢?
和Sequence操作符類似,可以通過返回值判斷。
先看看中間操作符filter:
```kotlin
public inline fun
internal inline fun
再看末端操作符collect:
kotlin
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
返回值為Unit,並且通過呼叫collect最終呼叫了emit,觸發了流。
Flow相比Sequence、Collection的優勢
Sequence對於協程的支援不夠好,不能呼叫其作用域外的suspend函式,而Collection生產資料不夠靈活,來看看Flow是如何解決這些問題的。
```kotlin
suspend fun testFlow4() {
//生產者
var flow = flow {
for (i in 1..1000) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.IO)//切換到io執行緒執行
//消費者
flow.collect {
delay(1000)
println("value=$it")
}
}
```
如上,flow的生產者、消費者閉包裡都支援呼叫協程的suspend函式,同時也支援切換執行緒執行。
再者,flow可以將集合裡的值一個個發出,可調整其流速。
當然,flow還提供了許多操作符幫助我們實現各種各樣的功能,此處限於篇幅就不再深入。
萬變不離其宗,知道了原理,一切迎刃而解。
4. 為什麼說Flow是冷流?
flow 的流動
在sequence的分析裡有提到過sequence是冷流,那麼什麼是冷流呢?
沒有消費者,生產者不會生產資料
沒有觀察者,被觀察者不會發送資料
kotlin
suspend fun testFlow5() {
//生產者
var flow = flow {
println("111")
for (i in 1..1000) {
emit(i)
}
}.filter {
println("222")
it > 500 && it % 2 == 0
}.map {
println("333")
it - 500
}.catch {
println("444")
//異常處理
}
如上程式碼,只要生產者沒有消費者,該函式執行後不會有任何列印語句輸出。
這個時候將消費者加上,就會觸發流的流動。
還是以最簡單的flow demo為例,看看其呼叫流程:
圖上1~6步驟即為最簡單的flow呼叫流程。
可以看出,只有呼叫了末端操作符(如collect)之後才會觸發flow的流動,因此flow是冷流。
flow 的原理
```kotlin suspend fun testFlow1() { //生產者 var flow = flow { //發射資料 emit(5) }
//消費者
flow.collect {
println("value=$it")
}
}
```
以上程式碼涉及到三個關鍵函式(flow、emit、collect),兩個閉包(flow閉包、collect閉包。
從上面的呼叫圖可知,以上五者的呼叫關係:
flow-->collect-->flow閉包-->emit-->collect閉包
接下來逐一分析在程式碼裡的關係。
先看生產者動作(flow函式)
flow函式實現:
```kotlin
public fun
傳入的引數型別為:FlowCollector的擴充套件函式,而FlowCollector是介面,它有唯一的函式:emit(xx)。因此在flow函式的閉包裡可以呼叫emit(xx)函式,flow閉包作為SafeFlow的成員變數block。
flow 函式返回SafeFlow,SafeFlow繼承自AbstractFlow,並實現了collect函式:
kotlin
Flow.kt
public final override suspend fun collect(SafeCollector: FlowCollector<T>) {
//構造SafeCollector
//collector 作為SafeCollector的成員變數
val safeCollector = SafeCollector(collector, coroutineContext)
try {
//抽象函式,子類實現
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
```
collect的閉包作為SafeCollector的成員變數collector,後面會用到。
由此可見:flow函式僅僅只是構造了flow物件並返回。
再看消費者動作(collect)
當消費者呼叫flow.collect函式時:
kotlin
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
此時呼叫的collect即為flow裡定義的collect函式,並構造了匿名物件FlowCollector,實現了emit函式,而emit函式的真正實現為action,也就是外層傳入的collect的閉包。
上面分析到的collect原始碼裡呼叫了collectSafely:
kotlin
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
此處的block即為在構造flow物件時傳入的閉包。
此時,消費者通過collect函式已經呼叫到生產者的閉包裡
還剩下最後一個問題:生產者的閉包是如何流轉到消費者的閉包裡呢?
最後看發射動作(emit)
在生產者的閉包裡呼叫了emit函式:
```kotlin
override suspend fun emit(value: T) {
//掛起函式
return suspendCoroutineUninterceptedOrReturn [email protected]{ uCont ->
try {
//uCont為當前協程續體
emit(uCont, value)
} catch (e: Throwable) {
// Save the fact that exception from emit (or even check context) has been thrown
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
//collector.emit 最終呼叫collect的閉包
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
```
如此一來,生產者的閉包裡呼叫emit函式後,將會呼叫到collect的閉包裡,此時資料從flow的上游流轉到下游。
總結以上步驟,其實本質還是物件呼叫。
中間操作符的原理
以filter為例:
```kotlin
public inline fun
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
//呼叫當前物件collect
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
[email protected] transform(value)
}
}
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
//構造flow,重寫collect
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
```
filter操作符構造了新的flow物件,該物件重寫了collect函式。
當呼叫flow.collect時,先呼叫到filter物件的collect,進而呼叫到原始flow的collect,接著呼叫到原始flow物件的閉包,在閉包裡呼叫的emit即為filter的閉包,若filter閉包裡條件滿足則調動emit函式,最後呼叫到collect的閉包。
理解中間操作符的要點:
- 中間操作符返回新的flow物件,重寫了collect函式
- collect函式會呼叫當前flow(呼叫filter的flow物件)的collect
- collect函式做其它的處理
與sequence類似,使用了裝飾者模式。
以上以filter為例闡述了原理,其它中間操作符的原理類似,此處就不再細說。
下篇將分析Flow的背壓與執行緒切換,相信分析的邏輯會讓大家耳目一新,敬請期待~
本文基於Kotlin 1.5.3,文中完整Demo請點選
您若喜歡,請點贊、關注、收藏,您的鼓勵是我前進的動力
持續更新中,和我一起步步為營系統、深入學習Android/Kotlin
1、Android各種Context的前世今生
2、Android DecorView 必知必會
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分發全套服務
6、Android invalidate/postInvalidate/requestLayout 徹底釐清
7、Android Window 如何確定大小/onMeasure()多次執行原因
8、Android事件驅動Handler-Message-Looper解析
9、Android 鍵盤一招搞定
10、Android 各種座標徹底明瞭
11、Android Activity/Window/View 的background
12、Android Activity建立到View的顯示過
13、Android IPC 系列
14、Android 儲存系列
15、Java 併發系列不再疑惑
16、Java 執行緒池系列
17、Android Jetpack 前置基礎系列
18、Android Jetpack 易學易懂系列
19、Kotlin 輕鬆入門系列
20、Kotlin 協程系列全面解讀
- 來吧!接受Kotlin 協程--執行緒池的7個靈魂拷問
- Kotlin Flow啊,你將流向何方?
- Kotlin 協程 Select:看我如何多路複用
- Kotlin 協程排程切換執行緒是時候解開真相了
- 講真,Kotlin 協程的掛起沒那麼神祕(原理篇)
- 講真,Kotlin 協程的掛起沒那麼神祕(故事篇)
- 少年,你可知 Kotlin 協程最初的樣子?
- 一個小故事講明白程序、執行緒、Kotlin 協程到底啥關係?
- Kotlin 高階函式從未如此清晰(中)
- Kotlin 高階函式從未如此清晰(上)
- Android 容易遺漏的重新整理小細節
- Jetpack ViewModel 抽絲剝繭
- Jetpack LiveData 是時候瞭解一下了
- Jetpack Lifecycle 該怎麼看?還肝否?
- Android Activity 與View 的互動思考
- 數字簽名/數字證書/對稱/非對稱加密/CA 等概念明晰
- Android IPC 之AIDL應用(下)
- Android IPC 之Messenger 原理及應用
- Android clipToPadding 使用與疑難點解析
- Android invalidate/postInvalidate/requestLayout 徹底釐清