Kotlin 之 協程(三)Flow非同步流

語言: CN / TW / HK

開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第6天,點選檢視活動詳情

flow介紹

掛起函式可以非同步返回單個值,那如何非同步多次返回多個值呢? 使用flow,flow的特點: - flow{...}塊中的程式碼可以掛起 - 使用flow,suspend修飾符可以省略 - 流使用emit函式發射值 - 流使用collect的函式收集值 - flow類似冷流,flow中程式碼直到流被收集(呼叫collect)的時候才執行,類似lazy,什麼時候用,什麼時候執行。 - 流的連續性:流收集都是按順序收集的 - flowOn可更改流發射的上下文,即可以指定在主執行緒或子執行緒中執行 - 與之相對的是熱流,我們即將介紹的 StateFlow 和 SharedFlow 是熱流,在垃圾回收之前,都是存在記憶體之中,並且處於活躍狀態的。

c //使用flow,suspend修飾符可以省略 fun doflow() = flow<Int> { for (i in 1..5) { //這裡是掛起,不是阻塞 delay(500) emit(i) } }.flowOn(Dispatchers.IO) //呼叫 runBlocking { doflow().collect { log("value=$it") } } 列印(多次返回多個值) com.z.zjetpack V/zx: value=1 com.z.zjetpack V/zx: value=2 com.z.zjetpack V/zx: value=3 com.z.zjetpack V/zx: value=4 com.z.zjetpack V/zx: value=5

flow的應用場景

檔案下載場景

```c //正在下載(檔案總大小為5) fun doflow() = flow { for (i in 1..5) { delay(500) emit(i.toDouble()) } //flowOn來指定在IO執行緒中下載 }.flowOn(Dispatchers.IO) //讀取進度 runBlocking { doflow().collect { log("當前下載=${it / 5 * 100}%") } }

列印: com.z.zjetpack V/zx: 當前下載=20.0% com.z.zjetpack V/zx: 當前下載=40.0% com.z.zjetpack V/zx: 當前下載=60.0% com.z.zjetpack V/zx: 當前下載=80.0% com.z.zjetpack V/zx: 當前下載=100.0% ```

流構建器

flowof 和asflow

```c runBlocking { flowOf(1, 2, 3) .onEach { delay(500) } .collect { log("value = $it") }

        (5..8).asFlow()
            .onEach { delay(500) }
            .collect {
                log("value = $it")
            }
    }

```

使用launchin替換collect在單獨的協程中啟動收集流。

```c fun event() = (1..3) .asFlow() .onEach { delay(500) }.flowOn(Dispatchers.IO)

//呼叫

    runBlocking {
        val job =   event().onEach {
            log("value = $it")
        }.launchIn(CoroutineScope(Dispatchers.IO))
        //主執行緒可用this
        //.launchIn(this)

        job.join()
    }

```

流的取消

超時的時候取消

```c fun cancelFlow() = flow { for (i in 1..5) { delay(1000) emit(i) } }

//呼叫 runBlocking { //超時的時候取消流 withTimeoutOrNull(2500) { cancelFlow().collect { log("value = $it") } } }

列印:在2.5秒的時候超時了,取消了 com.z.zjetpack V/zx: value = 1 com.z.zjetpack V/zx: value = 2 ```

直接取消

```c runBlocking { cancelFlow().collect { log("value = $it") if(it == 3){ cancel() }

            }
    }

```

繁忙的任務是不能直接取消的,需要檢測取消(cancellable)

c runBlocking { (1..5).asFlow().cancellable().collect { if(it == 3) { cancel() } } } 背壓:生產者效率 > 消費者效率 在這裡插入圖片描述 使用緩衝和flowon來處理背壓

buffer():併發執行流中發射元素的程式碼 conflate():合併發射項,不對每個值處理 collectLatest():取消並重新發送最後一個值

模擬背壓程式碼: ```c fun preFlow() = flow { for (i in 1..5) { delay(100) emit(i) log("傳送$i") } }

//呼叫 //100ms傳送一次,300ms接收一次就產生了背壓 runBlocking { val time = measureTimeMillis { preFlow() //buffer可以增加緩衝,提高效率 //.buffer(100) //flowOn自帶緩衝功能 //.flowOn(Dispatchers.IO) //conflate不對每個值處理 //.conflate() //.collect //取消並重新發送最後一個值 .collectLatest { delay(300) log("接收到:$it") } } log("總耗時 $time")

    }

列印: com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 總耗時 2033

使用buffer後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 1634

使用flowOn後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 1639

使用conflate後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 1034

使用collectLatest後 com.z.zjetpack V/zx: 傳送1 com.z.zjetpack V/zx: 傳送2 com.z.zjetpack V/zx: 傳送3 com.z.zjetpack V/zx: 傳送4 com.z.zjetpack V/zx: 傳送5 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 總耗時 843 ```

操作符

轉換操作符:map ,transform 限長操作符:取指定數量,take 末端操作符:末端操作符用於啟動流收集的掛起函式,collect,tolist,toset,reduce,fold 組合操作符:zip 展平操作符:flatMapConcat(連線),flatMapMerge(合併),flatMapLatest(最新)

map

```c suspend fun perRequest(req: Int): String { delay(1000) return "轉換 $req" }

    runBlocking {
        (1..3).asFlow().map {
            perRequest(it)
        }.collect {
            log(it)
        }
  }

列印: com.z.zjetpack V/zx: 轉換 1 com.z.zjetpack V/zx: 轉換 2 com.z.zjetpack V/zx: 轉換 3 ```

transform

c runBlocking { (5..6).asFlow().transform { emit("s $it") emit(perRequest(it)) emit("e $it") } //.take(4) .collect { log(it) } } 列印: com.z.zjetpack V/zx: s 5 com.z.zjetpack V/zx: 轉換 5 com.z.zjetpack V/zx: e 5 com.z.zjetpack V/zx: s 6 com.z.zjetpack V/zx: 轉換 6 com.z.zjetpack V/zx: e 6

take

c 加上take之後 com.z.zjetpack V/zx: s 5 com.z.zjetpack V/zx: 轉換 5 com.z.zjetpack V/zx: e 5 com.z.zjetpack V/zx: s 6

末端操作符:collect,tolist,toset,reduce,fold

c runBlocking { val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b } log("sum = $sum") val nList = (1..5).asFlow().toList() log("nList = $nList") val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet() log("nSet = $nSet") } 列印: com.z.zjetpack V/zx: sum = 55 com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5] com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]

展平操作符

只使用map的時候

```c //返回值是一個flow fun reqFlow(i: Int) = flow { emit("start $i") delay(500) emit("end $i") }

runBlocking { (0..1).asFlow().map { reqFlow(it) }.collect { log("首次collect = $it") it.collect { log("二次 = $it") } } } 列印:由於返回是flow所以需要collect 兩次才能拿到值,Flow> com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@63db1bf com.z.zjetpack V/zx: 二次 = start 0 com.z.zjetpack V/zx: 二次 = end 0 com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@d27108c com.z.zjetpack V/zx: 二次 = start 1 com.z.zjetpack V/zx: 二次 = end 1 ```

flatMapConcat

c runBlocking { (0..1).asFlow().flatMapConcat { reqFlow(it) }.collect { log("首次collect = $it") } } 列印:直接展開了 com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = end 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 1

c runBlocking { (0..1).asFlow().flatMapMerge { reqFlow(it) }.collect { log("首次collect = $it") } } 列印: com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 0 com.z.zjetpack V/zx: 首次collect = end 1

flatMapLatest

```c runBlocking { (0..1).asFlow().flatMapLatest { reqFlow(it) }.collect { log("首次collect = $it") } }

列印: com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 1 ```

流的異常處理

catch函式 和 try catch

c flow { emit(1) throw NullPointerException() //catch函式只捕獲上游的異常 }.catch { log("exception $it") //在異常後恢復 emit(20) }.flowOn(Dispatchers.IO) .collect { log("msg $it") } 列印: com.z.zjetpack V/zx: exception java.lang.NullPointerException com.z.zjetpack V/zx: msg 1 com.z.zjetpack V/zx: msg 20

c //不建議通過這種方式捕獲上游的異常,違反了flow原則,這種適合捕獲下游的異常 try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("異常 $e") } 列印: com.z.zjetpack V/zx: 異常 java.lang.IllegalStateException: ex 1

流的完成

finally 和 onCompletion

```c try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("異常 $e") } finally { log("流已完成") }

        //發生異常onCompletion可以拿到異常資訊,但不會捕獲
        try {
            (1..3).asFlow().onCompletion {
                log("onCompletion $it")
            }.collect {
                check(it > 2) {
                    "ex $it"
                }
            }
        } catch (e: Exception) {
            log("異常 $e")
        }

列印: com.z.zjetpack V/zx: 異常 java.lang.IllegalStateException: ex 1 com.z.zjetpack V/zx: 流已完成 com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1 com.z.zjetpack V/zx: 異常 java.lang.IllegalStateException: ex 1 ```

StateFlow

StateFlow 是一個狀態容器式可觀察資料流,可以向其收集器發出當前狀態更新和新狀態更新。 1. StateFlow使用 第一步:建立 MutableStateFlow 並設定初始化的值。

c class MainViewModel : ViewModel() { val selected = MutableStateFlow<Boolean>(false) }

第二步:同 Flow 一樣,使用 collect 方法:

c lifecycleScope.launch { viewModel.selected.collect { // ... 引起UI發生的變化 // 比如 某個按鈕是否選中狀態 } }

第三步:可以給 selected設定值,從而引起 Ui 層的變化:

c class MainViewModel : ViewModel() { val selected = MutableStateFlow<Boolean>(false) fun doSomeThing(value: Boolean) { selected.value = value } }

普通的 Flow,是不具備 selected.value = value 這種能力的

StateFlow 和 LiveData 有什麼區別? 有兩點區別:

第一點,StateFlow 必須有初始值,LiveData 不需要。 第二點,當 View 變為 STOPPED 狀態時,LiveData.observe() 會自動取消註冊使用方,而從 StateFlow 或任何其他資料流收集資料則不會取消註冊使用方。 對於 StateFlow 在介面銷燬的時仍處於活躍狀態,有兩種解決方法:

使用 ktx 將 Flow 轉換為 LiveData。 在介面銷燬的時候,手動取消(這很容易被遺忘)。

```c class LatestNewsActivity : AppCompatActivity() { ... // Coroutine listening for UI states private var uiStateJob: Job? = null

override fun onStart() {
    super.onStart()
    // Start collecting when the View is visible
    uiStateJob = lifecycleScope.launch {
        latestNewsViewModel.uiState.collect { uiState -> ... }
    }
}

override fun onStop() {
    // Stop collecting when the View goes to the background
    uiStateJob?.cancel()
    super.onStop()
}

} ```

SharedFlow

SharedFlow:資料共享,有點類似廣播 和 StateFlow 一樣,SharedFlow 也是熱流,它可以將已傳送過的資料傳送給新的訂閱者,並且具有高的配置性。

  1. SharedFlow使用場景 總的來說,SharedFlow 和 StateFlow 類似,他們都是熱流,都可以用來儲存狀態,但 SharedFlow 配置靈活。

當你有如下場景時,需要使用 SharedFlow:

發生訂閱時,需要將過去已經更新的n個值,同步給新的訂閱者。 配置快取策略。 2. SharedFlow的使用 簡單寫一個 Demo吧。

第一步:建立一個 MutableSharedFlow,對應的引數解釋在註釋中

c class MainViewModel : ViewModel() { val sharedFlow = MutableSharedFlow<Int>( 5 // 引數一:當新的訂閱者Collect時,傳送幾個已經發送過的資料給它 , 3 // 引數二:減去replay,MutableSharedFlow還快取多少資料 , BufferOverflow.DROP_OLDEST // 引數三:快取策略,三種 丟掉最新值、丟掉最舊值和掛起 ) }

第二步:使用emit或者tryEmit方法

```c class MainViewModel : ViewModel() { val sharedFlow = MutableSharedFlow( // .... )

// 初始化時呼叫
init {
    for (i in 0..10) {
        sharedFlow.tryEmit(i)
    }
}

// 在按鈕中呼叫
fun doAsClick() {
    for (i in 11..20) {
        sharedFlow.tryEmit(i)
    }
}

} ```

當 MutableSharedFlow 中快取資料量超過閾值時,emit 方法和 tryEmit 方法的處理方式會有不同:

emit 方法:當快取策略為 BufferOverflow.SUSPEND 時,emit 方法會掛起,直到有新的快取空間。 tryEmit 方法:tryEmit 會返回一個 Boolean 值,true 代表傳遞成功,false 代表會產生一個回撥,讓這次資料發射掛起,直到有新的快取空間。 第三步:接收資料 接收資料的方式,跟普通的 Flow 沒什麼區別。

下面是我的全部程式碼:

```c class MainActivity : AppCompatActivity() {

private lateinit var viewModel: MainViewModel

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)

    viewModel = ViewModelProvider(this).get(com.example.coroutinedemo.viewmodel.MainViewModel::class.java)

    val tvContent = findViewById<TextView>(R.id.tv_content)
    // 啟動第一個協程,接收初始化的資料
    lifecycleScope.launch {
        val sb = StringBuffer()
        viewModel.sharedFlow.collect {
            sb.append("<<${it}")
            tvContent.text = sb
        }
    }

    val btnGo = findViewById<Button>(R.id.btn_go)
    val tvTwo = findViewById<TextView>(R.id.tv_2)
    btnGo.setOnClickListener {
        // 傳送新的資料
        viewModel.doAsClick()
        // 傳送新的資料以後,啟動第二個協程
        lifecycleScope.launch {
            val sb = StringBuffer()
            viewModel.sharedFlow.collect {
                sb.append("<<${it}")
                tvTwo.text = sb.toString()
            }
        }
    }
}

} ```

  1. 將冷流轉化為SharedFlow 直接使用官網的程式碼,方法是使用 Flow 的擴充套件方法 shareIn:

c class NewsRemoteDataSource(..., private val externalScope: CoroutineScope, ) { val latestNews: Flow<List<ArticleHeadline>> = flow { ... }.shareIn( externalScope, replay = 1, started = SharingStarted.WhileSubscribed() // 啟動政策 ) }

重點是引數三,分別提供了三個啟動策略:

SharingStarted.WhileSubscribed():存在訂閱者時,將使上游提供方保持活躍狀態。 SharingStarted.Eagerly:立即啟動提供方。 SharingStarted.Lazily:在第一個訂閱者出現後開始共享資料,並使資料流永遠保持活躍狀態。 總結 Flow 給我的感覺就像古老的印刷術,版面定了就不可更改,不過,該版面可印刷多張內容;StateFlow 給我的感覺就像活字印刷,可以不停的更改版面,也可以使用同一個版面印刷很多內容。

如果你要使用 Flow 記錄資料的狀態,StateFlow 和 SharedFlow 會是一個不錯的選擇。StateFlow 和 SharedFlow 提供了在 Flow 中使用 LiveData 式更新資料的能力,但是如果要在 UI 層使用,需要注意生命週期的問題。

StateFlow 和 SharedFlow 相比,StateFlow 需要提供初始值,SharedFlow 配置靈活,可提供舊資料同步和快取配置的功能。 協程進階技巧 - StateFlow和SharedFlow