OneFlow的大模型分片儲存和載入策略

語言: CN / TW / HK

撰文 |李響

1

大規模模型分片儲存簡介

在模型比較小時(如 100G 以下),還有可能採用單機儲存。當模型引數量比較大時,要求的樣本數也更大,訓練後做 dump 出來的模型也會很大,單機肯定放不下。

比如,由 DeepSpeed 和 Megatron 驅動的 Megatron 圖靈自然語言生成模型(MT-NLG)具有 5300 億個引數,是迄今為止訓練過的最大和最強大的單片 Transformer 語言模型,支援這樣的大規模語言模型需要分片儲存和載入,不會使用單機記憶體。此外,在其他 CV、搜尋、推薦和廣告類等場景下,讀取樣本量增多和模型複雜度增加都會帶來模型儲存上的難題。

本文將介紹 OneFlow 的大模型分片儲存、載入策略以及使用方法。

2

OneFlow 模型分片儲存和載入

OneFlow 的大模型分片儲存和載入的實現基於全域性視角(Global View, https://docs.oneflow.org/master/cookies/global_tensor.html )的概念,既利用 Placement 與 SBP 完成模型檔案(下文都用 state dict 表示)在各個物理裝置上的切分,適用於當模型大到無法在單個裝置的記憶體或視訊記憶體上容納下的場景。

flow.utils.global_view.to_global() 介面介紹

為了更好理解下文儲存模型和載入模型兩個部分的內容,首先對 flow.utils.global_view.to_global()介面和其實現思路進行分析。

區別於現有的 Tensor.to_global() 模式(可以處理普通的 Tensor, https://oneflow.readthedocs.io/en/master/generated/oneflow.Tensor.to_global.html?highlight=to_global%28%29 ),提供了多種型別的輸入支援,包括 None、Tensor、List、Tuple、nn.Module 的 state dict 、nn.Graph 的 state dict 和幾種型別的任意組合,既將 List/Tuple/Dict 中的輸入 Tensor 轉換為 Global Tensor。值得注意的是,其傳入引數中的 SBP 支援使用者自定義一個 (x, tensor) -> sbp的函式來解決不同 Tensor 對應不同 SBP 的需求。

並且,與 to_global() 對應的還有 flow.utils.global_view.to_local()介面。可以參考 API 文件中關於 to_global() 和 to_local() 更詳細的介紹( https://oneflow.readthedocs.io/en/master/utils.global_view.html )。在\ flow.utils.global_view.to_global() 的實現( https://github.com/Oneflow-Inc/oneflow/blob/master/python/oneflow/utils/global_view/to_global.py )中,支援了多種輸入型別適用於現有的Tensor.to_global() 介面。實現的整體思路大致為檢查輸入、廣播(空)結構,遍歷節點、呼叫回撥函式和返回 to_global() 後的結果。

再回到我們關注的地方,這個介面如何做到模型分片儲存和載入?

比如對於模型並行/流水並行,模型的引數分散在多個 Rank 上,在儲存模型前通過\ flow.utils.global_view.to_global() 將 state dict 裡的每個 Tensor 在指定 Placement 上轉為 Global Tensor,SBP 的型別為flow.sbp.split,可以設定在特定維度上的切分。同樣的,模型也可以按 Split 被載入。當然,SBP 也可以為 Broadcast,支援不同的 SBP 和 Placement 組合。這樣,超大規模模型分片儲存的問題就被非常好地解決了。

儲存模型

大致瞭解 flow.utils.global_view.to_global() 介面後,在這一部分演示瞭如何分片儲存模型,程式碼如下:

```

自定義 get_sbp 函式。

def get_sbp(state_dict, tensor): if tensor is state_dict["System-Train-TrainStep"]: return flow.sbp.broadcast if tensor is state_dict["module_pipeline"]["m_stage3.linear.weight"]: return flow.sbp.split(1) if tensor is state_dict["module_pipeline"]["m_stage3.linear.bias"]: return flow.sbp.broadcast return flow.sbp.split(0)

model_file_state_dict = flow.utils.global_view.to_global( state_dict, placement=model_file_placement, sbp=get_sbp, ) # 使用 sbp=get_sbp 處理特殊的鍵,也支援指定普通的 SBP。

rank_id = flow.env.get_rank()

儲存模型分片的路徑,一個 rank 對應一個路徑。

state_dict_dir = "./graph_save_load_global_" + str(rank_id)

if flow.env.get_rank() in model_file_placement.ranks: flow.save( flow.utils.global_view.to_local(model_file_state_dict), state_dict_dir, ) ```

首先,將原模型(state_dict)轉化到模型檔案的 Placement 和 SBP 上,model_file_placement 為要分片儲存模型的裝置陣列,也就是將 state dict 按 split(0) 分片到 model_file_placement 上。

這裡之所以自定義 get_sbp 函式,是因為使用者可以傳進來一個 (x, tensor) -> sbp 的函式來解決特殊 Tensor 對應不同 SBP 的需求。

舉個例子(當前例子基於 Graph 模式),對於 state_dict["System-Train-TrainStep"] 這種 shape 為 [1] 的 Tensor,我們就不能按 split(0) 分片了,SBP 可以選用 broadcast。而 state_dict["module_pipeline"]["m_stage3.linear.weight"] 只能在第 1 維度切分,對於 state_dict["module_pipeline"]["m_stage3.linear.bias"] 這種不可切分的小 Tensor(s),SBP 可以選用 broadcast。這樣支援使用者 DIY SBP 的處理,更加靈活。

在後面的處理中,使用 flow.utils.global_view.to_local() 介面得到 model_file_state_dict 的本地分量,並呼叫 save() 儲存模型。其中,state_dict_dir 是帶有裝置 id 的目錄,需要區分不同裝置,推薦一個 rank 對應一個路徑,路徑名用 rank id 的方式。

載入模型

在指定裝置上分片儲存模型後,載入模型的程式碼如下:

``` if cur_rank in model_file_placement.ranks: local_state_dict = flow.load(state_dict_dir) else: local_state_dict = None

global_state_dict = flow.utils.global_view.to_global( local_state_dict, placement=model_file_placement, sbp=get_sbp, ) graph_model.load_state_dict(global_state_dict) ```

首先,用 load() 方法在每個儲存切片的裝置上載入 state dict。對應的,需要把 local rank 上的 state dict 轉換到模型檔案的 placement 和 sbp 上,得到了 global_state_dict。這一步和儲存模型應該是對應的,SBP 和 Placement 也是一致的。

最後,global_state_dict 可以成功載入到 graph_model(nn.Graph) 中。當然,nn.Module 和 nn.Graph 處理方法是一致的。

將 state dict 載入到 nn.Module 中

除了以上兩個特徵外,在將 state dict 載入到 nn.Module 時,OneFlow 提供了 SBP 和 Placement 的自動轉換。

在下面的例子中,首先構造一個 m(nn.Module)物件,再將 global_state_dict 的 SBP 設定為 split(0),而 m 的 SBP 為 broadcast。同時 placement 也放生了變化,從 placement("cpu", ranks=[0, 1]) 到 flow.placement("cpu", ranks=[0])。這時使用者不需要其他操作,OneFlow 會自動做 SBP 和 placement 的轉換過程。

``` import oneflow as flow

m = flow.nn.Linear(2,6) model_file_placement = flow.placement("cpu", ranks=[0, 1])

state_dict = {"weight":flow.ones(3,2), "bias":flow.zeros(3)} global_state_dict = flow.utils.global_view.to_global( state_dict, placement=model_file_placement, sbp=flow.sbp.split(0), )

m.to_global(placement=flow.placement("cpu", ranks=[0]), sbp=flow.sbp.broadcast) m.load_state_dict(global_state_dict) print(m.state_dict()) ```

使用 2 卡執行上面的程式碼,可以看到,我們自己構造的字典中的全域性張量,已經被載入到 m Module 中。此外,輸出 OrderedDict 的 tensor 的 SBP 已經從 split(0) 自動轉換為 broadcast,'weight' 對應 tensor 的形狀也是我們期待的 [6, 2],'bias' 形狀為 [6]。

OrderedDict([('weight', tensor([[1., 1.], [1., 1.], [1., 1.], [1., 1.], [1., 1.], [1., 1.]], placement=oneflow.placement(type="cpu", ranks=[0]), sbp=(oneflow.sbp.broadcast,), dtype=oneflow.float32, requires_grad=True)), ('bias', tensor([0., 0., 0., 0., 0., 0.], placement=oneflow.placement(type="cpu", ranks=[0]), sbp=(oneflow.sbp.broadcast,), dtype=oneflow.float32, requires_grad=True))])

3

一個完整示例

上面演示瞭如何分片儲存和載入模型。在這一部分,提供一份完整的程式碼參考,下面的例子為 4 個 ranks 上的流水並行,模擬了模型分片儲存和載入的過程。

``` import os import numpy as np

import oneflow as flow

model_tensor_placement = flow.placement("cuda", ranks=[0, 1, 2, 3])

model_file_placement 為儲存模型分片的裝置的 placement,表示在 Rank 2 和 Rank 3 上可為 None。

model_file_placement = flow.placement("cpu", ranks=[0, 1]) P0 = flow.placement(model_tensor_placement.type, ranks=[0]) P1 = flow.placement(model_tensor_placement.type, ranks=[1]) P2 = flow.placement(model_tensor_placement.type, ranks=[2]) P3 = flow.placement(model_tensor_placement.type, ranks=[3])

def get_sbp(state_dict, tensor): if tensor is state_dict["System-Train-TrainStep"]: return flow.sbp.broadcast if tensor is state_dict["module_pipeline"]["m_stage3.linear.weight"]: return flow.sbp.split(1) if tensor is state_dict["module_pipeline"]["m_stage3.linear.bias"]: return flow.sbp.broadcast return flow.sbp.split(0)

class Stage0Module(flow.nn.Module): def init(self): super().init() self.linear = flow.nn.Linear(16, 8) self.relu = flow.nn.ReLU()

def forward(self, x):
    return self.relu(self.linear(x))

class Stage1Module(flow.nn.Module): def init(self): super().init() self.linear = flow.nn.Linear(8, 4) self.relu = flow.nn.ReLU()

def forward(self, x):
    return self.relu(self.linear(x))

class Stage2Module(flow.nn.Module): def init(self): super().init() self.linear = flow.nn.Linear(4, 2) self.relu = flow.nn.ReLU()

def forward(self, x):
    return self.relu(self.linear(x))

class Stage3Module(flow.nn.Module): def init(self): super().init() self.linear = flow.nn.Linear(2, 1)

def forward(self, x):
    return self.linear(x)

模擬 4 個 ranks 上的流水並行

class PipelineModule(flow.nn.Module): def init(self): super().init() self.m_stage0 = Stage0Module() self.m_stage1 = Stage1Module() self.m_stage2 = Stage2Module() self.m_stage3 = Stage3Module()

    self.m_stage0.to_global(placement=P0, sbp=flow.sbp.broadcast)
    self.m_stage1.to_global(placement=P1, sbp=flow.sbp.broadcast)
    self.m_stage2.to_global(placement=P2, sbp=flow.sbp.broadcast)
    self.m_stage3.to_global(placement=P3, sbp=flow.sbp.broadcast)

def forward(self, x):
    out_stage0 = self.m_stage0(x)

    in_stage1 = out_stage0.to_global(placement=P1, sbp=flow.sbp.broadcast)
    out_stage1 = self.m_stage1(in_stage1)

    in_stage2 = out_stage1.to_global(placement=P2, sbp=flow.sbp.broadcast)
    out_stage2 = self.m_stage2(in_stage2)

    in_stage3 = out_stage2.to_global(placement=P3, sbp=flow.sbp.broadcast)
    out_stage3 = self.m_stage3(in_stage3)

    return out_stage3

class PipelineGraph(flow.nn.Graph): def init(self, module_pipeline): super().init() self.module_pipeline = module_pipeline self.module_pipeline.m_stage0.config.set_stage(0, P0) self.module_pipeline.m_stage1.config.set_stage(1, P1) self.module_pipeline.m_stage2.config.set_stage(2, P2) self.module_pipeline.m_stage3.config.set_stage(3, P3) self.config.set_gradient_accumulation_steps(2) self.add_optimizer( flow.optim.SGD(self.module_pipeline.parameters(), lr=0.001) )

def build(self, x):
    out = self.module_pipeline(x)
    out = out.sum()
    out.backward()
    return out

def train_with_graph(call_cnt=0, state_dict_dir=None, last_state_dict=None): # 形狀為 [2, 16] 的固定輸入張量 x = flow.tensor( [ [ 0.4286, 0.7402, 0.4161, 0.6103, 0.7394, 1.1330, -0.2311, -0.1013, 0.8537, 0.9757, -0.9842, 0.3839, -0.5551, -0.8832, 0.7820, 0.7421, ], [ -0.1581, -1.0319, 1.8430, 0.3576, 0.7288, -0.6912, 0.9966, 1.0840, -1.1760, 1.5683, -0.2098, -1.6439, -2.7049, 0.1949, 1.6377, 0.0745, ], ], dtype=flow.float32, placement=P0, sbp=flow.sbp.broadcast, )

module_pipeline = PipelineModule()
graph_model = PipelineGraph(module_pipeline)
cur_rank = flow.env.get_rank()

if call_cnt == 1:
    if cur_rank in model_file_placement.ranks:
        local_state_dict = flow.load(state_dict_dir)
    else:
        local_state_dict = None

    # 使用 sbp=get_sbp 處理特殊的鍵
    global_state_dict = flow.utils.global_view.to_global(
        local_state_dict, placement=model_file_placement, sbp=get_sbp,
    )
    graph_model.load_state_dict(global_state_dict)

graph_model(x)
state_dict = graph_model.state_dict()

if call_cnt == 0:
    model_file_state_dict = flow.utils.global_view.to_global(
        state_dict, placement=model_file_placement, sbp=get_sbp,
    )
    if flow.env.get_rank() in model_file_placement.ranks:
        flow.save(
            flow.utils.global_view.to_local(model_file_state_dict),
            state_dict_dir,
        )

if name=="main": rank_id = flow.env.get_rank() # 儲存路徑,一個 rank 對應一個路徑。 state_dict_dir = "./graph_save_load_global_" + str(rank_id) # 儲存模型 train_with_graph(0, state_dict_dir) # 載入模型 train_with_graph(1, state_dict_dir) ```

4

結語

本文從簡單介紹大規模模型分片儲存開始,最終演示了 OneFlow 的如何做模型分片儲存和載入的過程,後續 OneFlow 的大模型分片儲存的介面還會不斷完善。

歡迎下載體驗 OneFlow v0.8.0 最新版本: https://github.com/Oneflow-Inc/oneflow/