Combine | (V) Combine 中的錯誤處理和 Scheduler
錯誤處理
到目前為止,在我們編寫的大部分程式碼中,我們沒有處理錯誤,而處理的都是“happy path”。在前面的文章中,我們瞭解到,Combine Publisher 聲明瞭兩個約束:
Output
定義 Publisher 發出的值的型別;Failure
定義 Publisher 發出的失敗的型別。
現在,我們將深入瞭解 Failure
在 Publisher 中的作用。
錯誤
Never
失敗型別為 Never
的 Publisher 表示永遠不會發出失敗。它為這些 Publisher 提供了強大的保證。這類 Publisher 可讓我們專注於使用值,同時絕對確保 Publisher 只有成功完成的事件。
在新的 Playground 頁面新增以下程式碼:
import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
func example(_ desc: String, _ action:() -> Void) {
print("--- (desc) ---")
action()
}
var subscriptions = Set<AnyCancellable>()
example("Just") {
Just("Hello")
}
我們建立了一個帶有 Hello
字串值的 Just
。 Just 是不會發出失敗的。 請按住 Command 並單擊 Just 初始化程式並選擇 Jump to Definition,檢視定義:
In contrast with Result.Publisher, a Just publisher can’t fail with an error. And unlike Optional.Publisher, a Just publisher always produces a value.
Combine 對 Never
的障保證不僅是理論上的,而是深深植根於框架及其各種 API 中。Combine 提供了幾個 Operator,這些 Operator 僅在保證 Publisher 永遠不會發出失敗事件時才可用。第一個是 sink
的變體,只處理值:
example("Just") {
Just("Hello")
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
}
在上面的示例中,我們使用 sink(receiveValue:)
,這種特定的過載使我們可以忽略 Publisher 的完成事件,而只處理其發出的值。
此過載僅適用於這類“可靠”的 Publisher。在錯誤處理方面,Combine 是智慧且安全的,如果可能丟擲錯誤,它會強制我們處理完成事件。要看到這一點,我們需要將 Never
的 Publisher 變成可能發出失敗事件的 Publisher。
setFailureType(to:)
func setFailureType<E>(to failureType: E.Type) -> Publishers.SetFailureType<Self, E> where E : Error
將 Never
Publisher 轉變為可能發出失敗事件的 Publisher 的第一種方法是使用 setFailureType
。這是另一個僅適用於失敗型別為 Never 的 Publisher 的 Operator:
example("setFailureType") {
Just("Hello")
.setFailureType(to: MyError.self)
}
可以使用 .eraseToAnyPublisher()
,來確認已改變的 Publisher 型別:
繼續修改上述程式碼:
enum MyError: Error {
case ohNo
}
example("setFailureType") {
Just("Hello")
.setFailureType(to: MyError.self)
.sink(
receiveCompletion: { completion in
switch completion {
case .failure(.ohNo):
print("Finished with OhNo!")
case .finished:
print("Finished successfully!")
}
},
receiveValue: { value in
print("Got value: (value)")
}
)
.store(in: &subscriptions)
}
現在我們只能使用 sink(receiveCompletion:receiveValue:)
。 sink(receiveValue:)
過載不再可用,因為此 Publisher 可能會發出失敗事件。可以嘗試註釋掉 receiveCompletion
檢視編譯錯誤。
此外,失敗型別為為 MyError
,這使我們可以針對.failure(.ohNo)
情況而無需進行不必要的強制轉換來處理該錯誤。
當然,setFailureType
的作用只是型別定義。 由於原始 Publisher 是 Just
,因此實際上也不會引發任何錯誤。
assign(to:on:)
assign
Operator 僅適用於不會發出失敗事件的 Publisher,與 setFailureType
相同。 向提供的 keypath 傳送錯誤會導致未定義的行為。新增以下示例進行測試:
example("assign(to:on:)") {
class Person {
var name = "Unknown"
}
let person = Person()
print(person.name)
Just("Layer")
.handleEvents(
receiveCompletion: { _ in
print(person.name)
}
)
.assign(to: .name, on: person)
.store(in: &subscriptions)
}
我們定義一個具有 name
屬性的 Person
類。建立一個 Person
例項並立即列印其 name
。一旦 Publisher 傳送完成事件,使用 handleEvents
再次列印此 name
。最後,使用 assign
將 name
設定為 Publisher 發出的值:
--- assign(to:on:) ---
Unknown
Layer
在 Just("Layer")
正下方新增以下行:
.setFailureType(to: Error.self)
這意味著它不再是 Publisher<String, Never>
,而是現在的 Publisher<String, Error>
。執行 Playground,我們將進行驗證:
Referencing instance method 'assign(to:on:)' on 'Publisher' requires the types 'any Error' and 'Never' be equivalent
assign(to:)
assign(to:on:)
有一個棘手的部分——它會 strong 捕獲提供給 on 引數的物件。在上一個示例之後新增以下程式碼:
example("assign(to:)") {
class MyViewModel: ObservableObject {
@Published var currentDate = Date()
init() {
Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.prefix(3)
.assign(to: .currentDate, on: self)
.store(in: &subscriptions)
}
}
let vm = MyViewModel()
vm.$currentDate
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
}
我們 MyViewModel
中定義一個 @Published
屬性。 它的初始值為當前日期。在 init
中建立一個 Timer Publisher,它每秒發出當前日期。使用 prefix
Operator 只接受 3 個更新。使用 assign(to:on:)
將每個日期更新給@Published
屬性。例項化 MyViewModel
,sink
vm.$currentDate
,並打印出每個值:
--- assign(to:) ---
2022-12-24 07:32:33 +0000
2022-12-24 07:32:34 +0000
2022-12-24 07:32:35 +0000
2022-12-24 07:32:36 +0000
看起來一切都很好。但是對assign(to:on:)
的呼叫建立了一個 strong 持有 self 的 Subscription。 導致 self 掛在Subscription 上,而 Subscription 掛在 self 上,建立了一個導致記憶體洩漏的引用迴圈。
因此引入了該 Operator 的另一個過載 assign(to:)
。該 Operator 通過對 Publisher 的 inout 引用來將值分配給 @Published
屬性。因此以下兩行:
.assign(to: .currentDate, on: self)
.store(in: &subscriptions)
可以被替換為:
.assign(to: &$currentDate)
使用 assign(to:)
Operator 將 inout 引用 Publisher 會打破引用迴圈。此外,它會在內部自動處理 Subscription 的記憶體管理,這樣我們就可以省略 store(in: &subscriptions)
。
assertNoFailure(_:file:line:)
當我們在開發過程確認 Publisher 以失敗事件完成時,assertNoFailure
Operator 非常有用。它不會阻止上游發出失敗事件。但是,如果它檢測到錯誤,它會因錯誤而崩潰:
example("assertNoFailure") {
Just("Hello")
.setFailureType(to: MyError.self)
.assertNoFailure()
.sink(receiveValue: { print("Got value: ($0) ")})
.store(in: &subscriptions)
}
我們使用 Just
建立一個“可靠”的 Publisher 並將其錯誤型別設定為 MyError
。如果 Publisher 以錯誤事件完成,則使用 assertNoFailure
以崩潰。這會將 Publisher 的失敗型別轉回 Never。使用 sink
打印出任何接收到的值。請注意,由於 assertNoFailure
將失敗型別設定回 Never
,因此 sink(receiveValue:)
過載可以直接使用。
執行 Playground,它可以正常工作:
--- assertNoFailure ---
Got value: Hello
在 setFailureType
之後,新增以下行:
.tryMap { _ in throw MyError.ohNo }
一旦 Hello 被推送到下游,使用 tryMap
丟擲錯誤。再次執行 Playground:
Playground execution failed:
error: Execution was interrupted, reason: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0).
...
frame #0: 0x00007fff232fbbf2 Combine`Combine.Publishers.AssertNoFailure...
由於 Publisher 發出失敗事件,playground 會 crash。 在某種程度上,我們可以將 assertNoFailure()
視為程式碼的保護機制。 雖然我們不應該在生產環境中使用它,但在開發過程中提前發現問題非常有用。
處理錯誤
try* Operator
Combine 提供了一個區分可能引發錯誤和可能不會引發錯誤的 Operator 的方法:try
字首。
注意:Combine 中所有以 try 為字首的 Operator 在遇到錯誤時的行為相同。我們將只在本章中嘗試使用 tryMap Operator。
example("tryMap") {
enum NameError: Error {
case tooShort(String)
case unknown
}
["Aaaa", "Bbbbb", "Cccccc"]
.publisher
.map { value in
return value.count
}
.sink(
receiveCompletion: { print("Completed with ($0)") },
receiveValue: { print("Got value: ($0)") }
)
}
在上面的示例中,我們定義一個 NameError
錯誤列舉。建立釋出三個字串的 Publisher。將每個字串對映到它的長度。執行示例並檢視控制檯輸出:
--- tryMap ---
Got value: 4
Got value: 5
Got value: 6
Completed with finished
將上面示例中的 map 替換為以下內容:
.tryMap { value -> Int in
let length = value.count
guard length >= 5 else {
throw NameError.tooShort(value)
}
return value.count
}
我們檢查字串的長度是否大於等於 5。否則,我們會丟擲錯誤:
--- tryMap ---
Completed with failure(Page_Contents.(unknown context at $10e3cb984).(unknown context at $10e3cba6c).(unknown context at $10e3cbaa8).NameError.tooShort("Aaaa"))
對映錯誤
map
和 tryMap
之間的區別不僅僅是後者允許丟擲錯誤。 map
繼承了現有的失敗型別並且只操作 Publisher 的值,但 tryMap
沒有——它實際上將錯誤型別擦除為普通的 Swift 錯誤。 與帶有 try 字首的所有 Operator 都是如此。
example("map vs tryMap") {
enum NameError: Error {
case tooShort(String)
case unknown
}
Just("Hello")
.setFailureType(to: NameError.self)
.map { $0 + " World!" }
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("Done!")
case .failure(.tooShort(let name)):
print("(name) is too short!")
case .failure(.unknown):
print("An unknown name error occurred")
}
},
receiveValue: { print("Got value ($0)") }
)
.store(in: &subscriptions)
}
我們定義一個用於此示例的 NameError
。建立一個只發出字串 Hello
的 Just
。使用 setFailureType
設定失敗型別為 NameError
。使用 map
將另一個字串附加。最後,使用 sink
的 receiveCompletion
為 NameError
的每個情況打印出適當的訊息。執行 Playground:
--- map vs tryMap ---
Got value Hello World!
Done!
Completion
的失敗型別是 NameError
,這正是我們想要的。 setFailureType
允許我們專門針對 NameError
進行處理,例如 failure(.tooShort(let name))
。
將 map
更改為 tryMap
。
.tryMap { throw NameError.tooShort($0) }
我們會立即注意到 Playground 不再編譯。 再次點選 completion
:
tryMap
刪除了我們的型別錯誤並將其替換為通用 Swift.Error
型別。即使我們實際上並沒有從 tryMap
中丟擲錯誤,也會發生這種情況。
原因很簡單:Swift 還不支援型別化 throws
,儘管自 2015 年以來 Swift Evolution 中一直在討論這個主題。這意味著當我們使用帶有 try 字首的 Operator 時,我們的錯誤型別將總是被抹去到最常見的父類:Swift.Error
。
一種方法是將通用錯誤手動轉換為特定的錯誤型別,但這不是最理想的。它打破了嚴格型別錯誤的整個目的。幸運的是,Combine 為這個問題提供了一個很好的解決方案,稱為 mapError
。
在呼叫 tryMap 之後,新增以下行:
.mapError { $0 as? NameError ?? .unknown }
mapError
接收上游 Publisher 丟擲的任何錯誤,並將其對映到我們想要的任何錯誤。在這種情況下,我們可以利用它將錯誤轉換回 NameError
。這會將 Failure
恢復為所需要的型別,並將我們的 Publisher 轉回 Publisher<String, NameError>
。構建並執行 Playground,最終可以按預期編譯和工作:
--- map vs tryMap ---
Hello is too short!
捕獲錯誤並重試
很多時候,當我們請求資源或執行某些計算時,失敗可能是由於網路不穩定或其他資源不可用而導致的一次性事件。
在這些情況下,我們通常會編寫一個機制來重試不同的工作,跟蹤嘗試次數,並處理如果所有嘗試都失敗的情況。Combine 讓這一切變得非常簡單。
retry
Operator 接受一個數字。如果 Publisher 失敗,它將重新訂閱上游並重試至我們指定的次數。如果所有重試都失敗,它將錯誤推送到下游,就像沒有 retry
Operator 一樣:
example("Catching and retrying") {
enum MyError: Error {
case network
}
var service1 = PassthroughSubject<Int, MyError>()
service1.send(completion: .failure(.network))
service1
.handleEvents(
receiveSubscription: { _ in print("Trying ...") },
receiveCompletion: {
guard case .failure(let error) = $0 else { return }
print("Got error: (error)")
}
)
.retry(3)
.sink(
receiveCompletion: { print("($0)") },
receiveValue: { number in
print("Got Number: (number)")
}
)
.store(in: &subscriptions)
}
我們有一個 service1
,它發出了失敗事件。因此,訂閱 service1
肯定會獲得失敗事件。我們嘗試三次,並通過 handleEvents
列印訂閱和完成:
--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
failure(Page_Contents.(unknown context at $10fc7b584).(unknown context at $10fc7b77c).(unknown context at $10fc7b7b8).MyError.network)
執行 Playerground,我們會看到有四次 Trying。初始 Trying,加上由 retry
Operator 觸發的三次重試。 由於 service1
不斷失敗,因此 Operator 會耗盡所有重試嘗試並將錯誤推送到 sink
。
調整程式碼:
example("Catching and retrying") {
enum MyError: Error {
case network
}
var service1 = PassthroughSubject<Int, MyError>()
service1.send(completion: .failure(.network))
service1
.handleEvents(
receiveSubscription: { _ in print("Trying ...") },
receiveCompletion: {
guard case .failure(let error) = $0 else { return }
print("Got error: (error)")
}
)
.retry(3)
.replaceError(with: 1)
.sink(
receiveCompletion: { print("($0)") },
receiveValue: { number in
print("Got Number: (number)")
}
)
.store(in: &subscriptions)
}
在 service1
重試後,若還是失敗,我們將通過 replaceError
將失敗替換為 1:
--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 1
finished
或者,我們可以使用 catch
捕獲 service1
的失敗,併為下游提供另一個 Publisher:
example("Catching and retrying") {
enum MyError: Error {
case network
}
var service1 = PassthroughSubject<Int, MyError>()
service1.send(completion: .failure(.network))
var service2 = PassthroughSubject<Int, MyError>()
service1
.handleEvents(
receiveSubscription: { _ in print("Trying ...") },
receiveCompletion: {
guard case .failure(let error) = $0 else { return }
print("Got error: (error)")
}
)
.retry(3)
.catch { error in
return service2
}
.sink(
receiveCompletion: { print("($0)") },
receiveValue: { number in
print("Got Number: (number)")
}
)
.store(in: &subscriptions)
service2.send(2)
service2.send(completion: .finished)
}
此時,下游將獲得到 service2
發出的值 2 和完成事件:
--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 2
finished
cheduler
我們已經遇到了一些將 Scheduler 作為引數的 Operator。大多數情況下,我們會簡單地使用 DispatchQueue.main
,因為它方便、易於理解。除了 DispatchQueue.main
,我們肯定已經使用了全域性併發佇列,或建立一個序列排程佇列來執行操作。
但是為什麼 Combine 需要一個新的類似概念呢?我們接著將瞭解為什麼會出現 Scheduler 的概念,將探索 Combine 如何使非同步事件和操作更易於使用,當然,我們還會試使用 Combine 提供的所有 Scheduler。
Scheduler 簡介
根據 Apple 的文件,Scheduler 是一種定義何時及如何執行閉包的協議。Scheduler 提供上下文以儘快或在將來的某個事件執行未來的操作。該操作就是協議本身中定義的閉包。閉包也可以隱藏 Publisher 在特定 Scheduler 上執行的某些值的傳遞。
我們會注意到此定義有意避免對執行緒的任何引用,這是因為具體的實現是在 Scheduler 協議中,提供的“上下文”中的。因此,我們的程式碼將在哪個執行緒上執行取決於選擇的 Scheduler。
記住這個重要的概念:Scheduler 不等於執行緒。我們將在後面詳細瞭解這對每個 Scheduler 意味著什麼。讓我們從事件流的角度來看 Scheduler 的概念:
我們在上圖中看到的內容:
-
在主 (UI) 執行緒上發生使用者操作,如按鈕按下;
-
它會觸發一些工作在 Background Scheduler 上進行處理;
-
要顯示的最終資料在主執行緒上傳遞給 Subscriber,Subscriber 可以更新 UI。
我們可以看到 Scheduler 的概念深深植根於前臺/後臺執行的概念。此外,根據我們選擇的實現,工作可以序列化或並行化。
因此,要全面瞭解 Scheduler,需要檢視哪些類符合 Scheduler 協議。首先,我們需要了解與 Scheduler 相關的兩個重要 Operator。
Scheduler Operator
Combine 提供了兩個基本的 Operator 來使用 Scheduler:
subscribe(on:)
和subscribe(on:options:)
在指定的 Scheduler 上建立 Subscription(開始工作);receive(on:)
和receive(on:options:)
在指定的 Scheduler 上傳遞值。
此外,以下 Operator 將 Scheduler 和 Scheduler options 作為引數:
-
debounce(for:scheduler:options:)
-
delay(for:tolerance:scheduler:options:)
-
measureInterval(using:options:)
-
throttle(for:scheduler:latest:)
-
timeout(_:scheduler:options:customError:)
subscribe(on:)
和 receive(on:)
在我們訂閱它之前,Publisher 是一個無生命的實體。但是當我們訂閱 Publisher 時會發生什麼?有幾個步驟:
- Publiser
receive
Subscriber 並建立 Subscription; - Subscriber
receive
Subscription 並從 Publiser 請求值(虛線); - Publiser 開始工作(通過 Subscription);
- Publiser 發出值(通過 Subscription);
- Operator 轉換值;
- Subscriber 收到最終值。
當代碼訂閱 Publiser 時,步驟一、二和三通常發生在當前執行緒上。 但是當我們使用 subscribe(on:)
Operator 時,所有這些操作都在我們指定的 Scheduler 上執行。
我們可能希望 Publiser 在後臺執行一些昂貴的計算以避免阻塞主執行緒。 執行此操作的簡單方法是使用 subscribe(on:)
。以下是虛擬碼:
swift
let queue = DispatchQueue(label: "serial queue")
let subscription = publisher
.subscribe(on: queue)
.sink { value in ...
如果我們收到值後,想更新一些 UI 怎麼辦?我們可以在閉包中執行類似 DispatchQueue.main.async { ... }
的操作,從主執行緒執行 UI 更新。有一種更有效的方法可以使用 Combine 的 receive(on:)
:
swift
let subscription = publisher
.subscribe(on: queue)
.receive(on: DispatchQueue.main)
.sink { value in ...
即使計算工作正常並從後臺執行緒發出結果,我們現在也可以保證始終在主佇列上接收值。這是安全地執行 UI 更新所需要的。
Scheduler 實現
Apple 提供了幾種 Scheduler 協議的具體實現:
ImmediateScheduler
:一個簡單的 Scheduler,它立即在當前執行緒上執行程式碼,這是預設的執行上下文,除非使用subscribe(on:)
、receive(on:)
或任何其他將 Scheduler 作為引數的 Operator 進行修改。RunLoop
:繫結到 Foundation 的 Thread 物件。DispatchQueue
:可以是序列的或併發的。OperationQueue
:規範工作項執行的佇列。
這裡省略了
TestScheduler
,是一個虛擬的、模擬的 Scheduler,它是任何響應式程式設計框架測試時不可或缺的一部分。
ImmediateScheduler
在 Playground 中新增程式碼:
```swift example("ImmediateScheduler") { let source = Timer .publish(every: 1.0, on: .main, in: .common) .autoconnect() .scan(0) { counter, _ in counter + 1 }
let publisher = source
.receive(on: ImmediateScheduler.shared)
.eraseToAnyPublisher()
publisher.sink(receiveValue: { _ in
print(Thread.current)
})
.store(in: &subscriptions)
} ```
執行 Playground,我們會看到 Publisher 發出的每個值,都是在 MainThread
上:
--- ImmediateScheduler ---
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
當前執行緒是主執行緒, ImmediateScheduler
立即在當前執行緒上排程。當我們在 .receive(on: ImmediateScheduler.shared)
前新增一行:
swift
.receive(on: DispatchQueue.global())
執行 Playground,我們將在不同的執行緒收到值:
--- ImmediateScheduler ---
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
ImmediateScheduler options 由於大多數 Operator 在其引數中接受 Scheduler,我們還可以找到一個接受 SchedulerOptions
值的引數。在 ImmediateScheduler
的情況下,此型別被定義為 Never
,因此在使用 ImmediateScheduler
時,我們永遠不應該為 Operator 的 options 引數傳遞值。
ImmediateScheduler 的陷阱 關於 ImmediateScheduler
的一件事是它是即時的。我們無法使用 Scheduler 協議的任何 schedule(after:)
變體,因為我們需要指定的 SchedulerTimeType
沒有初始化方法,對於 ImmediateScheduler
無意義。
RunLoop scheduler
RunLoop 早於 DispatchQueue,它是一種線上程級別管理輸入源的方法。主執行緒有一個關聯的 RunLoop,我們還可以通過從當前執行緒呼叫 RunLoop.current
為任何執行緒獲取一個 RunLoop。
在 Playground 中新增此程式碼:
```swift example("RunLoop") { let source = Timer .publish(every: 1.0, on: .main, in: .common) .autoconnect() .scan(0) { counter, _ in counter + 1 }
let publisher = source
.receive(on: DispatchQueue.global())
.handleEvents(receiveOutput: { _ in
print("DispatchQueue.global: \(Thread.current)")
})
.receive(on: RunLoop.current)
.handleEvents(receiveOutput: { _ in
print("RunLoop.current: \(Thread.current)")
})
.eraseToAnyPublisher()
publisher.sink(receiveValue: { _ in
})
.store(in: &subscriptions)
} ```
當前 RunLoop.current 就是主執行緒的 RunLoop。執行 Playground:
--- RunLoop ---
DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)}
RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}
DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)}
RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}
DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)}
RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}
每發出一個值,都通過一個全域性併發佇列的執行緒,然後在主執行緒上繼續。
RunLoop Options 與 ImmediateScheduler
一樣,RunLoop 不提供 SchedulerOptions
引數。
RunLoop 陷阱 RunLoop 的使用應僅限於主執行緒的 RunLoop,以及我們在需要時控制的 Foundation 執行緒中可用的 RunLoop。要避免的一個是在 DispatchQueue 上執行的程式碼中使用 RunLoop.current。這是因為 DispatchQueue 執行緒可能是短暫的,這使得它們幾乎不可能依賴 RunLoop。
DispatchQueue Scheduler
DispatchQueue 符合 Scheduler 協議,並且完全可用於所有將 Scheduler 作為引數的 Operator。Dispatch 框架是 Foundation 的一個強大元件,它允許我們通過向系統管理的排程佇列提交工作來在多核硬體上同時執行程式碼。DispatchQueue 可以是序列的(預設)或併發的。序列佇列按順序執行你提供給它的所有工作項。併發佇列將並行啟動多個工作項,以最大限度地提高 CPU 使用率:
- 序列佇列通常用於保證某些操作不重疊。因此,如果所有操作都發生在同一個佇列中,他們可以使用共享資源而無需加鎖。
- 併發佇列將同時執行儘可能多的操作。因此,它更適合純計算。
我們一直使用的最熟悉的佇列是 DispatchQueue.main。它直接對映到主執行緒,在這個佇列上執行的所有操作都可以自由地更新使用者介面。 當然,UI 更新只能在主執行緒進行。所有其他佇列,無論是序列的還是併發的,都在系統管理的執行緒池中執行它們的程式碼。這意味著我們永遠不應該對佇列中執行的程式碼中的當前執行緒做出任何假設。尤其不應使用 RunLoop.current 來安排工作,因為 DispatchQueue 管理其執行緒的方式有不同。
所有排程佇列共享同一個執行緒池,執行的序列佇列將使用該池中的任何可用執行緒。一個直接的結果是,來自同一佇列的兩個連續工作項可能使用不同的執行緒,但仍可以按順序執行。這是一個重要的區別:當使用 subscribe(on:)
、receive(on:)
或任何其他有 Scheduler 引數的 Operator 時,我們永遠不應假設執行緒每次都是相同的。
在 Playground 中新增程式碼:
```swift
example("DispatchQueue") {
let source = PassthroughSubject
let serialQueue = DispatchQueue(label: "Serial queue")
source
.handleEvents(receiveOutput: { _ in
print("\(Thread.current)")
})
.receive(on: serialQueue)
.handleEvents(receiveOutput: { _ in
print("\(Thread.current)")
})
.sink(receiveValue: { _ in
})
.store(in: &subscriptions)
} ```
Timer 在主佇列 sourceQueue
上觸發並通過 source
傳送 Void 值。接著在序列佇列 serialQueue
上接收值:
--- DispatchQueue ---
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x128025cd0>{number = 2, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x117904d90>{number = 5, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
將 sourceQueue 也改為 DispatchQueue(label: "Serial queue")
,也將在全域性併發佇列上發出值:
--- DispatchQueue ---
<NSThread: 0x137e275b0>{number = 6, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x127e0f400>{number = 4, name = (null)}
<NSThread: 0x137e275b0>{number = 6, name = (null)}
DispatchQueue Options DispatchQueue 是唯一提供一組 Options 的 Scheduler,當 Operator 需要 SchedulerOptions
引數時,我們可以傳遞這些 Options。主要圍繞 QoS(服務質量)值,獨立於 DispatchQueue 上已設定的值。例如:
swift
.receive(
on: serialQueue,
options: DispatchQueue.SchedulerOptions(qos: .userInteractive)
)
我們將 DispatchQueue.SchedulerOptions
的例項傳遞.userInteractive
。在實際開發中使用這些 Options 有助於作業系統決定在同時有許多佇列忙碌的情況下首先安排哪個任務。
OperationQueue Scheduler
由於 OperationQueue 在內部使用 Dispatch,因此在表面上幾乎沒有區別:
swift
example("OperationQueue") {
let queue = OperationQueue()
let subscription = (1...10).publisher
.receive(on: queue)
.print()
.sink { value in
print("Received \(value)")
}
.store(in: &subscriptions)
}
建立一個簡單的 Publisher 發出 1 到 10 之間的數字,然後列印該值,執行 Playground:
--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1
receive value: (8)
Received 8
receive value: (9)
Received 9
receive value: (6)
Received 6
receive value: (3)
Received 3
receive value: (5)
Received 5
receive finished
receive value: (10)
receive value: (4)
receive value: (7)
receive value: (2)
按順序發出但無序到達!我們可以更改列印行以顯示當前執行緒:
swift
print("Received \(value) on thread \(Thread.current)")
再次執行 Playground:
--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (4)
Received 4 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x14e833620>{number = 6, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x14e80dfd0>{number = 4, name = (null)}
receive value: (1)
Received 1 on thread <NSThread: 0x14d70d840>{number = 5, name = (null)}
receive finished
receive value: (2)
receive value: (9)
receive value: (8)
receive value: (6)
每個值都是在不同的執行緒上接收的!如果我們檢視有關 OperationQueue 的文件,有一條關於執行緒的說明,OperationQueue 使用 Dispatch 框架(因此是 DispatchQueue)來執行操作。這意味著它不保證它會為每個交付的值使用相同的底層執行緒。
此外,每個 OperationQueue 中都有一個引數可以解釋一切:它是 maxConcurrentOperationCount。它預設為系統定義的數字,允許操作佇列同時執行大量操作。由於 Publisher 幾乎在同一時間發出所有值,它們被 Dispatch 的併發佇列分派到多個執行緒。
對程式碼進行一些修改:
swift
queue.maxConcurrentOperationCount = 1
再次執行 Playground:
--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (2)
Received 2 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (4)
Received 4 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (6)
Received 6 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (7)
Received 7 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (8)
Received 8 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (9)
Received 9 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive finished
這一次,我們將獲得真正的順序執行——將 maxConcurrentOperationCount
設定為 1 相當於使用序列佇列。
OperationQueue Options OperationQueue 沒有可用的 SchedulerOptions
。它實際上是 RunLoop.SchedulerOptions
型別,本身沒有提供任何 Options。
OperationQueue 陷阱 我們剛剛看到 OperationQueue 預設併發執行操作,我們需要非常清楚這一點,因為它可能會給我們帶來麻煩。當我們的 Publisher 發出值時都有大量工作要執行時,它可能是一個很好的工具。我們可以通過調整 maxConcurrentOperationCount
引數來控制負載。
內容參考
-
來自 Kodeco 的書籍《Combine: Asynchronous Programming with Swift》;
-
對上述 Kodeco 書籍的漢語自譯版 《Combine: Asynchronous Programming with Swift》整理與補充。
- 淺析三款大規模分散式檔案系統架構設計
- 動轉靜兩大升級!一鍵轉靜成功率領先,重點模型訓練提速18%
- 文心ERNIE 3.0 Tiny新升級!端側壓縮部署“小” “快” “靈”!
- [Android禪修之路] 解讀Layer
- 35張圖,直觀理解Stable Diffusion
- Combine | (V) Combine 中的錯誤處理和 Scheduler
- Combine | (III) Combine Operator:時間操作|序列
- 「Apple Watch 應用開發系列」複雜功能(Complication)基礎
- 「Apple Watch 應用開發系列」複雜功能實踐
- 在 iOS 上實現使用者主動觸發的 App Icon 切換
- view和layer知識點整理
- 實時活動(Live Activity) - 在鎖定螢幕和靈動島上顯示應用程式的實時資料
- 「Apple Watch 應用開發系列」Dock 快照
- Layer 1新方向 Meta系公鏈Aptos、Sui 、Linera盤點
- Synapse Chain:多鏈世界的Layer0跨鏈基礎設施
- V神:什麼樣的layer 3才有意義?
- 以太坊升級將如何影響Layer2的發展?
- Vitalik:什麼樣的 Layer3 才有意義
- 從系統架構分析安全問題及應對措施
- 【雲原生】Kubernetes(K8S)與資料庫