iOS中為什麼會有這麼多鎖呢?
其實iOS領域很多文章都談到了關於鎖的文章,但是我為什麼要在這裏重新寫一篇文章呢?一是很多文章使用的觀點依然是很老的觀點,和我的測試結果不符合,二則是自己對這方面也比較生疏,所以就在最近重新梳理一下自己對着方面的調查,梳理一下這一塊的知識點。
首先是一波對比,我使用了10^7次遍歷,使用的開發語言是Swift,在iOS15.5系統版本的iPhone13真機上跑出的數據:
整體來説NSConditionLock的性能會略慢,但是其他的性能都類似,在這個量級的數據處理下,它們的表現都非常的接近。從圖中可以看出性能最好的三個鎖是os_unfair_lock、pthread_mutex以及DispatchSemaphore,前兩者是互斥鎖,後者是信號量。
首先我想提出一個問題,那就是鎖的目的是什麼?
在聊鎖的目的之前,那首先我們來看一個概念,那就是線程安全。什麼是線程安全?我的定義是當多線程都需要操作某個共享數據時,並不會引起意料之外的情況,能保證該共享數據的正確性。可是如何去實現一個線程安全類呢?通用的方式就是在一些數據的操作上加鎖。而鎖的目的就是確保多線程操作共享數據時,能保證數據的準確性和可預測性。
os_unfair_lock
我相信有很多人都閲讀過ibireme關於鎖的性能對比的知名文章《不再安全的 OSSpinLock》,其中提到了OSSpinLock不再安全的理由,但是由此卻引發一個問題,那就是OSSpinLock主要的使用場景是哪裏呢?
我們都知道在Objective-C中定義一個屬性的時候,有時屬性會被聲明為atomic,這就是説這個屬性的set操作和get操作是原子性的,那麼如何確保這些 操作的原子性呢?我想這個時候你已經猜到答案了,Apple使用的方案是OSSpinLock,這是一個自旋鎖,但是這個鎖有一個很嚴重的問題,那就是優先級反轉問題會導致自旋鎖發生死鎖。
iOS 系統中維護了 5 個不同的線程優先級/QoS: background,utility,default,user-initiated,user-interactive。高優先級線程始終會在低優先級線程前執行,一個線程不會受到比它更低優先級線程的干擾。這種線程調度算法會產生潛在的優先級反轉問題,從而破壞了 spin lock。
具體來説,如果一個低優先級的線程獲得鎖並訪問共享資源,這時一個高優先級的線程也嘗試獲得這個鎖,它會處於 spin lock 的忙等狀態從而佔用大量 CPU。此時低優先級線程無法與高優先級線程爭奪 CPU 時間,從而導致任務遲遲完不成、無法釋放 lock。這並不只是理論上的問題,libobjc 已經遇到了很多次這個問題了,於是蘋果的工程師停用了 OSSpinLock。
蘋果工程師 Greg Parker 提到,對於這個問題,一種解決方案是用 truly unbounded backoff 算法,這能避免 livelock 問題,但如果系統負載高時,它仍有可能將高優先級的線程阻塞數十秒之久;另一種方案是使用 handoff lock 算法,這也是 libobjc 目前正在使用的。鎖的持有者會把線程 ID 保存到鎖內部,鎖的等待者會臨時貢獻出它的優先級來避免優先級反轉的問題。理論上這種模式會在比較複雜的多鎖條件下產生問題,但實踐上目前還一切都好。
而在iOS 10之後,Apple使用了os_unfair_lock來替代了OSSpinLock, 這是一個高性能的互斥鎖,而不是自旋鎖,如果是阻止兩個線程可以同時訪問臨界區,那麼這個鎖無疑可以很好的完成工作,包括上述的pthread_mutex_lock 以及信號量都可以,但是如果我們需要鎖具備某些特性,那麼這個時候就需要其他多種類的鎖了。
```swift // os_unfair_lock的使用 var unfairLock = os_unfair_lock() os_unfair_lock_lock(&unfairLock) os_unfair_lock_unlock(&unfairLock)
// pthreadMutex的使用 var pthreadMutex = pthread_mutex_t() pthread_mutex_lock(&pthreadMutex) pthread_mutex_unlock(&pthreadMutex) ```
這裏再補充説明一下,Apple使用在保證原子性時實際會調用到的方法如下:
c
static inline void reallySetProperty() {
...
if (!atomic) {
oldValue = *slot;
*slot = newValue;
} else {
//PropertyLocks是一個StripedMap<spinlock_t>類型的全局變量
//而StripedMap是一個用數組來實現的hashmap,key是指針,value是類型是spinlock_t對象
//而spinlock_t則是mutex_tt<LOCKDEBUG>的類,而mutex_tt類內部是由os_unfair_lock mLock來實現
//所以,PropertyLocks[slot]目的就是獲取os_unfair_lock對象
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
oldValue = *slot;
*slot = newValue;
slotlock.unlock();
}
...
}
它通過地址從PropertyLocks數組中取出了spinlock_t鎖,可是如何使用地址作為數組下標呢?它使用了一個很巧妙的hash算法,來實現指針到數組下標的轉化:
c
static unsigned int indexForPointer(const void *p) {
uintptr_t addr = reinterpret_cast<uintptr_t>(p);
// 這是一個哈希算法,可以將對象的地址轉化為數組的下標
// 使得數組元素在0~StripeCount之間
return ((addr >> 4) ^ (addr >> 9)) % StripeCount;
}
當然這種方法也會偶爾導致哈希衝突,兩個不同的地址會導致獲取到同一個Lock,這樣會造成資源閒置,沒有充分利用CPU的資源,但是不妨礙這個哈希算法整體上是高效的。
NSLock
既然已經有了性能比較高的互斥鎖,那為什麼還需要有其它這些雜七雜八的鎖呢?比如説接下來我們要提到的NSLock,這個鎖也是一個互斥鎖,而它是基於pthread_mutex_lock的封裝,而在原有的基礎上增加了一個特性那就是超時!沒錯這就是有其他各種鎖的原因,給不同的鎖不同的特性,以滿足具體的開發場景,NSLock的API如下:
``swift
open class NSLock : NSObject, NSLocking {
open func
try`() -> Bool
open func lock(before limit: Date) -> Bool
open var name: String?
} ```
在某些時候,超時這個特性是非常有效的,因為在一些可能發生死鎖的場景中,使用NSLock可以讓我們有一個保險機制,即使發生了死鎖,也可以在一定的時間之後走出加鎖狀態,恢復到正常的程序處理邏輯。但是和以上的互斥鎖一樣,它都無法應對遞歸的情況,那使用什麼來處理遞歸鎖呢?NSRecursiveLock!
NSRecursiveLock
使用NSRecursiveLock可以使得該鎖被同一線程多次獲取而不會導致線程死鎖。但是每一次lock都對應一次unlock,這樣unlock結束之後,鎖才會釋放。而顧名思義,這種類型的鎖被用於一個遞歸方法內部來防止線程被阻塞。
```swift let rlock = NSRecursiveLock()
class RThread : Thread {
override func main(){
rlock.lock()
print("Thread acquired lock")
callMe()
rlock.unlock()
print("Exiting main")
}
func callMe(){
rlock.lock()
print("Thread acquired lock")
rlock.unlock()
print("Exiting callMe")
}
}
var tr = RThread() tr.start()
// 多次申請鎖,並不會導致崩潰,這就是遞歸鎖的作用 ```
NSConditionLock
條件鎖滿足NSLocking
協議,所以基本的NSLock類型鎖的基本lock,unlock這種全局的鎖方法它也是具備的,初次之外,它還具備自己的特性,通常情況下,當線程需要以某種特定的順序執行任務時,比如一個線程生產數據,而另一個線程消耗數據時,可以使用NSConditionLock(比如常見的生產者消費者模型)。接下來我們來看一個實例:
```swift let NODATA = 1 let GOTDATA = 2 let clock = NSConditionLock(condition: NODATA) var shareInt = 0
class ProducerThread: Thread { override func main() { for _ in 0..<100 { clock.lock(whenCondition: NODATA) LockFile.ProducerThread.sleep(forTimeInterval: 0.5) sharedInt = sharedInt + 1 NSLog("生產者:(sharedInt)") clock.unlock(withCondition: GOTDATA) } } }
class ConsumerThread: Thread {
override func main() {
for _ in 0..<100 {
clock.lock(whenCondition: GOTDATA)
sharedInt = sharedInt - 1
NSLog("消費者:\(sharedInt)")
clock.unlock(withCondition: NODATA)
}
}
}
let pt = ProducerThread.init() let ct = ConsumerThread.init() pt.start() ct.start() ```
當創建一個條件鎖的時候,需要指定一個特定Int類型的值。而lock(whenCondition:)
方法當條件滿足時會獲取這個鎖,或者條件和另一個線程在使用unlock(withCondition:)
釋放鎖時設置的值滿足時,NSConditionLock對象就會獲取鎖執行後續的代碼片段,但是當lock(whenCondition:)
方法沒有獲取鎖的時候(條件沒滿足時),這個方法會阻塞線程的執行,直到獲得鎖為止。
NSCondition
NSCondition和前者是很容易混淆的,但是這個鎖解決了什麼問題呢?
當一個已獲得鎖的線程發現執行其工作所需的附加條件(它需要一些資源、另一個處於特定狀態的對象等)暫時還沒有得到滿足時,它需要一種方法來暫停,並且一旦滿足條件就繼續工作的機制,可是如何實現呢?可以通過連續的檢查(忙等待)來實現,但是這樣做的話,線程持有的鎖會發生什麼?我們應該在等待時保留它們還是釋放它們?還是在滿足條件時再次獲得它們?
而NSCondition提供了一種簡潔的方式來提供了這種問題的解決方案,一旦一個線程被放在該Condition的等待列表中,它可以通過另一個線程Signal來喚醒。以下是具體的案例:
```swift let cond = NSCondition.init() var available = false var sharedString = ""
class WriterThread: Thread { override func main() { for _ in 0..<100 { cond.lock() sharedString = "🤣" available = true cond.signal() cond.unlock() } } }
class PrinterThread: Thread { override func main() { for _ in 0..<100 { cond.lock() while (!available) { cond.wait() } sharedString = "" available = false cond.unlock() } } } ```
當線程waits一個條件時,這個Condition對象會unlock當前鎖並且阻塞線程。當Condition發出信號時,系統會喚醒線程,然後這個Condition對象會在wait()或者wait(until:)返回之前,這個Condition對象會重新獲取到它的鎖,因此,從線程的角度來看,它似乎一直持有者鎖(雖然中途它會失去鎖)。
Dispatch Semaphore
最後我們聊一聊信號量,簡而言之,信號量是需要在不同的線程中進行鎖定和解鎖時使用的鎖。因為它的wait方法會阻塞當前線程,所以需要其他線程發來signal信號來喚醒它。
swift
let semaphore = DispatchSemaphore.init(value: 0)
DispatchQueue.global(qos: .userInitiated).async {
// to do some thing
semaphore.signal()
}
semaphore.wait() // will block thread
如上述例子一樣,信號量通常用於鎖定一個線程,直到另外一個線程中事件的完成後發出signal信號。從上述的測試圖標,以及其他諸多文章,信號量的速度是很快的。上述的生產者消費者模型也可以使用信號量來實現:
```swift let semaphore = DispatchSemaphore.init(value: 0)
DispatchQueue.global(qos: .userInitiated).async { while true { sleep(1) sharedInt = sharedInt + 1 NSLog("生產了: (sharedInt)") _ = semaphore.signal() } }
DispatchQueue.global(qos: .userInitiated).async { while true { if sharedInt <= 0 { _ = semaphore.wait(timeout: .distantFuture) } else { sharedInt = sharedInt - 1 NSLog("消耗了: (sharedInt)") } } } ```
好了,簡單説了一下我對於鎖的梳理,希望大家也可以從中學到一點東西吧~ 如果有什麼問題,或者錯誤希望大家可以留言指點。
- 我正在參與掘金技術社區創作者簽約計劃招募活動,點擊鏈接報名投稿。
參考
1、《不再安全的OSSPinLock》