Combine | (V) Combine 中的錯誤處理和 Scheduler

語言: CN / TW / HK

錯誤處理

到目前為止,在我們編寫的大部分程式碼中,我們沒有處理錯誤,而處理的都是“happy path”。在前面的文章中,我們瞭解到,Combine Publisher 聲明瞭兩個約束:

  • Output定義 Publisher 發出的值的型別;
  • Failure 定義 Publisher 發出的失敗的型別。

現在,我們將深入瞭解 Failure 在 Publisher 中的作用。

錯誤

Never

失敗型別為 Never 的 Publisher 表示永遠不會發出失敗。它為這些 Publisher 提供了強大的保證。這類 Publisher 可讓我們專注於使用值,同時絕對確保 Publisher 只有成功完成的事件。

在新的 Playground 頁面新增以下程式碼:

import Combine import Foundation import PlaygroundSupport PlaygroundPage.current.needsIndefiniteExecution = true ​ func example(_ desc: String, _ action:() -> Void) {    print("--- (desc) ---")    action() } ​ var subscriptions = Set<AnyCancellable>() ​ example("Just") {  Just("Hello") }

我們建立了一個帶有 Hello 字串值的 Just。 Just 是不會發出失敗的。 請按住 Command 並單擊 Just 初始化程式並選擇 Jump to Definition,檢視定義:

In contrast with Result.Publisher, a Just publisher can’t fail with an error. And unlike Optional.Publisher, a Just publisher always produces a value.

Combine 對 Never 的障保證不僅是理論上的,而是深深植根於框架及其各種 API 中。Combine 提供了幾個 Operator,這些 Operator 僅在保證 Publisher 永遠不會發出失敗事件時才可用。第一個是 sink 的變體,只處理值:

example("Just") {  Just("Hello")    .sink(receiveValue: { print($0) })    .store(in: &subscriptions) }

在上面的示例中,我們使用 sink(receiveValue:) ,這種特定的過載使我們可以忽略 Publisher 的完成事件,而只處理其發出的值。

此過載僅適用於這類“可靠”的 Publisher。在錯誤處理方面,Combine 是智慧且安全的,如果可能丟擲錯誤,它會強制我們處理完成事件。要看到這一點,我們需要將 Never 的 Publisher 變成可能發出失敗事件的 Publisher。

setFailureType(to:)

func setFailureType<E>(to failureType: E.Type) -> Publishers.SetFailureType<Self, E> where E : Error

Never Publisher 轉變為可能發出失敗事件的 Publisher 的第一種方法是使用 setFailureType。這是另一個僅適用於失敗型別為 Never 的 Publisher 的 Operator:

example("setFailureType") {  Just("Hello")    .setFailureType(to: MyError.self) }

可以使用 .eraseToAnyPublisher(),來確認已改變的 Publisher 型別:

image-20221224151735624.png

繼續修改上述程式碼:

enum MyError: Error {    case ohNo } ​ example("setFailureType") {    Just("Hello")        .setFailureType(to: MyError.self)        .sink(            receiveCompletion: { completion in                switch completion {                case .failure(.ohNo):                    print("Finished with OhNo!")                case .finished:                    print("Finished successfully!")                }            },            receiveValue: { value in                print("Got value: (value)")            }        )        .store(in: &subscriptions) }

現在我們只能使用 sink(receiveCompletion:receiveValue:)sink(receiveValue:) 過載不再可用,因為此 Publisher 可能會發出失敗事件。可以嘗試註釋掉 receiveCompletion檢視編譯錯誤。

此外,失敗型別為為 MyError,這使我們可以針對.failure(.ohNo) 情況而無需進行不必要的強制轉換來處理該錯誤。

當然,setFailureType 的作用只是型別定義。 由於原始 Publisher 是 Just,因此實際上也不會引發任何錯誤。

assign(to:on:)

assign Operator 僅適用於不會發出失敗事件的 Publisher,與 setFailureType 相同。 向提供的 keypath 傳送錯誤會導致未定義的行為。新增以下示例進行測試:

example("assign(to:on:)") {    class Person {        var name = "Unknown"    }    let person = Person()    print(person.name)    Just("Layer")        .handleEvents(            receiveCompletion: { _ in                print(person.name)            }        )        .assign(to: .name, on: person)        .store(in: &subscriptions) }

我們定義一個具有 name 屬性的 Person 類。建立一個 Person 例項並立即列印其 name。一旦 Publisher 傳送完成事件,使用 handleEvents 再次列印此 name。最後,使用 assignname 設定為 Publisher 發出的值:

--- assign(to:on:) --- Unknown Layer

Just("Layer") 正下方新增以下行:

.setFailureType(to: Error.self)

這意味著它不再是 Publisher<String, Never>,而是現在的 Publisher<String, Error>。執行 Playground,我們將進行驗證:

Referencing instance method 'assign(to:on:)' on 'Publisher' requires the types 'any Error' and 'Never' be equivalent

assign(to:)

assign(to:on:) 有一個棘手的部分——它會 strong 捕獲提供給 on 引數的物件。在上一個示例之後新增以下程式碼:

example("assign(to:)") {  class MyViewModel: ObservableObject {    @Published var currentDate = Date() ​    init() {      Timer.publish(every: 1, on: .main, in: .common)        .autoconnect()        .prefix(3)        .assign(to: .currentDate, on: self)        .store(in: &subscriptions)    }  } ​  let vm = MyViewModel()  vm.$currentDate    .sink(receiveValue: { print($0) })    .store(in: &subscriptions) }

我們 MyViewModel 中定義一個 @Published 屬性。 它的初始值為當前日期。在 init 中建立一個 Timer Publisher,它每秒發出當前日期。使用 prefix Operator 只接受 3 個更新。使用 assign(to:on:) 將每個日期更新給@Published 屬性。例項化 MyViewModelsink vm.$currentDate,並打印出每個值:

--- assign(to:) --- 2022-12-24 07:32:33 +0000 2022-12-24 07:32:34 +0000 2022-12-24 07:32:35 +0000 2022-12-24 07:32:36 +0000

看起來一切都很好。但是對assign(to:on:) 的呼叫建立了一個 strong 持有 self 的 Subscription。 導致 self 掛在Subscription 上,而 Subscription 掛在 self 上,建立了一個導致記憶體洩漏的引用迴圈。

因此引入了該 Operator 的另一個過載 assign(to:)。該 Operator 通過對 Publisher 的 inout 引用來將值分配給 @Published 屬性。因此以下兩行:

.assign(to: .currentDate, on: self) .store(in: &subscriptions)

可以被替換為:

.assign(to: &$currentDate)

使用 assign(to:) Operator 將 inout 引用 Publisher 會打破引用迴圈。此外,它會在內部自動處理 Subscription 的記憶體管理,這樣我們就可以省略 store(in: &subscriptions)

assertNoFailure(_:file:line:)

當我們在開發過程確認 Publisher 以失敗事件完成時,assertNoFailure Operator 非常有用。它不會阻止上游發出失敗事件。但是,如果它檢測到錯誤,它會因錯誤而崩潰:

example("assertNoFailure") {  Just("Hello")    .setFailureType(to: MyError.self)    .assertNoFailure()    .sink(receiveValue: { print("Got value: ($0) ")})    .store(in: &subscriptions) }

我們使用 Just 建立一個“可靠”的 Publisher 並將其錯誤型別設定為 MyError。如果 Publisher 以錯誤事件完成,則使用 assertNoFailure 以崩潰。這會將 Publisher 的失敗型別轉回 Never。使用 sink 打印出任何接收到的值。請注意,由於 assertNoFailure 將失敗型別設定回 Never,因此 sink(receiveValue:) 過載可以直接使用。

執行 Playground,它可以正常工作:

--- assertNoFailure --- Got value: Hello

setFailureType 之後,新增以下行:

.tryMap { _ in throw MyError.ohNo }

一旦 Hello 被推送到下游,使用 tryMap 丟擲錯誤。再次執行 Playground:

Playground execution failed: ​ error: Execution was interrupted, reason: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0). ​ ... ​ frame #0: 0x00007fff232fbbf2 Combine`Combine.Publishers.AssertNoFailure...

由於 Publisher 發出失敗事件,playground 會 crash。 在某種程度上,我們可以將 assertNoFailure() 視為程式碼的保護機制。 雖然我們不應該在生產環境中使用它,但在開發過程中提前發現問題非常有用。

處理錯誤

try* Operator

Combine 提供了一個區分可能引發錯誤和可能不會引發錯誤的 Operator 的方法:try 字首。

注意:Combine 中所有以 try 為字首的 Operator 在遇到錯誤時的行為相同。我們將只在本章中嘗試使用 tryMap Operator。

example("tryMap") {    enum NameError: Error {        case tooShort(String)        case unknown    }        ["Aaaa", "Bbbbb", "Cccccc"]        .publisher        .map { value in            return value.count        }        .sink(            receiveCompletion: { print("Completed with ($0)") },            receiveValue: { print("Got value: ($0)") }        ) }

在上面的示例中,我們定義一個 NameError 錯誤列舉。建立釋出三個字串的 Publisher。將每個字串對映到它的長度。執行示例並檢視控制檯輸出:

--- tryMap --- Got value: 4 Got value: 5 Got value: 6 Completed with finished

將上面示例中的 map 替換為以下內容:

.tryMap { value -> Int in    let length = value.count    guard length >= 5 else {        throw NameError.tooShort(value)    }    return value.count }

我們檢查字串的長度是否大於等於 5。否則,我們會丟擲錯誤:

--- tryMap --- Completed with failure(Page_Contents.(unknown context at $10e3cb984).(unknown context at $10e3cba6c).(unknown context at $10e3cbaa8).NameError.tooShort("Aaaa"))

對映錯誤

maptryMap 之間的區別不僅僅是後者允許丟擲錯誤。 map 繼承了現有的失敗型別並且只操作 Publisher 的值,但 tryMap 沒有——它實際上將錯誤型別擦除為普通的 Swift 錯誤。 與帶有 try 字首的所有 Operator 都是如此。

example("map vs tryMap") {  enum NameError: Error {    case tooShort(String)    case unknown  } ​  Just("Hello")    .setFailureType(to: NameError.self)    .map { $0 + " World!" }    .sink(      receiveCompletion: { completion in        switch completion {        case .finished:          print("Done!")        case .failure(.tooShort(let name)):          print("(name) is too short!")        case .failure(.unknown):          print("An unknown name error occurred")        }      },      receiveValue: { print("Got value ($0)") }    )    .store(in: &subscriptions) }

我們定義一個用於此示例的 NameError。建立一個只發出字串 HelloJust。使用 setFailureType 設定失敗型別為 NameError。使用 map 將另一個字串附加。最後,使用 sinkreceiveCompletionNameError 的每個情況打印出適當的訊息。執行 Playground:

--- map vs tryMap --- Got value Hello World! Done!

image-20221224163358544.png

Completion 的失敗型別是 NameError,這正是我們想要的。 setFailureType 允許我們專門針對 NameError 進行處理,例如 failure(.tooShort(let name))

map 更改為 tryMap

.tryMap { throw NameError.tooShort($0) }

我們會立即注意到 Playground 不再編譯。 再次點選 completion

image-20221224163616392.png

tryMap 刪除了我們的型別錯誤並將其替換為通用 Swift.Error 型別。即使我們實際上並沒有從 tryMap 中丟擲錯誤,也會發生這種情況。

原因很簡單:Swift 還不支援型別化 throws,儘管自 2015 年以來 Swift Evolution 中一直在討論這個主題。這意味著當我們使用帶有 try 字首的 Operator 時,我們的錯誤型別將總是被抹去到最常見的父類:Swift.Error

一種方法是將通用錯誤手動轉換為特定的錯誤型別,但這不是最理想的。它打破了嚴格型別錯誤的整個目的。幸運的是,Combine 為這個問題提供了一個很好的解決方案,稱為 mapError

在呼叫 tryMap 之後,新增以下行:

.mapError { $0 as? NameError ?? .unknown }

mapError 接收上游 Publisher 丟擲的任何錯誤,並將其對映到我們想要的任何錯誤。在這種情況下,我們可以利用它將錯誤轉換回 NameError。這會將 Failure 恢復為所需要的型別,並將我們的 Publisher 轉回 Publisher<String, NameError>。構建並執行 Playground,最終可以按預期編譯和工作:

--- map vs tryMap --- Hello is too short!

捕獲錯誤並重試

很多時候,當我們請求資源或執行某些計算時,失敗可能是由於網路不穩定或其他資源不可用而導致的一次性事件。

在這些情況下,我們通常會編寫一個機制來重試不同的工作,跟蹤嘗試次數,並處理如果所有嘗試都失敗的情況。Combine 讓這一切變得非常簡單。

retry Operator 接受一個數字。如果 Publisher 失敗,它將重新訂閱上游並重試至我們指定的次數。如果所有重試都失敗,它將錯誤推送到下游,就像沒有 retry Operator 一樣:

example("Catching and retrying") {    enum MyError: Error {        case network    }    var service1 = PassthroughSubject<Int, MyError>()    service1.send(completion: .failure(.network))      service1        .handleEvents(            receiveSubscription: { _ in print("Trying ...") },            receiveCompletion: {                guard case .failure(let error) = $0 else { return }                print("Got error: (error)")            }        )        .retry(3)        .sink(            receiveCompletion: { print("($0)") },            receiveValue: { number in                print("Got Number: (number)")            }        )        .store(in: &subscriptions) }

我們有一個 service1,它發出了失敗事件。因此,訂閱 service1 肯定會獲得失敗事件。我們嘗試三次,並通過 handleEvents 列印訂閱和完成:

--- Catching and retrying --- Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network failure(Page_Contents.(unknown context at $10fc7b584).(unknown context at $10fc7b77c).(unknown context at $10fc7b7b8).MyError.network)

執行 Playerground,我們會看到有四次 Trying。初始 Trying,加上由 retry Operator 觸發的三次重試。 由於 service1 不斷失敗,因此 Operator 會耗盡所有重試嘗試並將錯誤推送到 sink

調整程式碼:

example("Catching and retrying") {    enum MyError: Error {        case network    }    var service1 = PassthroughSubject<Int, MyError>()    service1.send(completion: .failure(.network))        service1        .handleEvents(            receiveSubscription: { _ in print("Trying ...") },            receiveCompletion: {                guard case .failure(let error) = $0 else { return }                print("Got error: (error)")            }        )        .retry(3)        .replaceError(with: 1)        .sink(            receiveCompletion: { print("($0)") },            receiveValue: { number in                print("Got Number: (number)")            }        )        .store(in: &subscriptions) }

service1 重試後,若還是失敗,我們將通過 replaceError 將失敗替換為 1:

--- Catching and retrying --- Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Got Number: 1 finished

或者,我們可以使用 catch 捕獲 service1 的失敗,併為下游提供另一個 Publisher:

example("Catching and retrying") {    enum MyError: Error {        case network    }    var service1 = PassthroughSubject<Int, MyError>()    service1.send(completion: .failure(.network))    var service2 = PassthroughSubject<Int, MyError>()        service1        .handleEvents(            receiveSubscription: { _ in print("Trying ...") },            receiveCompletion: {                guard case .failure(let error) = $0 else { return }                print("Got error: (error)")            }        )        .retry(3)        .catch { error in            return service2        }        .sink(            receiveCompletion: { print("($0)") },            receiveValue: { number in                print("Got Number: (number)")            }        )        .store(in: &subscriptions)        service2.send(2)    service2.send(completion: .finished) }

此時,下游將獲得到 service2 發出的值 2 和完成事件:

--- Catching and retrying --- Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Trying ... Got error: network Got Number: 2 finished

cheduler

我們已經遇到了一些將 Scheduler 作為引數的 Operator。大多數情況下,我們會簡單地使用 DispatchQueue.main,因為它方便、易於理解。除了 DispatchQueue.main,我們肯定已經使用了全域性併發佇列,或建立一個序列排程佇列來執行操作。

但是為什麼 Combine 需要一個新的類似概念呢?我們接著將瞭解為什麼會出現 Scheduler 的概念,將探索 Combine 如何使非同步事件和操作更易於使用,當然,我們還會試使用 Combine 提供的所有 Scheduler。

Scheduler 簡介

根據 Apple 的文件,Scheduler 是一種定義何時及如何執行閉包的協議。Scheduler 提供上下文以儘快或在將來的某個事件執行未來的操作。該操作就是協議本身中定義的閉包。閉包也可以隱藏 Publisher 在特定 Scheduler 上執行的某些值的傳遞。

我們會注意到此定義有意避免對執行緒的任何引用,這是因為具體的實現是在 Scheduler 協議中,提供的“上下文”中的。因此,我們的程式碼將在哪個執行緒上執行取決於選擇的 Scheduler。

記住這個重要的概念:Scheduler 不等於執行緒。我們將在後面詳細瞭解這對每個 Scheduler 意味著什麼。讓我們從事件流的角度來看 Scheduler 的概念:

Scheduler.png

我們在上圖中看到的內容:

  • 在主 (UI) 執行緒上發生使用者操作,如按鈕按下;

  • 它會觸發一些工作在 Background Scheduler 上進行處理;

  • 要顯示的最終資料在主執行緒上傳遞給 Subscriber,Subscriber 可以更新 UI。

我們可以看到 Scheduler 的概念深深植根於前臺/後臺執行的概念。此外,根據我們選擇的實現,工作可以序列化或並行化。

因此,要全面瞭解 Scheduler,需要檢視哪些類符合 Scheduler 協議。首先,我們需要了解與 Scheduler 相關的兩個重要 Operator。

Scheduler Operator

Combine 提供了兩個基本的 Operator 來使用 Scheduler:

  • subscribe(on:)subscribe(on:options:) 在指定的 Scheduler 上建立 Subscription(開始工作);
  • receive(on:)receive(on:options:) 在指定的 Scheduler 上傳遞值。

此外,以下 Operator 將 Scheduler 和 Scheduler options 作為引數:

  • debounce(for:scheduler:options:)

  • delay(for:tolerance:scheduler:options:)

  • measureInterval(using:options:)

  • throttle(for:scheduler:latest:)

  • timeout(_:scheduler:options:customError:)

subscribe(on:)receive(on:)

在我們訂閱它之前,Publisher 是一個無生命的實體。但是當我們訂閱 Publisher 時會發生什麼?有幾個步驟:

subscribe.png

  1. Publiser receive Subscriber 並建立 Subscription;
  2. Subscriber receive Subscription 並從 Publiser 請求值(虛線);
  3. Publiser 開始工作(通過 Subscription);
  4. Publiser 發出值(通過 Subscription);
  5. Operator 轉換值;
  6. Subscriber 收到最終值。

當代碼訂閱 Publiser 時,步驟一、二和三通常發生在當前執行緒上。 但是當我們使用 subscribe(on:) Operator 時,所有這些操作都在我們指定的 Scheduler 上執行。

我們可能希望 Publiser 在後臺執行一些昂貴的計算以避免阻塞主執行緒。 執行此操作的簡單方法是使用 subscribe(on:)。以下是虛擬碼:

swift let queue = DispatchQueue(label: "serial queue") let subscription = publisher .subscribe(on: queue) .sink { value in ...

如果我們收到值後,想更新一些 UI 怎麼辦?我們可以在閉包中執行類似 DispatchQueue.main.async { ... } 的操作,從主執行緒執行 UI 更新。有一種更有效的方法可以使用 Combine 的 receive(on:):

swift let subscription = publisher .subscribe(on: queue) .receive(on: DispatchQueue.main) .sink { value in ...

即使計算工作正常並從後臺執行緒發出結果,我們現在也可以保證始終在主佇列上接收值。這是安全地執行 UI 更新所需要的。

Scheduler 實現

Apple 提供了幾種 Scheduler 協議的具體實現:

  • ImmediateScheduler:一個簡單的 Scheduler,它立即在當前執行緒上執行程式碼,這是預設的執行上下文,除非使用 subscribe(on:)receive(on:) 或任何其他將 Scheduler 作為引數的 Operator 進行修改。
  • RunLoop:繫結到 Foundation 的 Thread 物件。
  • DispatchQueue:可以是序列的或併發的。
  • OperationQueue:規範工作項執行的佇列。

這裡省略了 TestScheduler,是一個虛擬的、模擬的 Scheduler,它是任何響應式程式設計框架測試時不可或缺的一部分。

ImmediateScheduler

在 Playground 中新增程式碼:

```swift example("ImmediateScheduler") { let source = Timer .publish(every: 1.0, on: .main, in: .common) .autoconnect() .scan(0) { counter, _ in counter + 1 }

let publisher = source
    .receive(on: ImmediateScheduler.shared)
    .eraseToAnyPublisher()

publisher.sink(receiveValue: { _ in
    print(Thread.current)
})
.store(in: &subscriptions)

} ```

執行 Playground,我們會看到 Publisher 發出的每個值,都是在 MainThread 上:

--- ImmediateScheduler --- <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main} <_NSMainThread: 0x129617390>{number = 1, name = main}

當前執行緒是主執行緒, ImmediateScheduler 立即在當前執行緒上排程。當我們在 .receive(on: ImmediateScheduler.shared) 前新增一行:

swift .receive(on: DispatchQueue.global())

執行 Playground,我們將在不同的執行緒收到值:

--- ImmediateScheduler --- <NSThread: 0x12e7286c0>{number = 4, name = (null)} <NSThread: 0x12e7286c0>{number = 4, name = (null)} <NSThread: 0x11f005310>{number = 2, name = (null)} <NSThread: 0x11f005310>{number = 2, name = (null)} <NSThread: 0x12e7286c0>{number = 4, name = (null)}

ImmediateScheduler options 由於大多數 Operator 在其引數中接受 Scheduler,我們還可以找到一個接受 SchedulerOptions 值的引數。在 ImmediateScheduler 的情況下,此型別被定義為 Never,因此在使用 ImmediateScheduler 時,我們永遠不應該為 Operator 的 options 引數傳遞值。

ImmediateScheduler 的陷阱 關於 ImmediateScheduler 的一件事是它是即時的。我們無法使用 Scheduler 協議的任何 schedule(after:) 變體,因為我們需要指定的 SchedulerTimeType 沒有初始化方法,對於 ImmediateScheduler 無意義。

RunLoop scheduler

RunLoop 早於 DispatchQueue,它是一種線上程級別管理輸入源的方法。主執行緒有一個關聯的 RunLoop,我們還可以通過從當前執行緒呼叫 RunLoop.current 為任何執行緒獲取一個 RunLoop。

在 Playground 中新增此程式碼:

```swift example("RunLoop") { let source = Timer .publish(every: 1.0, on: .main, in: .common) .autoconnect() .scan(0) { counter, _ in counter + 1 }

let publisher = source
    .receive(on: DispatchQueue.global())
    .handleEvents(receiveOutput: { _ in
        print("DispatchQueue.global: \(Thread.current)")
    })
    .receive(on: RunLoop.current)
    .handleEvents(receiveOutput: { _ in
        print("RunLoop.current: \(Thread.current)")
    })
    .eraseToAnyPublisher()

publisher.sink(receiveValue: { _ in
})
.store(in: &subscriptions)

} ```

當前 RunLoop.current 就是主執行緒的 RunLoop。執行 Playground:

--- RunLoop --- DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)} RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main} DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)} RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main} DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)} RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}

每發出一個值,都通過一個全域性併發佇列的執行緒,然後在主執行緒上繼續。

RunLoop OptionsImmediateScheduler 一樣,RunLoop 不提供 SchedulerOptions 引數。

RunLoop 陷阱 RunLoop 的使用應僅限於主執行緒的 RunLoop,以及我們在需要時控制的 Foundation 執行緒中可用的 RunLoop。要避免的一個是在 DispatchQueue 上執行的程式碼中使用 RunLoop.current。這是因為 DispatchQueue 執行緒可能是短暫的,這使得它們幾乎不可能依賴 RunLoop。

DispatchQueue Scheduler

DispatchQueue 符合 Scheduler 協議,並且完全可用於所有將 Scheduler 作為引數的 Operator。Dispatch 框架是 Foundation 的一個強大元件,它允許我們通過向系統管理的排程佇列提交工作來在多核硬體上同時執行程式碼。DispatchQueue 可以是序列的(預設)或併發的。序列佇列按順序執行你提供給它的所有工作項。併發佇列將並行啟動多個工作項,以最大限度地提高 CPU 使用率:

  • 序列佇列通常用於保證某些操作不重疊。因此,如果所有操作都發生在同一個佇列中,他們可以使用共享資源而無需加鎖。
  • 併發佇列將同時執行儘可能多的操作。因此,它更適合純計算。

我們一直使用的最熟悉的佇列是 DispatchQueue.main。它直接對映到主執行緒,在這個佇列上執行的所有操作都可以自由地更新使用者介面。 當然,UI 更新只能在主執行緒進行。所有其他佇列,無論是序列的還是併發的,都在系統管理的執行緒池中執行它們的程式碼。這意味著我們永遠不應該對佇列中執行的程式碼中的當前執行緒做出任何假設。尤其不應使用 RunLoop.current 來安排工作,因為 DispatchQueue 管理其執行緒的方式有不同。

所有排程佇列共享同一個執行緒池,執行的序列佇列將使用該池中的任何可用執行緒。一個直接的結果是,來自同一佇列的兩個連續工作項可能使用不同的執行緒,但仍可以按順序執行。這是一個重要的區別:當使用 subscribe(on:)receive(on:) 或任何其他有 Scheduler 引數的 Operator 時,我們永遠不應假設執行緒每次都是相同的。

在 Playground 中新增程式碼:

```swift example("DispatchQueue") { let source = PassthroughSubject() let sourceQueue = DispatchQueue.main let subscription = sourceQueue.schedule(after: sourceQueue.now, interval: .seconds(1)) { source.send() } .store(in: &subscriptions)

let serialQueue = DispatchQueue(label: "Serial queue")
source
    .handleEvents(receiveOutput: { _ in
        print("\(Thread.current)")
    })
    .receive(on: serialQueue)
    .handleEvents(receiveOutput: { _ in
        print("\(Thread.current)")
    })
    .sink(receiveValue: { _ in
    })
    .store(in: &subscriptions)

} ```

Timer 在主佇列 sourceQueue 上觸發並通過 source 傳送 Void 值。接著在序列佇列 serialQueue 上接收值:

--- DispatchQueue --- <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x128025cd0>{number = 2, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x1178243e0>{number = 6, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x117904d90>{number = 5, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x1178243e0>{number = 6, name = (null)} <_NSMainThread: 0x126f0a250>{number = 1, name = main} <NSThread: 0x1178243e0>{number = 6, name = (null)}

將 sourceQueue 也改為 DispatchQueue(label: "Serial queue"),也將在全域性併發佇列上發出值:

--- DispatchQueue --- <NSThread: 0x137e275b0>{number = 6, name = (null)} <NSThread: 0x130905310>{number = 2, name = (null)} <NSThread: 0x130905310>{number = 2, name = (null)} <NSThread: 0x130905310>{number = 2, name = (null)} <NSThread: 0x127e0f400>{number = 4, name = (null)} <NSThread: 0x137e275b0>{number = 6, name = (null)}

DispatchQueue Options DispatchQueue 是唯一提供一組 Options 的 Scheduler,當 Operator 需要 SchedulerOptions 引數時,我們可以傳遞這些 Options。主要圍繞 QoS(服務質量)值,獨立於 DispatchQueue 上已設定的值。例如:

swift .receive( on: serialQueue, options: DispatchQueue.SchedulerOptions(qos: .userInteractive) )

我們將 DispatchQueue.SchedulerOptions 的例項傳遞.userInteractive。在實際開發中使用這些 Options 有助於作業系統決定在同時有許多佇列忙碌的情況下首先安排哪個任務。

OperationQueue Scheduler

由於 OperationQueue 在內部使用 Dispatch,因此在表面上幾乎沒有區別:

swift example("OperationQueue") { let queue = OperationQueue() let subscription = (1...10).publisher .receive(on: queue) .print() .sink { value in print("Received \(value)") } .store(in: &subscriptions) }

建立一個簡單的 Publisher 發出 1 到 10 之間的數字,然後列印該值,執行 Playground:

--- OperationQueue --- receive subscription: (ReceiveOn) request unlimited receive value: (1) Received 1 receive value: (8) Received 8 receive value: (9) Received 9 receive value: (6) Received 6 receive value: (3) Received 3 receive value: (5) Received 5 receive finished receive value: (10) receive value: (4) receive value: (7) receive value: (2)

按順序發出但無序到達!我們可以更改列印行以顯示當前執行緒:

swift print("Received \(value) on thread \(Thread.current)")

再次執行 Playground:

--- OperationQueue --- receive subscription: (ReceiveOn) request unlimited receive value: (4) Received 4 on thread <NSThread: 0x14d720980>{number = 2, name = (null)} receive value: (10) Received 10 on thread <NSThread: 0x14d720980>{number = 2, name = (null)} receive value: (3) Received 3 on thread <NSThread: 0x14e833620>{number = 6, name = (null)} receive value: (5) Received 5 on thread <NSThread: 0x14e80dfd0>{number = 4, name = (null)} receive value: (1) Received 1 on thread <NSThread: 0x14d70d840>{number = 5, name = (null)} receive finished receive value: (2) receive value: (9) receive value: (8) receive value: (6)

每個值都是在不同的執行緒上接收的!如果我們檢視有關 OperationQueue 的文件,有一條關於執行緒的說明,OperationQueue 使用 Dispatch 框架(因此是 DispatchQueue)來執行操作。這意味著它不保證它會為每個交付的值使用相同的底層執行緒。

此外,每個 OperationQueue 中都有一個引數可以解釋一切:它是 maxConcurrentOperationCount。它預設為系統定義的數字,允許操作佇列同時執行大量操作。由於 Publisher 幾乎在同一時間發出所有值,它們被 Dispatch 的併發佇列分派到多個執行緒。

對程式碼進行一些修改:

swift queue.maxConcurrentOperationCount = 1

再次執行 Playground:

--- OperationQueue --- receive subscription: (ReceiveOn) request unlimited receive value: (1) Received 1 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (2) Received 2 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (3) Received 3 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (4) Received 4 on thread <NSThread: 0x117609390>{number = 4, name = (null)} receive value: (5) Received 5 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (6) Received 6 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (7) Received 7 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (8) Received 8 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (9) Received 9 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive value: (10) Received 10 on thread <NSThread: 0x117627160>{number = 6, name = (null)} receive finished

這一次,我們將獲得真正的順序執行——將 maxConcurrentOperationCount 設定為 1 相當於使用序列佇列。

OperationQueue Options OperationQueue 沒有可用的 SchedulerOptions。它實際上是 RunLoop.SchedulerOptions 型別,本身沒有提供任何 Options。

OperationQueue 陷阱 我們剛剛看到 OperationQueue 預設併發執行操作,我們需要非常清楚這一點,因為它可能會給我們帶來麻煩。當我們的 Publisher 發出值時都有大量工作要執行時,它可能是一個很好的工具。我們可以通過調整 maxConcurrentOperationCount 引數來控制負載。

內容參考