【精通內核】Linux 內核併發控制原理信號量與 P-V 原語源碼解析

語言: CN / TW / HK

前言

:mailbox: 作者簡介 :小明java問道之路,專注於研究計算機底層/Java/Liunx 內核,就職於大型金融公司後端高級工程師,擅長交易領域的高安全/可用/併發/性能的架構設計:mailbox: 

:trophy: InfoQ 簽約博主 、CSDN 專家博主/Java 領域優質創作者、阿里雲專家/簽約博主、華為雲專家、51CTO 專家/TOP 紅人 :trophy:

:fire:如果此文還不錯的話,還請:+1: 關注 、點贊 、收藏 三連支持:+1:一下博主~

本文導讀

本文深入 Linux 內核源碼,從核心源碼入口講起,詳細對信號量、互斥量的內核代碼講解,其中對 P-V 操作實現逐行剖析,Linux 內核併發控制原理的鎖實現和原理在後續文章中一一講解,本文深入淺出 Linux 中斷控制的實現原理。

一、Linux 內核信號量

信號量原理是什麼,多線程的時候,我們需要一個變量來表示信號量狀態,同時需要一個隊列,當線程無法獲取信號量時,需要進行阻塞。

1、atomic_t 結構體

atomict 為原子性變量、_wait_queue_head 為等待隊列頭部、wait_queue 為等待任務的表節點、semaphore 為信號量結構體。我們看到原子性整型變量聲明 ,其中通過 counter 變量來描述信號量的個數,然而這裏為了避免編譯器優化用 volatile 來修飾。

    typedef struct {        volatile int counter;    } atomic_t; // 等待隊列頭部,通過自旋鎖來保證操作隊列線程安全        struct_waitqueue_head {        spinlock_t lock;             // 保護阻塞隊列的自旋鎖	        struct list_head task_list;  // 任務阻塞隊列    };
struct__wait_queue { // 等待任務節點 unsigned int flags; // 任務結構體,這裏只需要知道它就是進程控制塊 PCB 代表了進程即可 struct task_struct * task; wait_queue_func_t func; // 等待函數指針 struct list head task list; // 所處鏈表的結構體 };
struct semaphore { // 信號量結構體 atomic_t count; // 信號量計數 int sleepers; // 等待任務數 wait_queue_head_t wait; // 等待隊列 };

複製代碼

接下來我們來看看信號量的初始化的內核源碼。

2、信號量的初始化的內核源碼

首先初始化信號量,原子性設置信號量的初始值,就是將(count)->counter=val,初始化 sleeper 為 0,最後初始化等待鏈表。

    static inline void sema_init(struct semaphore*sem, int val) { // 初始化信號量        atomic_set(&sem->count, val);           // 原子性設置信號量的初始值,就是將(count)->counter=val        sem -> sleepers = 0;                    // 初始化 sleeper 為0	        init_waitqueue_head( &sem -> wait);     // 初始化等待鏈表    }        // 初始化操作    static inline void init_waitqueue_head(wait_queue_head_t*q){        //自旋鎖狀態初始為spinlock t結構體         q->lock = SPIN LOCK UNLOCKED;        //初始化等待鏈表,頭尾相接:(ptr)-> next=(ptr); (ptr)->prev=(ptr);         INIT_LIST_HEAD(&q->task_list);    }

複製代碼

二、Linux 內核 P-V 原語詳解

1、獲取信號量 P 操作原理解析

獲取信號量 P 操作,通過內聯彙編原子性對 counte 操作。首先通過 decl 同時根據是否是多處理器加 lock 前綴,保證了單條指令的原子性,然後根據遞減後的值是否為負數來判斷獲取信號量是否成功,如果失敗,那麼需要將線程進行睡眠,此時調用 _down_failed 函數完成此操作。

具體實現原理如下。

    static inline void down(struct semaphore*sem) {                _asm__volatile_( // 通過 lock 前綴實現原子性的-sem->count操作,decl指令相當於對操作數自減             LOCK "decl %O" // 如果減完後發現sign標誌位為1,則表明count值為負,往前跳到標號2處,調用 __down_failed處理,否則獲取成功,直接退出            "js 2f"            "1: "            LOCK_SECTION_START("")             "2:call__down_failed"             "jmp 1b"            // 這裏採用了 LOCK SECTION START 和LOCK SECTION END 宏定義,將call            // __down_failed 和 jmp 1b的彙編代碼放到.textlock段中            // 所以如果執行完 __down_failed 方法後調用jmp 1b            // 會回到 LOCK SECTION START之前的段中,即退出down方法                            LOCK SECTION END:            : "=m" (sem -> count)            :"c" (sem)            :"memory");    }
// 通過彙編聲明瞭 __down_failed的代碼地址 asm( ".text" ".align 4" // 4字節對齊 ".globl___down_failed" "__down failed:" #if defined(CONFIG_FRAME_POINTER) // 如果定義了棧幀指針,那麼開闢新的方法幀 "pushl %ebp" "movl %esp, %ebp"
#endif // 保存影響的寄存器值,因為隨後要調用_down 函數, 可能會影響 eax、edx、ecx 寄存器, // 所以這裏需要先對其進行保存,在方法返回後再還原 "pushl %eax" "pushl %edx" "pushl %ecx" "call __down" // 調用 __down來執行當counter為0時的操作 "popl %ecx" // 調用返回後恢復保存的寄存器 "popl %edx" "popl %eax"
#if defined(CONFIG_FRAME_POINTER) //還原方法幀 "movl %ebp,%esp" "popl %ebp" #endif "ret" );

複製代碼

我們最終是調用函數 __ down 來執行最終的 __down_failed 操作:

下面是 void__dow 函數源碼,通過 current 宏獲取當前任務結構體,獲取到了任務 PCB,初始化 wait_queuet,也就是等待線程代表,宏定義為;wait_queue_t name = {.task = tsk, .func = defauft_wake_function, .tasklist = {NULL, NULL}}

設置任務狀態為 TASK_UNINTERRUPTIBLE,表明不可中斷的阻塞,獲取自旋鎖,將等待任務節點插入等待鏈表的隊尾處,增加等待計數,循環等待釋放信號量,對等待線程減 1 後與當前信號量的 counter 值相加

如果結果等於 0 則結束循環,這裏等於 0 的條件就是等待信號量足夠容納更多的線程,所以不需要阻塞,設置等待任務數為 1,釋放自旋鎖,喚醒調度器執行其他任務,當前任務就被阻塞在了等待隊列裏

當任務重新被喚醒時,將重新獲取自旋鎖,重新設置任務狀,喚醒等待任務,釋放自旋鎖,設置當前任務狀態為 TASK_RUNNING。

以下代碼我們可以看到,使用了自旋鎖、P-V 操作,並增加了阻塞隊列實現信號量。如果讀者對 Linux 進程調度原理不清楚,這裏面方法 schedule ,其作用就是朱勇釋放 CPU 的控制權,交給調度程序,然後由調度程序切換到其他進程執行,直到信號量釋放後,再由其他進程將其狀態設置為 RUNNABLE 後,交由調度進程重新調度執行。

    void__down(struct semaphore *sem) {        // 通過current 宏獲取當前任務結構體,獲取到了任務PCB        struct task_struct *tsk = current;        // 初始化wait_queue t,也就是等待線程代表        // 宏定義為;wait_queue_t name = {.task = tsk, .func = defauft_wake_function, .tasklist = {NULL, NULL}}        DECLARE_WAITQUEUE(wait, tsk);        unsigned long flags;        tsk->state = TASK UNINTERRUPTIBLE;           // 設置任務狀態為TASK_UNINTERRUPTIBLE,表明不可中斷的阻塞         spin_lock_irqsave(&sem -> waitlock, flags); // 獲取自旋鎖        add_wait_queue_exclusive_locked(&sem -> wait, &wait); // 將等待任務節點插入等待鏈表的隊尾處        sem -> sleepers++;          // 增加等待計數        for (; ; ) {                // 循環等待釋放信號量            int sleepers = sem -> sleepers;            // 對等待線程減1後與當前信號量的counter值相加            // 如果結果等於0則結束循環,這裏等於0的條件就是等待信號量足夠容納更多的線程,所以不需要阻塞            if (!atomic_add_negative(sleepers - 1, & sem -> count)){                sem -> sleepers = 0;                break;            }            sem -> sleepers = 1;    // 設置等待任務數為1            spin_unlock_irqrestore( & sem -> wait.lock, flags); // 釋放自旋鎖            // 喚醒調度器執行其他任務,當前任務就被阻塞在了等待隊列裏            schedule();            spin_lock_irqsave( & sem -> wait.lock flags);  // 當任務重新被喚醒時,將重新獲取自旋鎖             tsk -> state = TASK UNINTERRUPTIBLE;           // 重新設置任務狀態為不可中斷狀態,繼續循環         }                // 至此任務已經獲取了信號量,等待線程從隊列中移出來         remove_wait_queue_locked( & sem -> wait, &wait);        wake_up_locked( & sem -> wait);                     // 喚醒等待任務        spin_unlock_irqrestore( & sem -> wait.lock, flags); // 釋放自旋鎖        tsk->state = TASK_RUNNING;                          // 設置當前任務狀態為TASK_RUNNING     }

複製代碼

2、釋放信號量 V 操作原理解析

喚醒信號量通過對 semaphore 中的 counter 進行加 1,同樣通過 LOCK 前綴保證指令的原子性,根據返回值是否小於或等於 0 來判斷是否有線程在等待,如果有線程等待,那麼調用_up_wakeup 函數喚醒等待信號量的線程。

    static inline void up(struct semaphore*sem) {        _asm___volatile_(            LOCK "incl %O" // 原子性實現++sem->count            "jle 2f"       // 如果小於或等於0,則跳到2標誌處,調用__up_wakeup函數喚醒等待任務            "1:"            LOCK SECTION_START ("")             "2:call__up_wakeup"             "jmp 1b"            LOCK_SECTION_END             ".subsection 0"            :"=m” (sem->count)"            :"c" (sem)"            :" memory ");    }
// 彙編代碼保存影響的寄存器,然後調用_up函數喚醒任務 asm( ".text" ".align 4" ".globl___up_wakeup" "__up_wakeup: " "pushl %eax" "pushl %edx" "pushl %ecx" "call_ up" // 調用__up 函數 "popl %ecx" "popl %edx" "popl %eax" "ret");

複製代碼

下面看下__up 喚醒的源碼實現,__up 直接調用 wake_up,通過喚醒宏定義,調用喚醒函數實現__wake_up,獲取自旋鎖,調用__wake_up_common 函數喚醒任務,注意這裏傳入的是 sync 為 0,釋放自旋鎖的過程

    void __up(struct semaphore *sem) {        wake_up( & sem -> wait); // 直接調用wake_up    }
//喚醒宏定義 # define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE |TASK_INTERRUPTIBLE,1)
// 喚醒函數實現 void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); // 獲取自旋鎖 // 調用__wake_up_common函數喚醒任務,注意這裏傳入的是sync為0 _wake_up_common(q, mode, nr_exclusive, 0); spin_unlock_irqrestore(&q->lock, flags); // 釋放自旋鎖 } // 喚醒操作_wake_up_common 函數的實現原理。 static void _wake_up_common(wait_queue_head_*qunsigned int mode, int r_exclusive, int sync) {
struct list_head *tmp,*next; list_for_each_safe(tmp, next, & q -> task_list){ // 遍歷等待列表 wait_queue_t * curr; unsigned flags; //獲取當前任務節點wait_queue_I curr = list_entry(tmp, wait_queue_, task_list); flags = curr -> flags//獲取當前等待標誌位
// 調用喚醒函數 // 如果成功喚醒任務、 當前任務標誌位 WQ_FLAG_EXCLUSIVE、 // nr_exclusive 互斥數量,自減為0,那麼退出循環 if (curr -> func(curr, mode, sync) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; } }
// 默認喚醒函數 default wake function。其實現過程如下 int default_wake_function(walt_queue_curr, unsigned mode, int sync) { task_t * p = curr -> task; // 獲取當前任務 return try_to_wake_up(p, mode, sync); // 調用try_to_wake_up函數喚醒 }

複製代碼

我們可以看到是通過調用 try_to_wake_up 函數喚醒,其中涉及 Linux 調度器,且該方法涉及大量實現,這裏我們給出源碼的相關原理

首先獲取到當前 CPU 的執行隊列 runqueue,並且關閉中斷,保存當前狀態信息,如果當前狀態信息和傳入狀態不為 0 並且如果當前任務不屬於任何優先級隊列,執行

任務重調度實現:如果是對稱多處理器結構,那麼需要通過跨 CPU 調用,觸發目標 CPU 調度器的調度工作。

信號量代碼邏輯較為簡單,對於 P-V 操作,我們通過原子性對 counter 變量操作,然後其放入信號量的等待隊列中,或者將其從等待隊列中取出。

由於是多線程操作阻塞隊列,因此需要把自旋鎖來保護阻塞隊列。接着判斷任務是否處於任務就緒隊列 runqueue 中,如果在隊列中,則設置標誌為 TASKRUNNING 狀態,否將入 runqueue 中。這裏 runqueue 是每個 CPU 都擁有的任務就緒調度隊列。

三、Linux 內核互斥量代碼詳解

互斥量就是特殊版本的信號量,即 count 為 1 時的特殊信號量,互斥量就是特殊的信號量

    // 將struct semaphor 類型定義為 mutex_t     typedef struct semaphore mutex_t;        // mutex_init 其實傳入的type和name都沒用,直接是通過初始化信號量為1來代替    #define mutex_init(lock, type, name)	sema_init(lock,1)                // mutex_destroy則為初始化信號量-99    #define mutex_destroy(lock)	sema_init(lock,-99)
// 上鎖調用的是信號量的down操作 #define mutex_lock(lock, num) down(lock) // trylock 調用的是down_trylock,這裏不再講解。這裏的trylock 就是非阻塞的lock,獲取到鎖,返回0,否則返回1 #define mutex_trylock(lock) (down_trylock(lock)?0:1) //解鎖直接調用up操作 #define mutex_unlock(lock) up(lock)

複製代碼

總結

本文深入 Linux 內核源碼,從核心源碼入口講起,詳細對信號量、互斥量的內核代碼講解,其中對 P-V 操作實現逐行剖析,Linux 內核併發控制原理的鎖實現和原理在後續文章中一一講解,本文深入淺出 Linux 中斷控制的實現原理。