【精通內核】Linux 內核併發控制原理信號量與 P-V 原語源碼解析
前言
:mailbox: 作者簡介 :小明java問道之路,專注於研究計算機底層/Java/Liunx 內核,就職於大型金融公司後端高級工程師,擅長交易領域的高安全/可用/併發/性能的架構設計:mailbox:
:trophy: InfoQ 簽約博主 、CSDN 專家博主/Java 領域優質創作者、阿里雲專家/簽約博主、華為雲專家、51CTO 專家/TOP 紅人 :trophy:
:fire:如果此文還不錯的話,還請:+1: 關注 、點贊 、收藏 三連支持:+1:一下博主~
本文導讀
本文深入 Linux 內核源碼,從核心源碼入口講起,詳細對信號量、互斥量的內核代碼講解,其中對 P-V 操作實現逐行剖析,Linux 內核併發控制原理的鎖實現和原理在後續文章中一一講解,本文深入淺出 Linux 中斷控制的實現原理。
一、Linux 內核信號量
信號量原理是什麼,多線程的時候,我們需要一個變量來表示信號量狀態,同時需要一個隊列,當線程無法獲取信號量時,需要進行阻塞。
1、atomic_t 結構體
atomict 為原子性變量、_wait_queue_head 為等待隊列頭部、wait_queue 為等待任務的表節點、semaphore 為信號量結構體。我們看到原子性整型變量聲明 ,其中通過 counter 變量來描述信號量的個數,然而這裏為了避免編譯器優化用 volatile 來修飾。
typedef struct {
volatile int counter;
} atomic_t; // 等待隊列頭部,通過自旋鎖來保證操作隊列線程安全
struct_waitqueue_head {
spinlock_t lock; // 保護阻塞隊列的自旋鎖
struct list_head task_list; // 任務阻塞隊列
};
struct__wait_queue { // 等待任務節點
unsigned int flags; // 任務結構體,這裏只需要知道它就是進程控制塊 PCB 代表了進程即可
struct task_struct * task;
wait_queue_func_t func; // 等待函數指針
struct list head task list; // 所處鏈表的結構體
};
struct semaphore { // 信號量結構體
atomic_t count; // 信號量計數
int sleepers; // 等待任務數
wait_queue_head_t wait; // 等待隊列
};
複製代碼
接下來我們來看看信號量的初始化的內核源碼。
2、信號量的初始化的內核源碼
首先初始化信號量,原子性設置信號量的初始值,就是將(count)->counter=val,初始化 sleeper 為 0,最後初始化等待鏈表。
static inline void sema_init(struct semaphore*sem, int val) { // 初始化信號量
atomic_set(&sem->count, val); // 原子性設置信號量的初始值,就是將(count)->counter=val
sem -> sleepers = 0; // 初始化 sleeper 為0
init_waitqueue_head( &sem -> wait); // 初始化等待鏈表
}
// 初始化操作
static inline void init_waitqueue_head(wait_queue_head_t*q){
//自旋鎖狀態初始為spinlock t結構體
q->lock = SPIN LOCK UNLOCKED;
//初始化等待鏈表,頭尾相接:(ptr)-> next=(ptr); (ptr)->prev=(ptr);
INIT_LIST_HEAD(&q->task_list);
}
複製代碼
二、Linux 內核 P-V 原語詳解
1、獲取信號量 P 操作原理解析
獲取信號量 P 操作,通過內聯彙編原子性對 counte 操作。首先通過 decl 同時根據是否是多處理器加 lock 前綴,保證了單條指令的原子性,然後根據遞減後的值是否為負數來判斷獲取信號量是否成功,如果失敗,那麼需要將線程進行睡眠,此時調用 _down_failed 函數完成此操作。
具體實現原理如下。
static inline void down(struct semaphore*sem) {
_asm__volatile_( // 通過 lock 前綴實現原子性的-sem->count操作,decl指令相當於對操作數自減
LOCK "decl %O" // 如果減完後發現sign標誌位為1,則表明count值為負,往前跳到標號2處,調用 __down_failed處理,否則獲取成功,直接退出
"js 2f"
"1: "
LOCK_SECTION_START("")
"2:call__down_failed"
"jmp 1b"
// 這裏採用了 LOCK SECTION START 和LOCK SECTION END 宏定義,將call
// __down_failed 和 jmp 1b的彙編代碼放到.textlock段中
// 所以如果執行完 __down_failed 方法後調用jmp 1b
// 會回到 LOCK SECTION START之前的段中,即退出down方法
LOCK SECTION END:
: "=m" (sem -> count)
:"c" (sem)
:"memory");
}
// 通過彙編聲明瞭 __down_failed的代碼地址
asm(
".text"
".align 4" // 4字節對齊
".globl___down_failed"
"__down failed:"
#if defined(CONFIG_FRAME_POINTER) // 如果定義了棧幀指針,那麼開闢新的方法幀
"pushl %ebp"
"movl %esp, %ebp"
#endif
// 保存影響的寄存器值,因為隨後要調用_down 函數, 可能會影響 eax、edx、ecx 寄存器,
// 所以這裏需要先對其進行保存,在方法返回後再還原
"pushl %eax"
"pushl %edx"
"pushl %ecx"
"call __down" // 調用 __down來執行當counter為0時的操作
"popl %ecx" // 調用返回後恢復保存的寄存器
"popl %edx"
"popl %eax"
#if defined(CONFIG_FRAME_POINTER) //還原方法幀
"movl %ebp,%esp"
"popl %ebp"
#endif
"ret"
);
複製代碼
我們最終是調用函數 __ down 來執行最終的 __down_failed 操作:
下面是 void__dow 函數源碼,通過 current 宏獲取當前任務結構體,獲取到了任務 PCB,初始化 wait_queuet,也就是等待線程代表,宏定義為;wait_queue_t name = {.task = tsk, .func = defauft_wake_function, .tasklist = {NULL, NULL}}
設置任務狀態為 TASK_UNINTERRUPTIBLE,表明不可中斷的阻塞,獲取自旋鎖,將等待任務節點插入等待鏈表的隊尾處,增加等待計數,循環等待釋放信號量,對等待線程減 1 後與當前信號量的 counter 值相加
如果結果等於 0 則結束循環,這裏等於 0 的條件就是等待信號量足夠容納更多的線程,所以不需要阻塞,設置等待任務數為 1,釋放自旋鎖,喚醒調度器執行其他任務,當前任務就被阻塞在了等待隊列裏
當任務重新被喚醒時,將重新獲取自旋鎖,重新設置任務狀,喚醒等待任務,釋放自旋鎖,設置當前任務狀態為 TASK_RUNNING。
以下代碼我們可以看到,使用了自旋鎖、P-V 操作,並增加了阻塞隊列實現信號量。如果讀者對 Linux 進程調度原理不清楚,這裏面方法 schedule ,其作用就是朱勇釋放 CPU 的控制權,交給調度程序,然後由調度程序切換到其他進程執行,直到信號量釋放後,再由其他進程將其狀態設置為 RUNNABLE 後,交由調度進程重新調度執行。
void__down(struct semaphore *sem) {
// 通過current 宏獲取當前任務結構體,獲取到了任務PCB
struct task_struct *tsk = current;
// 初始化wait_queue t,也就是等待線程代表
// 宏定義為;wait_queue_t name = {.task = tsk, .func = defauft_wake_function, .tasklist = {NULL, NULL}}
DECLARE_WAITQUEUE(wait, tsk);
unsigned long flags;
tsk->state = TASK UNINTERRUPTIBLE; // 設置任務狀態為TASK_UNINTERRUPTIBLE,表明不可中斷的阻塞
spin_lock_irqsave(&sem -> waitlock, flags); // 獲取自旋鎖
add_wait_queue_exclusive_locked(&sem -> wait, &wait); // 將等待任務節點插入等待鏈表的隊尾處
sem -> sleepers++; // 增加等待計數
for (; ; ) { // 循環等待釋放信號量
int sleepers = sem -> sleepers;
// 對等待線程減1後與當前信號量的counter值相加
// 如果結果等於0則結束循環,這裏等於0的條件就是等待信號量足夠容納更多的線程,所以不需要阻塞
if (!atomic_add_negative(sleepers - 1, & sem -> count)){
sem -> sleepers = 0;
break;
}
sem -> sleepers = 1; // 設置等待任務數為1
spin_unlock_irqrestore( & sem -> wait.lock, flags); // 釋放自旋鎖
// 喚醒調度器執行其他任務,當前任務就被阻塞在了等待隊列裏
schedule();
spin_lock_irqsave( & sem -> wait.lock flags); // 當任務重新被喚醒時,將重新獲取自旋鎖
tsk -> state = TASK UNINTERRUPTIBLE; // 重新設置任務狀態為不可中斷狀態,繼續循環
}
// 至此任務已經獲取了信號量,等待線程從隊列中移出來
remove_wait_queue_locked( & sem -> wait, &wait);
wake_up_locked( & sem -> wait); // 喚醒等待任務
spin_unlock_irqrestore( & sem -> wait.lock, flags); // 釋放自旋鎖
tsk->state = TASK_RUNNING; // 設置當前任務狀態為TASK_RUNNING
}
複製代碼
2、釋放信號量 V 操作原理解析
喚醒信號量通過對 semaphore 中的 counter 進行加 1,同樣通過 LOCK 前綴保證指令的原子性,根據返回值是否小於或等於 0 來判斷是否有線程在等待,如果有線程等待,那麼調用_up_wakeup 函數喚醒等待信號量的線程。
static inline void up(struct semaphore*sem) {
_asm___volatile_(
LOCK "incl %O" // 原子性實現++sem->count
"jle 2f" // 如果小於或等於0,則跳到2標誌處,調用__up_wakeup函數喚醒等待任務
"1:"
LOCK SECTION_START ("")
"2:call__up_wakeup"
"jmp 1b"
LOCK_SECTION_END
".subsection 0"
:"=m” (sem->count)"
:"c" (sem)"
:" memory ");
}
// 彙編代碼保存影響的寄存器,然後調用_up函數喚醒任務
asm(
".text"
".align 4"
".globl___up_wakeup"
"__up_wakeup: "
"pushl %eax"
"pushl %edx"
"pushl %ecx"
"call_ up" // 調用__up 函數
"popl %ecx"
"popl %edx"
"popl %eax"
"ret");
複製代碼
下面看下__up 喚醒的源碼實現,__up 直接調用 wake_up,通過喚醒宏定義,調用喚醒函數實現__wake_up,獲取自旋鎖,調用__wake_up_common 函數喚醒任務,注意這裏傳入的是 sync 為 0,釋放自旋鎖的過程
void __up(struct semaphore *sem) {
wake_up( & sem -> wait); // 直接調用wake_up
}
//喚醒宏定義
# define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE |TASK_INTERRUPTIBLE,1)
// 喚醒函數實現
void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive) {
unsigned long flags;
spin_lock_irqsave(&q->lock, flags); // 獲取自旋鎖
// 調用__wake_up_common函數喚醒任務,注意這裏傳入的是sync為0
_wake_up_common(q, mode, nr_exclusive, 0);
spin_unlock_irqrestore(&q->lock, flags); // 釋放自旋鎖
}
// 喚醒操作_wake_up_common 函數的實現原理。
static void _wake_up_common(wait_queue_head_*qunsigned int mode, int r_exclusive, int sync) {
struct list_head *tmp,*next;
list_for_each_safe(tmp, next, & q -> task_list){ // 遍歷等待列表
wait_queue_t * curr;
unsigned flags;
//獲取當前任務節點wait_queue_I
curr = list_entry(tmp, wait_queue_, task_list);
flags = curr -> flags//獲取當前等待標誌位
// 調用喚醒函數
// 如果成功喚醒任務、 當前任務標誌位 WQ_FLAG_EXCLUSIVE、
// nr_exclusive 互斥數量,自減為0,那麼退出循環
if (curr -> func(curr, mode, sync) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
// 默認喚醒函數 default wake function。其實現過程如下
int default_wake_function(walt_queue_curr, unsigned mode, int sync) {
task_t * p = curr -> task; // 獲取當前任務
return try_to_wake_up(p, mode, sync); // 調用try_to_wake_up函數喚醒
}
複製代碼
我們可以看到是通過調用 try_to_wake_up 函數喚醒,其中涉及 Linux 調度器,且該方法涉及大量實現,這裏我們給出源碼的相關原理
首先獲取到當前 CPU 的執行隊列 runqueue,並且關閉中斷,保存當前狀態信息,如果當前狀態信息和傳入狀態不為 0 並且如果當前任務不屬於任何優先級隊列,執行
任務重調度實現:如果是對稱多處理器結構,那麼需要通過跨 CPU 調用,觸發目標 CPU 調度器的調度工作。
信號量代碼邏輯較為簡單,對於 P-V 操作,我們通過原子性對 counter 變量操作,然後其放入信號量的等待隊列中,或者將其從等待隊列中取出。
由於是多線程操作阻塞隊列,因此需要把自旋鎖來保護阻塞隊列。接着判斷任務是否處於任務就緒隊列 runqueue 中,如果在隊列中,則設置標誌為 TASKRUNNING 狀態,否將入 runqueue 中。這裏 runqueue 是每個 CPU 都擁有的任務就緒調度隊列。
三、Linux 內核互斥量代碼詳解
互斥量就是特殊版本的信號量,即 count 為 1 時的特殊信號量,互斥量就是特殊的信號量
// 將struct semaphor 類型定義為 mutex_t
typedef struct semaphore mutex_t;
// mutex_init 其實傳入的type和name都沒用,直接是通過初始化信號量為1來代替
#define mutex_init(lock, type, name) sema_init(lock,1)
// mutex_destroy則為初始化信號量-99
#define mutex_destroy(lock) sema_init(lock,-99)
// 上鎖調用的是信號量的down操作
#define mutex_lock(lock, num) down(lock)
// trylock 調用的是down_trylock,這裏不再講解。這裏的trylock 就是非阻塞的lock,獲取到鎖,返回0,否則返回1
#define mutex_trylock(lock) (down_trylock(lock)?0:1)
//解鎖直接調用up操作
#define mutex_unlock(lock) up(lock)
複製代碼
總結
本文深入 Linux 內核源碼,從核心源碼入口講起,詳細對信號量、互斥量的內核代碼講解,其中對 P-V 操作實現逐行剖析,Linux 內核併發控制原理的鎖實現和原理在後續文章中一一講解,本文深入淺出 Linux 中斷控制的實現原理。
- 【精通內核】Linux 內核併發控制原理信號量與 P-V 原語源碼解析
- 「趣學前端」SVG,邊學邊做
- 你真的理解 C 語言中的 “ 數組 ” 嗎?(初階篇)
- 深入思考 Schema 管理的幾個基本問題
- 軟件社區中的“沉默的大多數”
- 數據中心網絡架構的需求原則及策略
- 漢諾塔(遞歸 非遞歸版)
- 手把手帶你實現 JWT 登錄鑑權
- 關於鏈路追蹤所需要了解的知識
- k8s 自定義 controller 三部曲之一: 創建 CRD(Custom Resource Definition)
- Spring Security 系列教程 03-- 實現 HTTP 基本認證
- Spring Security 系列教程 06-- 前後端分離時的安全處理方案
- 在競爭中留住人才,員工認可比加薪更重要
- 每日一 R「21」Unsafe Rust
- 開源一夏 | 一文讀懂 Shiro 登錄認證全流程
- RTC 技術的試金石:火山引擎視頻會議場景技術實踐
- 一起學習設計模式:責任鏈模式
- 微軟向 Windows Server 添加虛擬核心許可,引來亞馬遜、谷歌等不滿
- 對 JavaBean 的特點寫法與實戰心得詳解
- 軟件安全領域的新面孔 Seal:探索開源安全新邊界 | Q推薦