Go內存管理一文足矣

語言: CN / TW / HK

最早學習C、C++語言時,它們都是把內存的管理全部交給開發者,這種方式最靈活但是也最容易出問題,對人員要求極高;後來出現的一些高級語言像Java、JavaScript、C#、Go,都有語言自身解決了內存分配和回收問題,降低開發門檻,釋放生產力。然而對於想要深入理解原理的同學來説卻帶來了負擔,本篇文章主要從內存分配角度來梳理個人理解,後續文章中會介紹Go的垃圾回收機制。

進程的內存空間

  • 程序文件段(.text),包括二進制可執行代碼;
  • 已初始化數據段(.data),包括靜態常量;
  • 未初始化數據段(.bss),包括未初始化的靜態變量;(bss與data一般作為靜態存儲區)
  • 堆段,包括動態分配的內存,從低地址開始向上增長;
  • 文件映射段,包括動態庫、共享內存等,從低地址開始向上增長(跟硬件和內核版本有關 (opens new window));
  • 棧段,包括局部變量和函數調用的上下文等。棧的大小是固定的,一般是 8 MB。當然系統也提供了參數,以便我們自定義大小;

(以上來自小林coding)

上面是以進程為單位的視圖,進程中可能有多個線程,每個線程的棧空間是獨立的,但是都位於進程的棧區域中,而進程的堆區這是所有線程共享的,如下圖所示

Go語言中的GMP管理機制來説,只有M對應的是操作系統中的線程,所以goroutine中是保留了必要的(rp、bp、pc指針),當goroutine執行時,對應到指定的棧空間地址區中。

説的有點遠,回到本文主題。

內存分配一般有三種方式:靜態存儲區(根對象、靜態變量、常量)、棧(函數中的臨時局部變量)、堆(malloc、new等);

一般最長討論的是棧和堆,棧的特點可以認為是線性內存,管理簡單,分配比堆上更快,棧上分配的內存一般不需要程序員關心,程序語言都有專門的棧幀來管理(一般來説線程的棧空間是2M有的是8M,不能變化超過會崩潰,Go語言中goroutine是2kb,Go語言來有自己的棧擴容和縮容能力,64位系統超過1G則會崩潰)。(這裏説的線性內存其實在真實的機器物理內存中並不一定連續,這是因為操作系統提供了虛擬內存,讓各個程序看起來是獨佔整個物理內存,實際上對程序來説連續的地址空間,在操作系統視角下未必是連續的,可以參考這篇文章)

因為堆區是多個線程共用的,所以就需要一套機制來進行分配(考慮內存碎片、公平性、衝突解決);不同的內存分配管理方式的適用場景都不同。在詳細講解Go內存分配策略之前,我們先來看一個簡單的內存分配。

堆內存分配

堆內存在最開始時時連續的,當程序運行過程中大家都去堆中申請自己的使用空間,如果不做任何處理,那麼會產生兩個主要問題:

第一個內存碎片問題:

假設堆有100M,線程A申請500M,線程B申請200M,線程C申請300M,此時堆空間為A(500)B(200)C(300),然後A和C把空間釋放了,空間變為 空閒區(500m)線程B空間(200M)空閒區(300M) 這時候線程D需要留600M就會發現此時沒有完成的一塊空間給線程D;

所以一些高級語言中堆空間分配以類似操作系統的頁式分配的方式進行管理,分割出一個個小塊,一個小塊中包含一些元數據(如用户數據大小、是否空閒、頭指針、尾指針)、用户數據區、對齊padding區;

因為現代操作系統一個頁的區域一般是4kb,所以每次分配堆內存塊也會把用户數據區設置為4kb的倍數,同時因為還需要額外的區域來存儲元數據信息,但是元數據大小未必是4字節的倍數(像C++中可以設置4字節對齊 http://blog.csdn.net/sinat_2... ),在加上要考慮到CPU的偽共享緩存帶來的性能問題,所以需要一些額外的空閒空間來做補充(這就是對齊字節的意義)。

那麼如果只用鏈表形式來管理堆內存,看起來就像是下面這樣:

第二個則是併發衝突問題

因為多個線程在同時向堆內存中申請資源,如果沒有控制必然會出現衝突和覆寫問題,所以常見的方案是使用鎖,但是鎖則不可避免的帶來性能問題;所以有各種各樣的方案兼顧性能和碎片化以及預分配的策略來進行內存分配。

一個簡單的內存分配器

我們先按照上面的介紹來實現一個簡單的內存分配器,即實現一個malloc、free方法。

在這裏我們我們把data、bss、heap三個區域統稱為“data segment”,datasegment的結尾由一個指向此處的指針brk(program break)確定。如果想在heap上分配更多的空間,只需要請求系統由低像高移動brk指針,並把對應的內存首地址返回,釋放內存時,只需要向下移動brk指針即可。

在Linux和unix系統中,我們這裏就調用sbrk()方法來操縱brk指針:

  • sbrk(0)獲取當前brk的地址
  • 調用sbrk(x),x為正數時,請求分配x bytes的內存空間,x為負數時,請求釋放x bytes的內存空間

現在寫一個簡易版本的malloc:

void *malloc(size_t size) {
    void *block;
    block = sbrk(size);
    if (block == (void *) -1) {
        return NULL;    
    }
    return block;
}

現在問題是我們可以申請內存,但是如何釋放呢?因為釋放內存需要sbrk來移動brk指針向下縮減,但是我們目前沒有記錄這個區域的尺寸信息;

還有另外一個問題,假設我們現在申請了兩塊內存,A\B,B在A的後面,如果這時候用户想將A釋放,這時候brk指針在B的末尾處,那麼如果簡單的移動brk指針,就會對B進行破壞,所以對於A區域,我們不能直接還給操作系統,而是等B也同時被是釋放時再還給操作系統,同時也可以把A作為一個緩存,等下次有小於等於A區域的內存需要申請時,可以直接使用A內存,也可以將AB進行合併來統一分配(當然會存在內存碎片問題,這裏我就先不考慮)。

所以現在我們將內存按照塊的結構來進行劃分,為了簡單起見,我們使用鏈表的方式來管理;那麼除了本身用户申請的內存區域外,還需要一些額外的信息來記錄塊的大小、下一個塊的位置,當前塊是否在使用。整個結構如下:

typedef char ALIGN[16]; // padding字節對齊使用

union header {
    struct {
        size_t size; // 塊大小
        unsigned is_free; // 是否有在使用
        union header *next; // 下一個塊的地址
    } s;
    ALIGN stub;
};
typedef union header header_t;

這裏將一個結構體與一個16字節的數組封裝進一個union,這就保證了這個header始終會指向一個對齊16字節的地址(union的尺寸等於成員中最大的尺寸)。而header的尾部是實際給用户的內存的起始位置,所以這裏給用户的內存也是一個16字節對齊的(字節對齊目的為了提升緩存命中率和批處理能力提升系統效率)。

現在的內存結構如下圖所示:

現在我們使用head和tail來使用這個鏈表

header_t *head, *tail

為了支持多線程併發訪問內存,我們這裏簡單的使用全局鎖。

pthread_mutex_t global_malloc_lock;

我們的malloc現在是這樣:

void *malloc(size_t size)
{
    size_t total_size;
    void *block;
    header_t *header;
    if (!size) // 如果size為0或者NULL直接返回null
        return NULL;
    pthread_mutex_lock(&global_malloc_lock); // 全局加鎖
    header = get_free_block(size); // 先從已空閒區域找一塊合適大小的內存
    if (header) { // 如果能找到就直接使用,無需每次向操作系統申請
        header->s.is_free = 0; // 標誌這塊區域非空閒
        pthread_mutex_unlock(&global_malloc_lock); // 解鎖
                // 這個header對外部應該是完全隱藏的,真正用户需要的內存在header尾部的下一個位置
        return (void*)(header + 1); 
    }
         // 如果空閒區域沒有則向操作系統申請一塊內存,因為我們需要header存儲一些元數據
         // 所以這裏要申請的內存實際是元數據區+用户實際需要的大小
    total_size = sizeof(header_t) + size;
    block = sbrk(total_size);
    if (block == (void*) -1) { // 獲取失敗解鎖、返回NULL
        pthread_mutex_unlock(&global_malloc_lock);
        return NULL;
    }
         // 申請成功設置元數據信息
    header = block;
    header->s.size = size;
    header->s.is_free = 0;
    header->s.next = NULL;
         // 更新鏈表對應指針
    if (!head)
        head = header;
    if (tail)
        tail->s.next = header;
    tail = header;
         // 解鎖返回給用户內存
    pthread_mutex_unlock(&global_malloc_lock);
    return (void*)(header + 1);
}

// 這個函數從鏈表中已有的內存塊進行判斷是否存在空閒的,並且能夠容得下申請區域的內存塊
// 有則返回,每次都從頭遍歷,暫不考慮性能和內存碎片問題。
header_t *get_free_block(size_t size)
{
    header_t *curr = head;
    while(curr) {
        if (curr->s.is_free && curr->s.size >= size)
            return curr;
        curr = curr->s.next;
    }
    return NULL;
}

可以看下現在我們的內存分配具有的基本能力:

  • 通過加鎖保證線程安全
  • 通過鏈表的方式管理內存塊,並解決內存複用問題。

接下來我們來寫free函數,首先要看下需要釋放的內存是否在brk的位置,如果是,則直接還給操作系統,如果不是,標記為空閒,以後複用。

void free(void *block)
{
    header_t *header, *tmp;
    void *programbreak;

    if (!block)
        return;
    pthread_mutex_lock(&global_malloc_lock); // 全局加鎖
    header = (header_t*)block - 1; // block轉變為header_t為單位的結構,並向前移動一個單位,也就是拿到了這個塊的元數據的起始地址

    programbreak = sbrk(0); // 獲取當前brk指針的位置
    if ((char*)block + header->s.size == programbreak) { // 如果當前內存塊的末尾位置(即tail塊)剛好是brk指針位置就把它還給操作系統
        if (head == tail) { // 只有一個塊,直接將鏈表設置為空
            head = tail = NULL;
        } else {// 存在多個塊,則找到tail的前一個快,並把它next設置為NULL
            tmp = head;
            while (tmp) {
                if(tmp->s.next == tail) {
                    tmp->s.next = NULL;
                    tail = tmp;
                }
                tmp = tmp->s.next;
            }
        }
                  // 將內存還給操作系統
        sbrk(0 - sizeof(header_t) - header->s.size);
        pthread_mutex_unlock(&global_malloc_lock); // 解鎖
        return;
    }
         // 如果不是最後的鏈表就標誌位free,後面可以複用
    header->s.is_free = 1;
    pthread_mutex_unlock(&global_malloc_lock);
}

以上就是一個簡單的內存分配器;可以看到我們使用鏈表來管理堆內存區域,並通過全局鎖來線程安全問題,同時也提供一定的內存複用能力。當然這個內存分配器也存在幾個嚴重的問題:

  • 全局鎖在高併發場景下會帶來嚴重性能問題
  • 內存複用每次從頭遍歷也存在一些性能問題
  • 內存碎片問題,我們內存複用時只是簡單的判斷塊內存是否大於需要的內存區域,如果極端情況下,我們一塊空閒內存為1G,而新申請內存為1kb,那就造成嚴重的碎片浪費
  • 內存釋放存在問題,只會把末尾處的內存還給操作系統,中間的空閒部分則沒有機會還給操作系統。

那麼下面我們介紹一些完善的內存分配器是如何處理的,以及Go中的內存分配策略

TCMalloc

內存分配器多種多樣,概括起來主要是以下幾個思想:

1、劃分內存分配粒度,先將內存區域以最小單位定義出來,然後區分對象大小分別對待。小對象分為若干類,使用對應的數據結構來管理,降低內存碎片化

2、垃圾回收及預測優化:釋放內存時,能夠合併小內存為大內存,根據策略進行緩存,下次可以直接複用提升性能。達到一定條件釋放回操作系統,避免長期佔用導致內存不足。

3、優化多線程下的性能:針對多線程每個線程有自己獨立的一段堆內存分配區。線程對這片區域可以無鎖訪問,提升性能

這其中谷歌的TCMalloc是業界的佼佼者,Go也是借鑑了它的思想,接下來我們來介紹一下。

TCMalloc的幾個重要概念:

  1. Page:操作系統對內存管理以頁為單位,TCMalloc也是這樣,只不過TCMalloc裏的Page大小與操作系統裏的大小並不一定相等,而是倍數關係。《TCMalloc解密》裏稱x64下Page大小是8KB。
  2. Span:一組連續的Page被稱為Span,比如可以有2個頁大小的Span,也可以有16頁大小的Span,Span比Page高一個層級,是為了方便管理一定大小的內存區域,Span是TCMalloc中內存管理的基本單位。
  3. ThreadCache:每個線程各自的Cache,一個Cache包含多個空閒內存塊鏈表,每個鏈表連接的都是內存塊,同一個鏈表上內存塊的大小是相同的,也可以説按內存塊大小,給內存塊分了個類,這樣可以根據申請的內存大小,快速從合適的鏈表選擇空閒內存塊。由於每個線程有自己的ThreadCache,所以ThreadCache訪問是無鎖的。
  4. CentralCache:是所有線程共享的緩存,也是保存的空閒內存塊鏈表,鏈表的數量與ThreadCache中鏈表數量相同,當ThreadCache內存塊不足時,可以從CentralCache取,當ThreadCache內存塊多時,可以放回CentralCache。由於CentralCache是共享的,所以它的訪問是要加鎖的。
  5. PageHeap:PageHeap是堆內存的抽象,PageHeap存的也是若干鏈表,鏈表保存的是Span,當CentralCache沒有內存的時,會從PageHeap取,把1個Span拆成若干內存塊,添加到對應大小的鏈表中,當CentralCache內存多的時候,會放回PageHeap。如上圖,分別是1頁Page的Span鏈表,2頁Page的Span鏈表等,最後是large span set,這個是用來保存中大對象的。毫無疑問,PageHeap也是要加鎖的。

TCMalloc中區分了不同級別的對象,對應不同的分配流程:

  1. 小對象大小:0~256KB;分配流程:ThreadCache -> CentralCache -> HeapPage,大部分時候,ThreadCache緩存都是足夠的,不需要去訪問CentralCache和HeapPage,無鎖分配加無系統調用,分配效率是非常高的。
  2. 中對象大小:257~1MB;分配流程:直接在PageHeap中選擇適當的大小即可,128 Page的Span所保存的最大內存就是1MB。
  3. 大對象大小:>1MB;分配流程:從large span set選擇合適數量的頁面組成span,用來存儲數據。

(以上圖文借鑑自:圖解TCMalloc、 Go內存分配那些事

除此之外,TCMalloc中還涉及內存釋放時多個小區域合併為大區域的方法,大家感興趣的可以看這篇文章:TCMalloc解密

Go內存分配方案

Go中的內存分配策略是借鑑TCMalloc的方案來進行內存分配。同時結合Go自身特點,比TCMalloc更加細緻的劃分對象等級,將TCMalloc中針對線程的緩存變更為綁定到邏輯處理器P上的緩存區域。除此之外Go還結合自身的逃逸分析和垃圾回收策略整體制定了一套內存分配策略。

Go通過編譯階段的逃逸分析來判斷變量應該被分配到棧還是堆上,關於逃逸分析我們不做過多介紹,總結以下幾點:

  • 棧比堆更高效,不需要GC,因此Go會盡可能的將內存分配到棧上。Go的協程棧可以自動擴容和縮容
  • 當分配到棧上可能會引起非法內存訪問等問題,則會使用堆,如:
    • 當一個值在函數被調用後訪問(即作為返回值返回變量地址),這個值極有可能被分配到堆上
    • 當編譯器檢測到某個值過大,這個值被分配到堆上(棧擴容和縮容有成本)
    • 當編譯時,編譯器不知道這個值的大小(slice、map等引用類型)這個值會被分配到堆上
  • 最後,不要去猜值在哪,只有編譯器和編譯器開發者知道

Go通過細緻的對象劃分、極致的多級緩存+無鎖策略緩存、精確的位圖管理來進行精細化的內存管理和性能保障。Go中把所有對象分為三個層級:

  • 微小對象(0,16byte):分配流程為,mache->mcentral->mheap位圖查找->mheap基數樹查找->操作系統分配
  • 小對象 [16byte, 32KB]:分配流程與微小對象一樣
  • 大對象(32KB以上):分為流程為,mheap基數樹查找->操作系統分配(不經過mcache和mcentral)

Go中的內存分配流程可以看下面的概覽圖:

主要涉及如下概念:

page

與TCMalloc中的Page相同,一個page大小為8kb(為操作系統中頁的兩倍),上圖中一個淺藍色的長方形代表一個page

span

span是Go中內存管理的基本單位,go中為mspan,span的大小是page的倍數,上圖中一個淡紫色的長方形為一個span

Go1.9.2往後一共劃分了67級的mspan;

比如第一級span中每個對象大小是8b、第一級span的大小是一個page即8192b、一共可以存放1024個對象。

對應到代碼中放在一個叫做class_to_size的數組中,存儲每個級別的span中的object的大小

// path: /usr/local/go/src/runtime/sizeclasses.go
const _NumSizeClasses = 67
var class_to_size = [_NumSizeClasses]uint16{0, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 288, 320, 352, 384, 416, 448, 480, 512, 576, 640, 704, 768, 896, 1024, 1152, 1280, 1408, 1536,1792, 2048, 2304, 2688, 3072, 3200, 3456, 4096, 4864, 5376, 6144, 6528, 6784, 6912, 8192, 9472, 9728, 10240, 10880, 12288, 13568, 14336, 16384, 18432, 19072, 20480, 21760, 24576, 27264, 28672, 32768}

還有一個class_to_allocnpages數組存儲每個級別的span對應的page的個數

// path: /usr/local/go/src/runtime/sizeclasses.go

const _NumSizeClasses = 67

var class_to_allocnpages = [_NumSizeClasses]uint8{0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 2, 1, 2, 1, 3, 2, 3, 1, 3, 2, 3, 4, 5, 6, 1, 7, 6, 5, 4, 3, 5, 7, 2, 9, 7, 5, 8, 3, 10, 7, 4}

代碼中mspan結構體的定義如下:

// path: /usr/local/go/src/runtime/mheap.go

type mspan struct {
    //鏈表前向指針,用於將span鏈接起來
    next *mspan    
    
    //鏈表前向指針,用於將span鏈接起來
    prev *mspan    
    
    // 起始地址,也即所管理頁的地址
    startAddr uintptr 
    
    // 管理的頁數
    npages uintptr     
    // 塊個數,表示有多少個塊可供分配
    nelems uintptr 
        // 用來輔助確定當前span中的元素分配到了哪裏         
          freeindex uintptr

    //分配位圖,每一位代表一個塊是否已分配
    allocBits *gcBits 
         // allocBits的補碼,以用來快速查找內存中未被使用的內存
         allocCache unit64

    // 已分配塊的個數
    allocCount uint16 
    
    // class表中的class ID,和Size Classs相關
    spanclass spanClass  

    // class表中的對象大小,也即塊大小
    elemsize uintptr 
         // GC中來標記哪些塊已經釋放了
         gcmarkBits *gcBits
}

這裏有一個spanClass需要注意下,他其實是class_to_size的兩倍,這是因為每個類別的對象對應兩個mspan,一個分配給含有指針的的對象,一個分配給不含有指針的對象,這樣垃圾回收時,針對無指針對象的span區域不需要進行復雜的標記處理,提升效果。

舉個例子,第10級的size_class中一個對象是144字節,一個span佔用一個page,共可以存儲56個對象(可以看到56個對象佔不滿1個page,所以尾部會有128字節是無用的),它的mspan結構如下:

當然微小對象的分配會複用一個對象,比如兩個char類型都放在一個object中。後續會介紹。

mcache

mcache與TCMalloc中的ThreadCache類似,每個層級的span都會在mcache中保存一份;每個邏輯處理器P會有自己的mcache,對這部分區域的訪問是無鎖的。mcache的結構中有幾個字段需要關注:

//path: /usr/local/go/src/runtime/mcache.go

type mcache struct {
    // mcache中對應各個等級的span都會有兩份緩存
    alloc [numSpanClasses]*mspan
    // 下面三個是在微小對象分配時專門使用
    tiny             uintptr
    tinyoffset       uintptr
    local_tinyallocs uintptr
}

numSpanClasses = _NumSizeClasses << 1

可以看到macache包含所有規格的span,微小對象和小對象都會先從這裏開始找空間,大對象(超過32kb)沒有對應的class索引,不經過這裏。alloc數組中一共有134個元素,每一個級別的span在其中有兩個即67x2;因為每一個級別對應兩個span,一個給無指針的對象使用一半給有指針的對象使用(無指針對象在垃圾回收時不需要去掃描他是否引用了其他活躍對象),結構如下:

mcache也是從mcentral中獲取的內存,Go運行時初始化時會調用runtime.allocmache初始化線程緩存

// init initializes pp, which may be a freshly allocated p or a
// previously destroyed p, and transitions it to status _Pgcstop.
func (pp *p) init(id int32) {
   pp.id = id
   ////////
   .........
   /////////
   if pp.mcache == nil {
      if id == 0 {
         if mcache0 == nil {
            throw("missing mcache?")
         }
         // Use the bootstrap mcache0. Only one P will get
         // mcache0: the one with ID 0.
         pp.mcache = mcache0
      } else {
         pp.mcache = allocmcache()
      }
   }
   ..........
}

該函數會在系統棧中調用runtime.mheap中的緩存分配器初始化新的runtime.mcache結構體:

// dummy mspan that contains no free objects.
var emptymspan mspan
func allocmcache() *mcache {
    var c *mcache
         // 在系統棧中調用mheap的緩存分配器創建mcache
    systemstack(func() {
        lock(&mheap_.lock) // mheap是所有協程共用的需要加鎖訪問
        c = (*mcache)(mheap_.cachealloc.alloc())
        c.flushGen = mheap_.sweepgen
        unlock(&mheap_.lock)
    })
         // 將alloc數組設置為空span
    for i := range c.alloc {
        c.alloc[i] = &emptymspan
    }
    c.nextSample = nextSample()
    return c
}

但是剛剛初始化的mcache中所有的mspan都是空的佔位符emptymspan

之後需要時會從mcentral中獲取指定spanClass的span:

// refill acquires a new span of span class spc for c. This span will
// have at least one free object. The current span in c must be full.
//
// Must run in a non-preemptible context since otherwise the owner of
// c could change.
func (c *mcache) refill(spc spanClass) {
   // Return the current cached span to the central lists.
   s := c.alloc[spc]
    ...............
   if s != &emptymspan {
      // Mark this span as no longer cached.
      if s.sweepgen != mheap_.sweepgen+3 {
         throw("bad sweepgen in refill")
      }
      mheap_.central[spc].mcentral.uncacheSpan(s)
   }

   // Get a new cached span from the central lists.
   s = mheap_.central[spc].mcentral.cacheSpan()
   ................
   ...............
   c.alloc[spc] = s
}

refill這個方法在runtime.malloc方法中會調用;

mcentral

mcentral是所有線程共享的的緩存,需要加鎖訪問;它的主要作用是為mcache提供切分好的mspan資源。每個spanClass對應一個級別的mcentral;mcentral整體是在mheap中管理的,它之中包含兩個mspan鏈表,Go1.17.7版本中分別為partial代表有空閒區域的span、full代表無空閒區域的span列表。(這裏並不是網上很多文章講的nonempty和empty隊列)

type mcentral struct {
   spanclass spanClass
   partial [2]spanSet // list of spans with a free object
   full    [2]spanSet // list of spans with no free objects
}
type spanSet struct {
   spineLock mutex
   spine     unsafe.Pointer // *[N]*spanSetBlock, accessed atomically
   spineLen  uintptr        // Spine array length, accessed atomically
   spineCap  uintptr        // Spine array cap, accessed under lock
   
   index headTailIndex
}

對於微小對象和小對象的內存會首先從mcache和mcentral中獲取,這部分要看runtime.malloc代碼

微小對象分配

Go中小於16字節的作為微小對象,微小對象會被放入sizeClass為2的span中即16字節,這裏並不是説每次微小對象分配都分配一個16字節的空間,而是會把一個16字節的空間按照2、4、8的規則進行字節對齊的形式來存儲,比如1字節的char會被分配2字節空間,9字節的數據會被分配2+8=10字節空間。

off := c.tinyoffset
// Align tiny pointer for required (conservative) alignment.
if size&7 == 0 {
   off = alignUp(off, 8)
} else if sys.PtrSize == 4 && size == 12 {
   // Conservatively align 12-byte objects to 8 bytes on 32-bit
   // systems so that objects whose first field is a 64-bit
   // value is aligned to 8 bytes and does not cause a fault on
   // atomic access. See issue 37262.
   // TODO(mknyszek): Remove this workaround if/when issue 36606
   // is resolved.
   off = alignUp(off, 8)
} else if size&3 == 0 {
   off = alignUp(off, 4)
} else if size&1 == 0 {
   off = alignUp(off, 2)
}

如果當前的一個16字節元素能夠容納新的微小對象則充分利用當前元素空間

if off+size <= maxTinySize && c.tiny != 0 {
   // The object fits into existing tiny block.
   x = unsafe.Pointer(c.tiny + off)
   c.tinyoffset = off + size
   c.tinyAllocs++
   mp.mallocing = 0
   releasem(mp)
   return x
}

否則從下一個元素中去分配空間

// Allocate a new maxTinySize block.
span = c.alloc[tinySpanClass]
v := nextFreeFast(span)
if v == 0 {
   v, span, shouldhelpgc = c.nextFree(tinySpanClass)
}
x = unsafe.Pointer(v)
(*[2]uint64)(x)[0] = 0
(*[2]uint64)(x)[1] = 0
// See if we need to replace the existing tiny block with the new one
// based on amount of remaining free space.
if !raceenabled && (size < c.tinyoffset || c.tiny == 0) {
   // Note: disabled when race detector is on, see comment near end of this function.
   c.tiny = uintptr(x)
   c.tinyoffset = size
}
size = maxTinySize

nextFreeFast和nextFree的內容在下面介紹

小對象分配

var sizeclass uint8
if size <= smallSizeMax-8 {
   sizeclass = size_to_class8[divRoundUp(size, smallSizeDiv)]
} else {
   sizeclass = size_to_class128[divRoundUp(size-smallSizeMax, largeSizeDiv)]
}
size = uintptr(class_to_size[sizeclass])
spc := makeSpanClass(sizeclass, noscan)
span = c.alloc[spc]
v := nextFreeFast(span)
if v == 0 {
   v, span, shouldhelpgc = c.nextFree(spc)
}
x = unsafe.Pointer(v)
if needzero && span.needzero != 0 {
   memclrNoHeapPointers(unsafe.Pointer(v), size)
}

1-6行,根據參數中要分配的空間大小計算對應的sizeClass即對象大小

7-9行,根據對象大小的等級以及是否有指針(noscan)找到mcache的alloc數組中對應的span

第10行,先計算當前的span中是否有空閒空間,並返回可分配的空閒空間地址

11-13行,如果mcache當前對應的span沒有空閒空間,則進入到nextFree函數尋找一個空閒的span

然後經過其他處理(垃圾回收標記、鎖定關係標識等)返回給調用方

同時也需要注意到,這裏的空間分配都是需要做內存對齊的,比如申請17字節的空間,但是span的分類中是按照8的倍數進行增長的,比17大且最接近的級別是32,所以即使需要17字節,在內部也會使用一個32字節的空間,這也是上面代碼中需要根據size計算sizeClass的原因;也可以看到這種分配方式必然會存在內存浪費,TCMalloc算法機儘量將浪費率控制在15%以內

nextFreeFast中可以看到用到了上面mspan中的freeIndex、allocCache等屬性;

因為這裏使用了allocCache來對前64字節進行快速訪問,如果當前分配字節在allocCache範圍之內,可以直接利用位圖緩存來進行快速計算可分配的區域;至於為什麼是64字節,我猜與CPU中CacheLine的大小有關,64位CPU的cache line就是64字節,利用此來提升CPU緩存命中率,提升性能。

// nextFreeFast returns the next free object if one is quickly available.
// Otherwise it returns 0.
func nextFreeFast(s *mspan) gclinkptr {
   theBit := sys.Ctz64(s.allocCache) // Is there a free object in the allocCache?
   if theBit < 64 {
      result := s.freeindex + uintptr(theBit)
      if result < s.nelems {
         freeidx := result + 1
         if freeidx%64 == 0 && freeidx != s.nelems {
            return 0
         }
         s.allocCache >>= uint(theBit + 1)
         s.freeindex = freeidx
         s.allocCount++
         return gclinkptr(result*s.elemsize + s.base())
      }
   }
   return 0
}

關於freeIndex和allocCache的關係,實際是利用了bitmap位圖緩存和階段標記的方式來進行配合,因為allocCache一次只能緩存64字節數據,所以在span被分配過程中,allocCache是滾動前進的,一次標識一塊64字節區域,而freeIndex代表上次分配結束的元素位置,通過當前allocCache中的空閒位置+freeIndex即可以算出當前span被分配的區域。

具體計算方式可以mbitmap.go中的nextFreeIndex方法

// nextFreeIndex returns the index of the next free object in s at
// or after s.freeindex.
// There are hardware instructions that can be used to make this
// faster if profiling warrants it.
func (s *mspan) nextFreeIndex() uintptr {
   sfreeindex := s.freeindex
   snelems := s.nelems
   if sfreeindex == snelems {
      return sfreeindex
   }
   if sfreeindex > snelems {
      throw("s.freeindex > s.nelems")
   }

   aCache := s.allocCache

   bitIndex := sys.Ctz64(aCache)
   for bitIndex == 64 {
      // Move index to start of next cached bits.
      sfreeindex = (sfreeindex + 64) &^ (64 - 1)
      if sfreeindex >= snelems {
         s.freeindex = snelems
         return snelems
      }
      whichByte := sfreeindex / 8
      // Refill s.allocCache with the next 64 alloc bits.
      s.refillAllocCache(whichByte)
      aCache = s.allocCache
      bitIndex = sys.Ctz64(aCache)
      // nothing available in cached bits
      // grab the next 8 bytes and try again.
   }
   result := sfreeindex + uintptr(bitIndex)
   if result >= snelems {
      s.freeindex = snelems
      return snelems
   }

   s.allocCache >>= uint(bitIndex + 1)
   sfreeindex = result + 1

   if sfreeindex%64 == 0 && sfreeindex != snelems {
      // We just incremented s.freeindex so it isn't 0.
      // As each 1 in s.allocCache was encountered and used for allocation
      // it was shifted away. At this point s.allocCache contains all 0s.
      // Refill s.allocCache so that it corresponds
      // to the bits at s.allocBits starting at s.freeindex.
      whichByte := sfreeindex / 8
      s.refillAllocCache(whichByte)
   }
   s.freeindex = sfreeindex
   return result
}

在回到nextFree函數中

func (c *mcache) nextFree(spc spanClass) (v gclinkptr, s *mspan, shouldhelpgc bool) {
   s = c.alloc[spc]
   shouldhelpgc = false
   freeIndex := s.nextFreeIndex() // 獲取可分配的元素位置
   if freeIndex == s.nelems { 
       //如果當前span沒有可分配空間,調用refill方法把當前span交給mcentral的full隊列
       // 並從mcentral的partial隊列取一個有空閒的span放到mcache上
      // The span is full.
      if uintptr(s.allocCount) != s.nelems {
         println("runtime: s.allocCount=", s.allocCount, "s.nelems=", s.nelems)
         throw("s.allocCount != s.nelems && freeIndex == s.nelems")
      }
      c.refill(spc)
      shouldhelpgc = true
      s = c.alloc[spc]

      freeIndex = s.nextFreeIndex() // 在新獲取的span中重新計算freeIndex
   }

   if freeIndex >= s.nelems {
      throw("freeIndex is not valid")
   }

   v = gclinkptr(freeIndex*s.elemsize + s.base()) // 獲取span中數據的起始地址加上當前已分配的區域獲取一個可分配的空閒區域
   s.allocCount++
   if uintptr(s.allocCount) > s.nelems {
      println("s.allocCount=", s.allocCount, "s.nelems=", s.nelems)
      throw("s.allocCount > s.nelems")
   }
   return
}

函數第4行獲取下一個被分配的元素位置,如果freeIndex等於span中的最大元素數目,代表當前級別span已經被分配完了,這時候需要調用mcache的refill方法去mheap中對應的spanClass的mcentral中,把當前沒有空閒的span還給mcentral的full隊列,並從partail對列中獲取一個有空閒區域的span放到mcache上。

下方可以看到refill方法,如果mcache對應等級的span沒有則直接從mcentral中獲取,否則代表當前span已經沒有可分配的空間,所以需要把這個span重新交給mcentral,等待垃圾回收器標記完成之後則可以後面繼續使用。

func (c *mcache) refill(spc spanClass) {
   // Return the current cached span to the central lists.
   s := c.alloc[spc]
    ...............
   if s != &emptymspan {
      // Mark this span as no longer cached.
      if s.sweepgen != mheap_.sweepgen+3 {
         throw("bad sweepgen in refill")
      }
      mheap_.central[spc].mcentral.uncacheSpan(s)
   }

   // Get a new cached span from the central lists.
   s = mheap_.central[spc].mcentral.cacheSpan()
   ................
   ...............
   c.alloc[spc] = s
}

進入到cacheSpan函數中可以看到,這裏的獲取空閒span經過以下幾個順序:

  1. 是先從partail隊列中已經被垃圾回收清掃的部分嘗試拿一個span
  2. 如果pop沒有代表當前沒有被GC清掃的span,從partial隊列中未被GC清掃的部分嘗試獲取空閒span,並進行清掃
  3. 如果partail隊列都沒獲取到,嘗試從full隊列的未清掃區獲取一個span,進行清掃,並放入到full隊列的以清掃區,代表這個span不會分配給其他的mcache了;
  4. 如果未清掃區也沒有獲取到對應的span則代表mcentral需要擴容,向mheap申請一塊區域。

同時可以發現這裏的遍歷次數寫死為100,可能是覺得差不多就得了,畢竟這些操作也需要耗時,先跟mheap要一個得了。

如果獲得了空閒span,跳轉到haveSpan代碼段,這裏更新freeindex和allocCache位圖緩存,返回span;

// Allocate a span to use in an mcache.
func (c *mcentral) cacheSpan() *mspan {
   // Deduct credit for this span allocation and sweep if necessary.
   spanBytes := uintptr(class_to_allocnpages[c.spanclass.sizeclass()]) * _PageSize
   deductSweepCredit(spanBytes, 0)

   traceDone := false
   if trace.enabled {
      traceGCSweepStart()
   }
   
   spanBudget := 100

   var s *mspan
   sl := newSweepLocker()
   sg := sl.sweepGen

   // Try partial swept spans first.
   if s = c.partialSwept(sg).pop(); s != nil {
      goto havespan
   }

   // Now try partial unswept spans.
   for ; spanBudget >= 0; spanBudget-- {
      s = c.partialUnswept(sg).pop()
      if s == nil {
         break
      }
      if s, ok := sl.tryAcquire(s); ok {
         // We got ownership of the span, so let's sweep it and use it.
         s.sweep(true)
         sl.dispose()
         goto havespan
      }
   }
   // Now try full unswept spans, sweeping them and putting them into the
   // right list if we fail to get a span.
   for ; spanBudget >= 0; spanBudget-- {
      s = c.fullUnswept(sg).pop()
      if s == nil {
         break
      }
      if s, ok := sl.tryAcquire(s); ok {
         // We got ownership of the span, so let's sweep it.
         s.sweep(true)
         // Check if there's any free space.
         freeIndex := s.nextFreeIndex()
         if freeIndex != s.nelems {
            s.freeindex = freeIndex
            sl.dispose()
            goto havespan
         }
         // Add it to the swept list, because sweeping didn't give us any free space.
         c.fullSwept(sg).push(s.mspan)
      }
      // See comment for partial unswept spans.
   }
   sl.dispose()
   if trace.enabled {
      traceGCSweepDone()
      traceDone = true
   }

   // We failed to get a span from the mcentral so get one from mheap.
   s = c.grow()
   if s == nil {
      return nil
   }

   // At this point s is a span that should have free slots.
havespan:
   if trace.enabled && !traceDone {
      traceGCSweepDone()
   }
   n := int(s.nelems) - int(s.allocCount)
   if n == 0 || s.freeindex == s.nelems || uintptr(s.allocCount) == s.nelems {
      throw("span has no free objects")
   }
   freeByteBase := s.freeindex &^ (64 - 1)
   whichByte := freeByteBase / 8
   // Init alloc bits cache.
   s.refillAllocCache(whichByte)

   // Adjust the allocCache so that s.freeindex corresponds to the low bit in
   // s.allocCache.
   s.allocCache >>= s.freeindex % 64

   return s
}

對於mcache如果覺得當前級別的span剩餘空間無法滿足用户要求的大小,則會把這個span交給mcentral;mcentral根據條件判斷是直接放到堆中等待回收還是需要放到自己來管理,如果自己管理那麼再判斷這個span的freeIndex與容量的關係如果還有剩餘容量則進入partialSweep隊列,如果麼有容量則進入fullSweep中。

func (c *mcentral) uncacheSpan(s *mspan) {
   if s.allocCount == 0 {
      throw("uncaching span but s.allocCount == 0")
   }

   sg := mheap_.sweepgen
   stale := s.sweepgen == sg+1

   // Fix up sweepgen.
   if stale {
      // Span was cached before sweep began. It's our
      // responsibility to sweep it.
      //
      // Set sweepgen to indicate it's not cached but needs
      // sweeping and can't be allocated from. sweep will
      // set s.sweepgen to indicate s is swept.
      atomic.Store(&s.sweepgen, sg-1)
   } else {
      // Indicate that s is no longer cached.
      atomic.Store(&s.sweepgen, sg)
   }

   // Put the span in the appropriate place.
   if stale {
      // It's stale, so just sweep it. Sweeping will put it on
      // the right list.
      //
      // We don't use a sweepLocker here. Stale cached spans
      // aren't in the global sweep lists, so mark termination
      // itself holds up sweep completion until all mcaches
      // have been swept.
      ss := sweepLocked{s}
      ss.sweep(false)
   } else {
      if int(s.nelems)-int(s.allocCount) > 0 {
         // Put it back on the partial swept list.
         c.partialSwept(sg).push(s)
      } else {
         // There's no free space and it's not stale, so put it on the
         // full swept list.
         c.fullSwept(sg).push(s)
      }
   }
}

可以看到mcentral中的partial和full都是擁有兩個元素的spanSet數組,這樣的目的其實是雙緩存策略,當垃圾回收只回收和用户協程併發進行,每次回收一半而寫入另一半,下一次交替過來,這樣保證永遠有空間可以分配,而不是串行等待垃圾回收完成後在分配空間,以空間換時間來提升響應性能

type mcentral struct {
   spanclass spanClass

   partial [2]spanSet // list of spans with a free object
   full    [2]spanSet // list of spans with no free objects
}

mcentral中的grow方法涉及到mheap的內存分配和管理,下面介紹。

mheap

mheap與TCMalloc中的PageHeap類似,代表Go中所持有的堆空間,mcentral管理的span也是從這裏拿到的。當mcentral沒有空閒span時,會向mheap申請,如果mheap中也沒有資源了,會向操作系統來申請內存。向操作系統申請是按照頁為單位來的(4kb),然後把申請來的內存頁按照page(8kb)、span(page的倍數)、chunk(512kb)、heapArena(64m)這種級別來組織起來。

pageCache的位圖緩存

mcentral中的grow方法會調用mheap的alloc方法

// grow allocates a new empty span from the heap and initializes it for c's size class.
func (c *mcentral) grow() *mspan {
   npages := uintptr(class_to_allocnpages[c.spanclass.sizeclass()])
   size := uintptr(class_to_size[c.spanclass.sizeclass()])

   s, _ := mheap_.alloc(npages, c.spanclass, true)
   if s == nil {
      return nil
   }

   // Use division by multiplication and shifts to quickly compute:
   // n := (npages << _PageShift) / size
   n := s.divideByElemSize(npages << _PageShift)
   s.limit = s.base() + size*n
   heapBitsForAddr(s.base()).initSpan(s)
   return s
}

然後內部調用allocSpan方法。

func (h *mheap) alloc(npages uintptr, spanclass spanClass, needzero bool) (*mspan, bool) {
   // Don't do any operations that lock the heap on the G stack.
   // It might trigger stack growth, and the stack growth code needs
   // to be able to allocate heap.
   var s *mspan
   systemstack(func() {
      // To prevent excessive heap growth, before allocating n pages
      // we need to sweep and reclaim at least n pages.
      if !isSweepDone() {
         h.reclaim(npages)
      }
      s = h.allocSpan(npages, spanAllocHeap, spanclass)
   })

   if s == nil {
      return nil, false
   }
   isZeroed := s.needzero == 0
   if needzero && !isZeroed {
      memclrNoHeapPointers(unsafe.Pointer(s.base()), s.npages<<_PageShift)
      isZeroed = true
   }
   s.needzero = 0
   return s, isZeroed
}

而在allocSpan方法中,如果要分配的區域不大,並且不需要考慮物理對齊的情況下,會首先從邏輯處理器的pageCache緩存上去獲取空間,這樣的目的是為了無鎖分配空間提升性能(又是空間換時間)。

下面的16行可以看到先從邏輯處理器P的pcache上嘗試獲取對應的空間。

func (h *mheap) allocSpan(npages uintptr, typ spanAllocType, spanclass spanClass) (s *mspan) {
   // Function-global state.
   gp := getg()
   base, scav := uintptr(0), uintptr(0)

   // On some platforms we need to provide physical page aligned stack
   // allocations. Where the page size is less than the physical page
   // size, we already manage to do this by default.
   needPhysPageAlign := physPageAlignedStacks && typ == spanAllocStack && pageSize < physPageSize

   // If the allocation is small enough, try the page cache!
   // The page cache does not support aligned allocations, so we cannot use
   // it if we need to provide a physical page aligned stack allocation.
   pp := gp.m.p.ptr()
   if !needPhysPageAlign && pp != nil && npages < pageCachePages/4 {
      c := &pp.pcache

      // If the cache is empty, refill it.
      if c.empty() {
         lock(&h.lock)
         *c = h.pages.allocToCache()
         unlock(&h.lock)
      }

      // Try to allocate from the cache.
      base, scav = c.alloc(npages)
      if base != 0 {
         s = h.tryAllocMSpan()
         if s != nil {
            goto HaveSpan
         }
         // We have a base but no mspan, so we need
         // to lock the heap.
      }
   }

pageCache的結構如下:

代碼在runtime/mpagecache.go中

// 代表pageCache能夠使用的空間數,8x64一共是512kb空間
const pageCachePages = 8 * unsafe.Sizeof(pageCache{}.cache)

// pageCache represents a per-p cache of pages the allocator can
// allocate from without a lock. More specifically, it represents
// a pageCachePages*pageSize chunk of memory with 0 or more free
// pages in it.
type pageCache struct {
   base  uintptr // base代表該虛擬內存的基線地址
   // cache和scav都是起到位圖標記的作用,cache主要是標記哪些內存位置已經被使用了,scav標記已經被清除的區域
   // 用來加速垃圾未收,在垃圾回收一定條件下兩個可以互換,提升分配和垃圾回收效率。
   cache uint64  // 64-bit bitmap representing free pages (1 means free)
   scav  uint64  // 64-bit bitmap representing scavenged pages (1 means scavenged)
}

下面回到mheap的allocSpan方法中

基數樹

如果pageCache不滿足分配條件或者沒有空閒空間了,則對mheap進行全局加鎖獲取內存

// For one reason or another, we couldn't get the
// whole job done without the heap lock.
lock(&h.lock)

.................
if base == 0 {
   // Try to acquire a base address.
   base, scav = h.pages.alloc(npages)
   if base == 0 {
      if !h.grow(npages) {
         unlock(&h.lock)
         return nil
      }
      base, scav = h.pages.alloc(npages)
      if base == 0 {
         throw("grew heap, but no adequate free space found")
      }
   }
}
................

unlock(&h.lock)

這裏首先從mheap的pages中去獲取,這個pages是一個pageAlloc的結構體實例,它是以基數樹的形式來進行管理。最多有5層,每個節點都對應一個pallocSum對象,除葉子節點外每個節點都包含連續8個子節點的內存信息,越上層的節點包含的內存信息越多,一顆完整的基數樹最多能夠代表16G內存空間。同時這裏面還做了一些搜索優化

然後當mheap沒有空間時,會向操作系統去申請,這部分代碼在mheap的grow函數中,會調用到pageAlloc的grow和sysGrow方法,內部會調用平台相關的sysUsed方法來向操作系統去申請內存。

mheap中還有一個要注意的地方,就是對mcentral的管理

//path: /usr/local/go/src/runtime/mheap.go

type mheap struct {
    lock mutex
    
    // spans: 指向mspans區域,用於映射mspan和page的關係
    spans []*mspan 
    
    // 指向bitmap首地址,bitmap是從高地址向低地址增長的
    bitmap uintptr 

    // 指示arena區首地址
    arena_start uintptr 
    
    // 指示arena區已使用地址位置
    arena_used  uintptr 
    
    // 指示arena區末地址
    arena_end   uintptr 

    central [67*2]struct {
        mcentral mcentral
        pad [sys.CacheLineSize - unsafe.Sizeof(mcentral{})%sys.CacheLineSize]byte
    }
}

首先注意到這裏的sys.CacheLineSize,根據這個對mcentral做空餘對齊,來防止CPU的偽共享緩存帶來的性能問題(關於偽共享緩存推薦看我的這篇文章: http://www.cnblogs.com/dojo-... )。

其次要注意到這裏mcentral的個數是67x2=134,也是針對有指針和無指針對象分別處理,提升垃圾回收效率,進而提升整體性能。

借用一下這張圖看的更清晰一些

總結來看通過細緻的對象劃分、極致的多級緩存+無鎖策略緩存、精確的位圖管理來進行精細化的內存管理和性能保障。

整個文章大概花費一個月時間,通過自己看源碼能夠發現,現有講解Go內存分配的資料要麼已經老舊、要麼人云亦云,還是獨立思考實踐最能揭露本質。

參考文章